Allow rtp packet history to dynamically expand in size.

When using the paced sender, packets will be put into the rtp packet
history and then retreived from there again when it is time to send.

In some cases (low send bitrate and very large frames created) this
may overflow, causing packets to be overwritten in the packet history
before they have been sent.

Check this condition and expand history size if needed.

This is primarily triggered during screenshare, when
switching to a large picture with lots of high frequency
details in it.

BUG=4171
R=stefan@webrtc.org

Review URL: https://webrtc-codereview.appspot.com/34879004

Cr-Commit-Position: refs/heads/master@{#8195}
git-svn-id: http://webrtc.googlecode.com/svn/trunk@8195 4adac7df-926f-26a2-2b94-8c16560cd09d
This commit is contained in:
sprang@webrtc.org 2015-01-29 09:09:17 +00:00
parent 827d7e806a
commit 43c883954f
6 changed files with 140 additions and 38 deletions

View File

@ -271,10 +271,12 @@ class RtpRtcpRtxNackTest : public ::testing::Test {
timestamp / 90, timestamp / 90,
payload_data, payload_data,
payload_data_length)); payload_data_length));
// Min required delay until retransmit = 5 + RTT ms (RTT = 0).
fake_clock.AdvanceTimeMilliseconds(5);
int length = BuildNackList(nack_list); int length = BuildNackList(nack_list);
if (length > 0) if (length > 0)
rtp_rtcp_module_->SendNACK(nack_list, length); rtp_rtcp_module_->SendNACK(nack_list, length);
fake_clock.AdvanceTimeMilliseconds(33); fake_clock.AdvanceTimeMilliseconds(28); // 33ms - 5ms delay.
rtp_rtcp_module_->Process(); rtp_rtcp_module_->Process();
// Prepare next frame. // Prepare next frame.
timestamp += 3000; timestamp += 3000;

View File

