From 43c883954f5edc84bd8e0e901ef770fead218ed5 Mon Sep 17 00:00:00 2001 From: "sprang@webrtc.org" Date: Thu, 29 Jan 2015 09:09:17 +0000 Subject: [PATCH] Allow rtp packet history to dynamically expand in size. When using the paced sender, packets will be put into the rtp packet history and then retreived from there again when it is time to send. In some cases (low send bitrate and very large frames created) this may overflow, causing packets to be overwritten in the packet history before they have been sent. Check this condition and expand history size if needed. This is primarily triggered during screenshare, when switching to a large picture with lots of high frequency details in it. BUG=4171 R=stefan@webrtc.org Review URL: https://webrtc-codereview.appspot.com/34879004 Cr-Commit-Position: refs/heads/master@{#8195} git-svn-id: http://webrtc.googlecode.com/svn/trunk@8195 4adac7df-926f-26a2-2b94-8c16560cd09d --- .../rtp_rtcp/source/nack_rtx_unittest.cc | 4 +- .../rtp_rtcp/source/rtp_packet_history.cc | 81 ++++++++++++++----- .../rtp_rtcp/source/rtp_packet_history.h | 40 +++++---- .../source/rtp_packet_history_unittest.cc | 45 +++++++++++ .../rtp_rtcp/source/rtp_rtcp_impl_unittest.cc | 6 ++ webrtc/modules/rtp_rtcp/source/rtp_sender.cc | 2 + 6 files changed, 140 insertions(+), 38 deletions(-) diff --git a/webrtc/modules/rtp_rtcp/source/nack_rtx_unittest.cc b/webrtc/modules/rtp_rtcp/source/nack_rtx_unittest.cc index 89fa3e67a..44251eae5 100644 --- a/webrtc/modules/rtp_rtcp/source/nack_rtx_unittest.cc +++ b/webrtc/modules/rtp_rtcp/source/nack_rtx_unittest.cc @@ -271,10 +271,12 @@ class RtpRtcpRtxNackTest : public ::testing::Test { timestamp / 90, payload_data, payload_data_length)); + // Min required delay until retransmit = 5 + RTT ms (RTT = 0). + fake_clock.AdvanceTimeMilliseconds(5); int length = BuildNackList(nack_list); if (length > 0) rtp_rtcp_module_->SendNACK(nack_list, length); - fake_clock.AdvanceTimeMilliseconds(33); + fake_clock.AdvanceTimeMilliseconds(28); // 33ms - 5ms delay. rtp_rtcp_module_->Process(); // Prepare next frame. timestamp += 3000; diff --git a/webrtc/modules/rtp_rtcp/source/rtp_packet_history.cc b/webrtc/modules/rtp_rtcp/source/rtp_packet_history.cc index 455f3dcbf..eeb1fc436 100644 --- a/webrtc/modules/rtp_rtcp/source/rtp_packet_history.cc +++ b/webrtc/modules/rtp_rtcp/source/rtp_packet_history.cc @@ -22,7 +22,8 @@ namespace webrtc { -enum { kMinPacketRequestBytes = 50 }; +static const int kMinPacketRequestBytes = 50; +static const size_t kMaxSize = 9600; // "Should be enough for anyone." RTPPacketHistory::RTPPacketHistory(Clock* clock) : clock_(clock), @@ -33,30 +34,26 @@ RTPPacketHistory::RTPPacketHistory(Clock* clock) } RTPPacketHistory::~RTPPacketHistory() { - { - CriticalSectionScoped cs(critsect_); - Free(); - } - delete critsect_; } void RTPPacketHistory::SetStorePacketsStatus(bool enable, uint16_t number_to_store) { - CriticalSectionScoped cs(critsect_); + CriticalSectionScoped cs(critsect_.get()); if (enable) { if (store_) { LOG(LS_WARNING) << "Purging packet history in order to re-set status."; Free(); } + assert(!store_); Allocate(number_to_store); } else { Free(); } } -void RTPPacketHistory::Allocate(uint16_t number_to_store) { +void RTPPacketHistory::Allocate(size_t number_to_store) { assert(number_to_store > 0); - assert(!store_); + assert(number_to_store <= kMaxSize); store_ = true; stored_packets_.resize(number_to_store); stored_seq_nums_.resize(number_to_store); @@ -89,26 +86,30 @@ void RTPPacketHistory::Free() { } bool RTPPacketHistory::StorePackets() const { - CriticalSectionScoped cs(critsect_); + CriticalSectionScoped cs(critsect_.get()); return store_; } -// private, lock should already be taken -void RTPPacketHistory::VerifyAndAllocatePacketLength(size_t packet_length) { +void RTPPacketHistory::VerifyAndAllocatePacketLength(size_t packet_length, + uint32_t start_index) { assert(packet_length > 0); if (!store_) { return; } - if (packet_length <= max_packet_length_) { + // If start_index > 0 this is a resize and we must check any new (empty) + // packets created during the resize. + if (start_index == 0 && packet_length <= max_packet_length_) { return; } + max_packet_length_ = std::max(packet_length, max_packet_length_); + std::vector >::iterator it; - for (it = stored_packets_.begin(); it != stored_packets_.end(); ++it) { - it->resize(packet_length); + for (it = stored_packets_.begin() + start_index; it != stored_packets_.end(); + ++it) { + it->resize(max_packet_length_); } - max_packet_length_ = packet_length; } int32_t RTPPacketHistory::PutRTPPacket(const uint8_t* packet, @@ -120,7 +121,7 @@ int32_t RTPPacketHistory::PutRTPPacket(const uint8_t* packet, return 0; } - CriticalSectionScoped cs(critsect_); + CriticalSectionScoped cs(critsect_.get()); if (!store_) { return 0; } @@ -128,7 +129,7 @@ int32_t RTPPacketHistory::PutRTPPacket(const uint8_t* packet, assert(packet); assert(packet_length > 3); - VerifyAndAllocatePacketLength(max_packet_length); + VerifyAndAllocatePacketLength(max_packet_length, 0); if (packet_length > max_packet_length_) { LOG(LS_WARNING) << "Failed to store RTP packet with length: " @@ -138,9 +139,26 @@ int32_t RTPPacketHistory::PutRTPPacket(const uint8_t* packet, const uint16_t seq_num = (packet[2] << 8) + packet[3]; + // If index we're about to overwrite contains a packet that has not + // yet been sent (probably pending in paced sender), we need to expand + // the buffer. + if (stored_lengths_[prev_index_] > 0 && + stored_send_times_[prev_index_] == 0) { + size_t current_size = static_cast(stored_packets_.size()); + size_t expanded_size = std::max(current_size * 3 / 2, current_size + 1); + expanded_size = std::min(expanded_size, kMaxSize); + Allocate(expanded_size); + VerifyAndAllocatePacketLength(max_packet_length, current_size); + // Causes discontinuity, but that's OK-ish. FindSeqNum() will still work, + // but may be slower - at least until buffer has wrapped around once. + prev_index_ = current_size; + } + // Store packet std::vector >::iterator it = stored_packets_.begin() + prev_index_; + // TODO(sprang): Overhaul this class and get rid of this copy step. + // (Finally introduce the RtpPacket class?) std::copy(packet, packet + packet_length, it->begin()); stored_seq_nums_[prev_index_] = seq_num; @@ -158,7 +176,7 @@ int32_t RTPPacketHistory::PutRTPPacket(const uint8_t* packet, } bool RTPPacketHistory::HasRTPPacket(uint16_t sequence_number) const { - CriticalSectionScoped cs(critsect_); + CriticalSectionScoped cs(critsect_.get()); if (!store_) { return false; } @@ -177,14 +195,35 @@ bool RTPPacketHistory::HasRTPPacket(uint16_t sequence_number) const { return true; } +bool RTPPacketHistory::SetSent(uint16_t sequence_number) { + CriticalSectionScoped cs(critsect_.get()); + if (!store_) { + return false; + } + + int32_t index = 0; + bool found = FindSeqNum(sequence_number, &index); + if (!found) { + return false; + } + + // Send time already set. + if (stored_send_times_[index] != 0) { + return false; + } + + stored_send_times_[index] = clock_->TimeInMilliseconds(); + return true; +} + bool RTPPacketHistory::GetPacketAndSetSendTime(uint16_t sequence_number, int64_t min_elapsed_time_ms, bool retransmit, uint8_t* packet, size_t* packet_length, int64_t* stored_time_ms) { + CriticalSectionScoped cs(critsect_.get()); assert(*packet_length >= max_packet_length_); - CriticalSectionScoped cs(critsect_); if (!store_) { return false; } @@ -238,7 +277,7 @@ void RTPPacketHistory::GetPacket(int index, bool RTPPacketHistory::GetBestFittingPacket(uint8_t* packet, size_t* packet_length, int64_t* stored_time_ms) { - CriticalSectionScoped cs(critsect_); + CriticalSectionScoped cs(critsect_.get()); if (!store_) return false; int index = FindBestFittingPacket(*packet_length); diff --git a/webrtc/modules/rtp_rtcp/source/rtp_packet_history.h b/webrtc/modules/rtp_rtcp/source/rtp_packet_history.h index 9d5812540..b88d9d394 100644 --- a/webrtc/modules/rtp_rtcp/source/rtp_packet_history.h +++ b/webrtc/modules/rtp_rtcp/source/rtp_packet_history.h @@ -64,28 +64,36 @@ class RTPPacketHistory { bool HasRTPPacket(uint16_t sequence_number) const; + bool SetSent(uint16_t sequence_number); + private: - void GetPacket(int index, uint8_t* packet, size_t* packet_length, - int64_t* stored_time_ms) const; - void Allocate(uint16_t number_to_store) EXCLUSIVE_LOCKS_REQUIRED(*critsect_); + void GetPacket(int index, + uint8_t* packet, + size_t* packet_length, + int64_t* stored_time_ms) const + EXCLUSIVE_LOCKS_REQUIRED(*critsect_); + void Allocate(size_t number_to_store) EXCLUSIVE_LOCKS_REQUIRED(*critsect_); void Free() EXCLUSIVE_LOCKS_REQUIRED(*critsect_); - void VerifyAndAllocatePacketLength(size_t packet_length); - bool FindSeqNum(uint16_t sequence_number, int32_t* index) const; - int FindBestFittingPacket(size_t size) const; + void VerifyAndAllocatePacketLength(size_t packet_length, uint32_t start_index) + EXCLUSIVE_LOCKS_REQUIRED(*critsect_); + bool FindSeqNum(uint16_t sequence_number, int32_t* index) const + EXCLUSIVE_LOCKS_REQUIRED(*critsect_); + int FindBestFittingPacket(size_t size) const + EXCLUSIVE_LOCKS_REQUIRED(*critsect_); private: Clock* clock_; - CriticalSectionWrapper* critsect_; - bool store_; - uint32_t prev_index_; - size_t max_packet_length_; + scoped_ptr critsect_; + bool store_ GUARDED_BY(critsect_); + uint32_t prev_index_ GUARDED_BY(critsect_); + size_t max_packet_length_ GUARDED_BY(critsect_); - std::vector > stored_packets_; - std::vector stored_seq_nums_; - std::vector stored_lengths_; - std::vector stored_times_; - std::vector stored_send_times_; - std::vector stored_types_; + std::vector > stored_packets_ GUARDED_BY(critsect_); + std::vector stored_seq_nums_ GUARDED_BY(critsect_); + std::vector stored_lengths_ GUARDED_BY(critsect_); + std::vector stored_times_ GUARDED_BY(critsect_); + std::vector stored_send_times_ GUARDED_BY(critsect_); + std::vector stored_types_ GUARDED_BY(critsect_); }; } // namespace webrtc #endif // WEBRTC_MODULES_RTP_RTCP_RTP_PACKET_HISTORY_H_ diff --git a/webrtc/modules/rtp_rtcp/source/rtp_packet_history_unittest.cc b/webrtc/modules/rtp_rtcp/source/rtp_packet_history_unittest.cc index 76798b682..2d9d30698 100644 --- a/webrtc/modules/rtp_rtcp/source/rtp_packet_history_unittest.cc +++ b/webrtc/modules/rtp_rtcp/source/rtp_packet_history_unittest.cc @@ -203,4 +203,49 @@ TEST_F(RtpPacketHistoryTest, MinResendTime) { EXPECT_FALSE(hist_->GetPacketAndSetSendTime(kSeqNum, 101, false, packet_, &len, &time)); } + +TEST_F(RtpPacketHistoryTest, DynamicExpansion) { + hist_->SetStorePacketsStatus(true, 10); + size_t len; + int64_t capture_time_ms = fake_clock_.TimeInMilliseconds(); + int64_t time; + + // Add 4 packets, and then send them. + for (int i = 0; i < 4; ++i) { + len = 0; + CreateRtpPacket(kSeqNum + i, kSsrc, kPayload, kTimestamp, packet_, &len); + EXPECT_EQ(0, hist_->PutRTPPacket(packet_, len, kMaxPacketLength, + capture_time_ms, kAllowRetransmission)); + } + for (int i = 0; i < 4; ++i) { + len = kMaxPacketLength; + EXPECT_TRUE(hist_->GetPacketAndSetSendTime(kSeqNum + i, 100, false, packet_, + &len, &time)); + } + capture_time_ms += 33; + + // Add 16 packets, and then send them. History should expand to make this + // work. + for (int i = 4; i < 20; ++i) { + len = 0; + CreateRtpPacket(kSeqNum + i, kSsrc, kPayload, kTimestamp, packet_, &len); + EXPECT_EQ(0, hist_->PutRTPPacket(packet_, len, kMaxPacketLength, + capture_time_ms, kAllowRetransmission)); + } + for (int i = 4; i < 20; ++i) { + len = kMaxPacketLength; + EXPECT_TRUE(hist_->GetPacketAndSetSendTime(kSeqNum + i, 100, false, packet_, + &len, &time)); + } + + fake_clock_.AdvanceTimeMilliseconds(100); + + // Retransmit last 16 packets. + for (int i = 4; i < 20; ++i) { + len = kMaxPacketLength; + EXPECT_TRUE(hist_->GetPacketAndSetSendTime(kSeqNum + i, 100, false, packet_, + &len, &time)); + } +} + } // namespace webrtc diff --git a/webrtc/modules/rtp_rtcp/source/rtp_rtcp_impl_unittest.cc b/webrtc/modules/rtp_rtcp/source/rtp_rtcp_impl_unittest.cc index 0efc5bd75..0a3a9d7b6 100644 --- a/webrtc/modules/rtp_rtcp/source/rtp_rtcp_impl_unittest.cc +++ b/webrtc/modules/rtp_rtcp/source/rtp_rtcp_impl_unittest.cc @@ -221,6 +221,9 @@ TEST_F(RtpRtcpImplTest, SetSelectiveRetransmissions_BaseLayer) { EXPECT_EQ(3, sender_.RtpSent()); EXPECT_EQ(kSequenceNumber + 2, sender_.LastRtpSequenceNumber()); + // Min required delay until retransmit = 5 + RTT ms (RTT = 0). + clock_.AdvanceTimeMilliseconds(5); + // Frame with kBaseLayerTid re-sent. IncomingRtcpNack(&sender_, kSequenceNumber); EXPECT_EQ(4, sender_.RtpSent()); @@ -247,6 +250,9 @@ TEST_F(RtpRtcpImplTest, SetSelectiveRetransmissions_HigherLayers) { EXPECT_EQ(3, sender_.RtpSent()); EXPECT_EQ(kSequenceNumber + 2, sender_.LastRtpSequenceNumber()); + // Min required delay until retransmit = 5 + RTT ms (RTT = 0). + clock_.AdvanceTimeMilliseconds(5); + // Frame with kBaseLayerTid re-sent. IncomingRtcpNack(&sender_, kSequenceNumber); EXPECT_EQ(4, sender_.RtpSent()); diff --git a/webrtc/modules/rtp_rtcp/source/rtp_sender.cc b/webrtc/modules/rtp_rtcp/source/rtp_sender.cc index d7cc0bc84..6ea11c23e 100644 --- a/webrtc/modules/rtp_rtcp/source/rtp_sender.cc +++ b/webrtc/modules/rtp_rtcp/source/rtp_sender.cc @@ -1001,6 +1001,7 @@ int32_t RTPSender::SendToNetwork( if (capture_time_ms > 0) { UpdateDelayStatistics(capture_time_ms, now_ms); } + size_t length = payload_length + rtp_header_length; if (!SendPacketToNetwork(buffer, length)) return -1; @@ -1008,6 +1009,7 @@ int32_t RTPSender::SendToNetwork( CriticalSectionScoped lock(send_critsect_); media_has_been_sent_ = true; } + packet_history_.SetSent(rtp_header.sequenceNumber); UpdateRtpStats(buffer, length, rtp_header, false, false); return 0; }