@ -22,7 +22,8 @@
namespace webrtc { namespace webrtc {
enum { kMinPacketRequestBytes = 50 }; static const int kMinPacketRequestBytes = 50;
static const size_t kMaxSize = 9600; // "Should be enough for anyone."
RTPPacketHistory::RTPPacketHistory(Clock* clock) RTPPacketHistory::RTPPacketHistory(Clock* clock)
: clock_(clock), : clock_(clock),
@ -33,30 +34,26 @@ RTPPacketHistory::RTPPacketHistory(Clock* clock)
} }
RTPPacketHistory::~RTPPacketHistory() { RTPPacketHistory::~RTPPacketHistory() {
{
CriticalSectionScoped cs(critsect_);
Free();
}
delete critsect_;
} }
void RTPPacketHistory::SetStorePacketsStatus(bool enable, void RTPPacketHistory::SetStorePacketsStatus(bool enable,
uint16_t number_to_store) { uint16_t number_to_store) {
CriticalSectionScoped cs(critsect_); CriticalSectionScoped cs(critsect_.get());
if (enable) { if (enable) {
if (store_) { if (store_) {
LOG(LS_WARNING) << "Purging packet history in order to re-set status."; LOG(LS_WARNING) << "Purging packet history in order to re-set status.";
Free(); Free();
} }
assert(!store_);
Allocate(number_to_store); Allocate(number_to_store);
} else { } else {
Free(); Free();
} }
} }
void RTPPacketHistory::Allocate(uint16_t number_to_store) { void RTPPacketHistory::Allocate(size_t number_to_store) {
assert(number_to_store > 0); assert(number_to_store > 0);
assert(!store_); assert(number_to_store <= kMaxSize);
store_ = true; store_ = true;
stored_packets_.resize(number_to_store); stored_packets_.resize(number_to_store);
stored_seq_nums_.resize(number_to_store); stored_seq_nums_.resize(number_to_store);
@ -89,26 +86,30 @@ void RTPPacketHistory::Free() {
} }
bool RTPPacketHistory::StorePackets() const { bool RTPPacketHistory::StorePackets() const {
CriticalSectionScoped cs(critsect_); CriticalSectionScoped cs(critsect_.get());
return store_; return store_;
} }
// private, lock should already be taken void RTPPacketHistory::VerifyAndAllocatePacketLength(size_t packet_length,
void RTPPacketHistory::VerifyAndAllocatePacketLength(size_t packet_length) { uint32_t start_index) {
assert(packet_length > 0); assert(packet_length > 0);
if (!store_) { if (!store_) {
return; return;
} }
if (packet_length <= max_packet_length_) { // If start_index > 0 this is a resize and we must check any new (empty)
// packets created during the resize.
if (start_index == 0 && packet_length <= max_packet_length_) {
return; return;
} }
max_packet_length_ = std::max(packet_length, max_packet_length_);
std::vector<std::vector<uint8_t> >::iterator it; std::vector<std::vector<uint8_t> >::iterator it;
for (it = stored_packets_.begin(); it != stored_packets_.end(); ++it) { for (it = stored_packets_.begin() + start_index; it != stored_packets_.end();
it->resize(packet_length); ++it) {
it->resize(max_packet_length_);
} }
max_packet_length_ = packet_length;
} }
int32_t RTPPacketHistory::PutRTPPacket(const uint8_t* packet, int32_t RTPPacketHistory::PutRTPPacket(const uint8_t* packet,
@ -120,7 +121,7 @@ int32_t RTPPacketHistory::PutRTPPacket(const uint8_t* packet,
return 0; return 0;
} }
CriticalSectionScoped cs(critsect_); CriticalSectionScoped cs(critsect_.get());
if (!store_) { if (!store_) {
return 0; return 0;
} }
@ -128,7 +129,7 @@ int32_t RTPPacketHistory::PutRTPPacket(const uint8_t* packet,
assert(packet); assert(packet);
assert(packet_length > 3); assert(packet_length > 3);
VerifyAndAllocatePacketLength(max_packet_length); VerifyAndAllocatePacketLength(max_packet_length, 0);
if (packet_length > max_packet_length_) { if (packet_length > max_packet_length_) {
LOG(LS_WARNING) << "Failed to store RTP packet with length: " LOG(LS_WARNING) << "Failed to store RTP packet with length: "
@ -138,9 +139,26 @@ int32_t RTPPacketHistory::PutRTPPacket(const uint8_t* packet,
const uint16_t seq_num = (packet[2] << 8) + packet[3]; const uint16_t seq_num = (packet[2] << 8) + packet[3];
// If index we're about to overwrite contains a packet that has not
// yet been sent (probably pending in paced sender), we need to expand
// the buffer.
if (stored_lengths_[prev_index_] > 0 &&
stored_send_times_[prev_index_] == 0) {
size_t current_size = static_cast<uint16_t>(stored_packets_.size());
size_t expanded_size = std::max(current_size * 3 / 2, current_size + 1);
expanded_size = std::min(expanded_size, kMaxSize);
Allocate(expanded_size);
VerifyAndAllocatePacketLength(max_packet_length, current_size);
// Causes discontinuity, but that's OK-ish. FindSeqNum() will still work,
// but may be slower - at least until buffer has wrapped around once.
prev_index_ = current_size;
}
// Store packet // Store packet
std::vector<std::vector<uint8_t> >::iterator it = std::vector<std::vector<uint8_t> >::iterator it =
stored_packets_.begin() + prev_index_; stored_packets_.begin() + prev_index_;
// TODO(sprang): Overhaul this class and get rid of this copy step.
// (Finally introduce the RtpPacket class?)
std::copy(packet, packet + packet_length, it->begin()); std::copy(packet, packet + packet_length, it->begin());
stored_seq_nums_[prev_index_] = seq_num; stored_seq_nums_[prev_index_] = seq_num;
@ -158,7 +176,7 @@ int32_t RTPPacketHistory::PutRTPPacket(const uint8_t* packet,
} }
bool RTPPacketHistory::HasRTPPacket(uint16_t sequence_number) const { bool RTPPacketHistory::HasRTPPacket(uint16_t sequence_number) const {
CriticalSectionScoped cs(critsect_); CriticalSectionScoped cs(critsect_.get());
if (!store_) { if (!store_) {
return false; return false;
} }
@ -177,14 +195,35 @@ bool RTPPacketHistory::HasRTPPacket(uint16_t sequence_number) const {
return true; return true;
} }
bool RTPPacketHistory::SetSent(uint16_t sequence_number) {
CriticalSectionScoped cs(critsect_.get());
if (!store_) {
return false;
}
int32_t index = 0;
bool found = FindSeqNum(sequence_number, &index);
if (!found) {
return false;
}
// Send time already set.
if (stored_send_times_[index] != 0) {
return false;
}
stored_send_times_[index] = clock_->TimeInMilliseconds();
return true;
}
bool RTPPacketHistory::GetPacketAndSetSendTime(uint16_t sequence_number, bool RTPPacketHistory::GetPacketAndSetSendTime(uint16_t sequence_number,
int64_t min_elapsed_time_ms, int64_t min_elapsed_time_ms,
bool retransmit, bool retransmit,
uint8_t* packet, uint8_t* packet,
size_t* packet_length, size_t* packet_length,
int64_t* stored_time_ms) { int64_t* stored_time_ms) {
CriticalSectionScoped cs(critsect_.get());
assert(*packet_length >= max_packet_length_); assert(*packet_length >= max_packet_length_);
CriticalSectionScoped cs(critsect_);
if (!store_) { if (!store_) {
return false; return false;
} }
@ -238,7 +277,7 @@ void RTPPacketHistory::GetPacket(int index,
bool RTPPacketHistory::GetBestFittingPacket(uint8_t* packet, bool RTPPacketHistory::GetBestFittingPacket(uint8_t* packet,
size_t* packet_length, size_t* packet_length,
int64_t* stored_time_ms) { int64_t* stored_time_ms) {
CriticalSectionScoped cs(critsect_); CriticalSectionScoped cs(critsect_.get());
if (!store_) if (!store_)
return false; return false;
int index = FindBestFittingPacket(*packet_length); int index = FindBestFittingPacket(*packet_length);

View File

@ -64,28 +64,36 @@ class RTPPacketHistory {
bool HasRTPPacket(uint16_t sequence_number) const; bool HasRTPPacket(uint16_t sequence_number) const;
bool SetSent(uint16_t sequence_number);
private: private:
void GetPacket(int index, uint8_t* packet, size_t* packet_length, void GetPacket(int index,
int64_t* stored_time_ms) const; uint8_t* packet,
void Allocate(uint16_t number_to_store) EXCLUSIVE_LOCKS_REQUIRED(*critsect_); size_t* packet_length,
int64_t* stored_time_ms) const
EXCLUSIVE_LOCKS_REQUIRED(*critsect_);
void Allocate(size_t number_to_store) EXCLUSIVE_LOCKS_REQUIRED(*critsect_);
void Free() EXCLUSIVE_LOCKS_REQUIRED(*critsect_); void Free() EXCLUSIVE_LOCKS_REQUIRED(*critsect_);
void VerifyAndAllocatePacketLength(size_t packet_length); void VerifyAndAllocatePacketLength(size_t packet_length, uint32_t start_index)
bool FindSeqNum(uint16_t sequence_number, int32_t* index) const; EXCLUSIVE_LOCKS_REQUIRED(*critsect_);
int FindBestFittingPacket(size_t size) const; bool FindSeqNum(uint16_t sequence_number, int32_t* index) const
EXCLUSIVE_LOCKS_REQUIRED(*critsect_);
int FindBestFittingPacket(size_t size) const
EXCLUSIVE_LOCKS_REQUIRED(*critsect_);
private: private:
Clock* clock_; Clock* clock_;
CriticalSectionWrapper* critsect_; scoped_ptr<CriticalSectionWrapper> critsect_;
bool store_; bool store_ GUARDED_BY(critsect_);
uint32_t prev_index_; uint32_t prev_index_ GUARDED_BY(critsect_);
size_t max_packet_length_; size_t max_packet_length_ GUARDED_BY(critsect_);
std::vector<std::vector<uint8_t> > stored_packets_; std::vector<std::vector<uint8_t> > stored_packets_ GUARDED_BY(critsect_);
std::vector<uint16_t> stored_seq_nums_; std::vector<uint16_t> stored_seq_nums_ GUARDED_BY(critsect_);
std::vector<size_t> stored_lengths_; std::vector<size_t> stored_lengths_ GUARDED_BY(critsect_);
std::vector<int64_t> stored_times_; std::vector<int64_t> stored_times_ GUARDED_BY(critsect_);
std::vector<int64_t> stored_send_times_; std::vector<int64_t> stored_send_times_ GUARDED_BY(critsect_);
std::vector<StorageType> stored_types_; std::vector<StorageType> stored_types_ GUARDED_BY(critsect_);
}; };
} // namespace webrtc } // namespace webrtc
#endif // WEBRTC_MODULES_RTP_RTCP_RTP_PACKET_HISTORY_H_ #endif // WEBRTC_MODULES_RTP_RTCP_RTP_PACKET_HISTORY_H_

View File

@ -203,4 +203,49 @@ TEST_F(RtpPacketHistoryTest, MinResendTime) {
EXPECT_FALSE(hist_->GetPacketAndSetSendTime(kSeqNum, 101, false, packet_, EXPECT_FALSE(hist_->GetPacketAndSetSendTime(kSeqNum, 101, false, packet_,
&len, &time)); &len, &time));
} }
TEST_F(RtpPacketHistoryTest, DynamicExpansion) {
hist_->SetStorePacketsStatus(true, 10);
size_t len;
int64_t capture_time_ms = fake_clock_.TimeInMilliseconds();
int64_t time;
// Add 4 packets, and then send them.
for (int i = 0; i < 4; ++i) {
len = 0;
CreateRtpPacket(kSeqNum + i, kSsrc, kPayload, kTimestamp, packet_, &len);
EXPECT_EQ(0, hist_->PutRTPPacket(packet_, len, kMaxPacketLength,
capture_time_ms, kAllowRetransmission));
}
for (int i = 0; i < 4; ++i) {
len = kMaxPacketLength;
EXPECT_TRUE(hist_->GetPacketAndSetSendTime(kSeqNum + i, 100, false, packet_,
&len, &time));
}
capture_time_ms += 33;
// Add 16 packets, and then send them. History should expand to make this
// work.
for (int i = 4; i < 20; ++i) {
len = 0;
CreateRtpPacket(kSeqNum + i, kSsrc, kPayload, kTimestamp, packet_, &len);
EXPECT_EQ(0, hist_->PutRTPPacket(packet_, len, kMaxPacketLength,
capture_time_ms, kAllowRetransmission));
}
for (int i = 4; i < 20; ++i) {
len = kMaxPacketLength;
EXPECT_TRUE(hist_->GetPacketAndSetSendTime(kSeqNum + i, 100, false, packet_,
&len, &time));
}
fake_clock_.AdvanceTimeMilliseconds(100);
// Retransmit last 16 packets.
for (int i = 4; i < 20; ++i) {
len = kMaxPacketLength;
EXPECT_TRUE(hist_->GetPacketAndSetSendTime(kSeqNum + i, 100, false, packet_,
&len, &time));
}
}
} // namespace webrtc } // namespace webrtc

View File

@ -221,6 +221,9 @@ TEST_F(RtpRtcpImplTest, SetSelectiveRetransmissions_BaseLayer) {
EXPECT_EQ(3, sender_.RtpSent()); EXPECT_EQ(3, sender_.RtpSent());
EXPECT_EQ(kSequenceNumber + 2, sender_.LastRtpSequenceNumber()); EXPECT_EQ(kSequenceNumber + 2, sender_.LastRtpSequenceNumber());
// Min required delay until retransmit = 5 + RTT ms (RTT = 0).
clock_.AdvanceTimeMilliseconds(5);
// Frame with kBaseLayerTid re-sent. // Frame with kBaseLayerTid re-sent.
IncomingRtcpNack(&sender_, kSequenceNumber); IncomingRtcpNack(&sender_, kSequenceNumber);
EXPECT_EQ(4, sender_.RtpSent()); EXPECT_EQ(4, sender_.RtpSent());
@ -247,6 +250,9 @@ TEST_F(RtpRtcpImplTest, SetSelectiveRetransmissions_HigherLayers) {
EXPECT_EQ(3, sender_.RtpSent()); EXPECT_EQ(3, sender_.RtpSent());
EXPECT_EQ(kSequenceNumber + 2, sender_.LastRtpSequenceNumber()); EXPECT_EQ(kSequenceNumber + 2, sender_.LastRtpSequenceNumber());
// Min required delay until retransmit = 5 + RTT ms (RTT = 0).
clock_.AdvanceTimeMilliseconds(5);
// Frame with kBaseLayerTid re-sent. // Frame with kBaseLayerTid re-sent.
IncomingRtcpNack(&sender_, kSequenceNumber); IncomingRtcpNack(&sender_, kSequenceNumber);
EXPECT_EQ(4, sender_.RtpSent()); EXPECT_EQ(4, sender_.RtpSent());

View File

@ -1001,6 +1001,7 @@ int32_t RTPSender::SendToNetwork(
if (capture_time_ms > 0) { if (capture_time_ms > 0) {
UpdateDelayStatistics(capture_time_ms, now_ms); UpdateDelayStatistics(capture_time_ms, now_ms);
} }
size_t length = payload_length + rtp_header_length; size_t length = payload_length + rtp_header_length;
if (!SendPacketToNetwork(buffer, length)) if (!SendPacketToNetwork(buffer, length))
return -1; return -1;
@ -1008,6 +1009,7 @@ int32_t RTPSender::SendToNetwork(
CriticalSectionScoped lock(send_critsect_); CriticalSectionScoped lock(send_critsect_);
media_has_been_sent_ = true; media_has_been_sent_ = true;
} }
packet_history_.SetSent(rtp_header.sequenceNumber);
UpdateRtpStats(buffer, length, rtp_header, false, false); UpdateRtpStats(buffer, length, rtp_header, false, false);
return 0; return 0;
} }