Ensure that no packet stays in the pacer queue for longer than 2 seconds.

BUG=2682
TEST=trybots
R=mflodman@webrtc.org

Review URL: https://webrtc-codereview.appspot.com/4519004

git-svn-id: http://webrtc.googlecode.com/svn/trunk@5182 4adac7df-926f-26a2-2b94-8c16560cd09d
This commit is contained in:
stefan@webrtc.org 2013-11-27 14:16:20 +00:00
parent b3ea3afa60
commit 19a40ff05b
4 changed files with 126 additions and 59 deletions

View File

@ -52,6 +52,9 @@ class PacedSender : public Module {
protected:
virtual ~Callback() {}
};
static const int kDefaultMaxQueueLengthMs = 2000;
PacedSender(Callback* callback, int target_bitrate_kbps,
float pace_multiplier);
@ -85,6 +88,10 @@ class PacedSender : public Module {
int bytes,
bool retransmission);
// Sets the max length of the pacer queue in milliseconds.
// A negative queue size is interpreted as infinite.
virtual void set_max_queue_length_ms(int max_queue_length_ms);
// Returns the time since the oldest queued packet was captured.
virtual int QueueInMs() const;
@ -105,6 +112,8 @@ class PacedSender : public Module {
uint32_t* ssrc, uint16_t* sequence_number, int64_t* capture_time_ms,
bool* retransmission);
bool SendPacketFromList(paced_sender::PacketList* packet_list);
// Updates the number of bytes that can be sent for the next time interval.
void UpdateBytesPerInterval(uint32_t delta_time_in_ms);
@ -115,6 +124,7 @@ class PacedSender : public Module {
const float pace_multiplier_;
bool enabled_;
bool paused_;
int max_queue_length_ms_;
scoped_ptr<CriticalSectionWrapper> critsect_;
// This is the media budget, keeping track of how many bits of media
// we can pace out during the current interval.

View File

@ -124,6 +124,7 @@ PacedSender::PacedSender(Callback* callback, int target_bitrate_kbps,
pace_multiplier_(pace_multiplier),
enabled_(false),
paused_(false),
max_queue_length_ms_(kDefaultMaxQueueLengthMs),
critsect_(CriticalSectionWrapper::CreateCriticalSection()),
media_budget_(new paced_sender::IntervalBudget(
pace_multiplier_ * target_bitrate_kbps)),
@ -206,6 +207,11 @@ bool PacedSender::SendPacket(Priority priority, uint32_t ssrc,
return false;
}
void PacedSender::set_max_queue_length_ms(int max_queue_length_ms) {
CriticalSectionScoped cs(critsect_.get());
max_queue_length_ms_ = max_queue_length_ms;
}
int PacedSender::QueueInMs() const {
CriticalSectionScoped cs(critsect_.get());
int64_t now_ms = TickTime::MillisecondTimestamp();
@ -254,36 +260,10 @@ int32_t PacedSender::Process() {
uint32_t delta_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms);
UpdateBytesPerInterval(delta_time_ms);
}
uint32_t ssrc;
uint16_t sequence_number;
int64_t capture_time_ms;
bool retransmission;
paced_sender::PacketList* packet_list;
while (ShouldSendNextPacket(&packet_list)) {
GetNextPacketFromList(packet_list, &ssrc, &sequence_number,
&capture_time_ms, &retransmission);
critsect_->Leave();
const bool success = callback_->TimeToSendPacket(ssrc, sequence_number,
capture_time_ms,
retransmission);
critsect_->Enter();
// If packet cannot be sent then keep it in packet list and exit early.
// There's no need to send more packets.
if (!success) {
if (!SendPacketFromList(packet_list))
return 0;
}
packet_list->pop_front();
const bool last_packet = packet_list->empty() ||
packet_list->front().capture_time_ms_ > capture_time_ms;
if (packet_list != high_priority_packets_.get()) {
if (capture_time_ms > capture_time_ms_last_sent_) {
capture_time_ms_last_sent_ = capture_time_ms;
} else if (capture_time_ms == capture_time_ms_last_sent_ &&
last_packet) {
TRACE_EVENT_ASYNC_END0("webrtc_rtp", "PacedSend", capture_time_ms);
}
}
}
if (high_priority_packets_->empty() &&
normal_priority_packets_->empty() &&
@ -304,6 +284,39 @@ int32_t PacedSender::Process() {
return 0;
}
// MUST have critsect_ when calling.
bool PacedSender::SendPacketFromList(paced_sender::PacketList* packet_list) {
uint32_t ssrc;
uint16_t sequence_number;
int64_t capture_time_ms;
bool retransmission;
GetNextPacketFromList(packet_list, &ssrc, &sequence_number,
&capture_time_ms, &retransmission);
critsect_->Leave();
const bool success = callback_->TimeToSendPacket(ssrc, sequence_number,
capture_time_ms,
retransmission);
critsect_->Enter();
// If packet cannot be sent then keep it in packet list and exit early.
// There's no need to send more packets.
if (!success) {
return false;
}
packet_list->pop_front();
const bool last_packet = packet_list->empty() ||
packet_list->front().capture_time_ms_ > capture_time_ms;
if (packet_list != high_priority_packets_.get()) {
if (capture_time_ms > capture_time_ms_last_sent_) {
capture_time_ms_last_sent_ = capture_time_ms;
} else if (capture_time_ms == capture_time_ms_last_sent_ &&
last_packet) {
TRACE_EVENT_ASYNC_END0("webrtc_rtp", "PacedSend", capture_time_ms);
}
}
return true;
}
// MUST have critsect_ when calling.
void PacedSender::UpdateBytesPerInterval(uint32_t delta_time_ms) {
media_budget_->IncreaseBudget(delta_time_ms);
@ -327,6 +340,21 @@ bool PacedSender::ShouldSendNextPacket(paced_sender::PacketList** packet_list) {
return true;
}
}
// Send any old packets to avoid queuing for too long.
if (max_queue_length_ms_ >= 0 && QueueInMs() > max_queue_length_ms_) {
int64_t high_priority_capture_time = -1;
if (!high_priority_packets_->empty()) {
high_priority_capture_time =
high_priority_packets_->front().capture_time_ms_;
*packet_list = high_priority_packets_.get();
}
if (!normal_priority_packets_->empty() && high_priority_capture_time >
normal_priority_packets_->front().capture_time_ms_) {
*packet_list = normal_priority_packets_.get();
}
if (*packet_list)
return true;
}
return false;
}
if (!high_priority_packets_->empty()) {

View File

@ -83,53 +83,50 @@ class PacedSenderTest : public ::testing::Test {
TEST_F(PacedSenderTest, QueuePacket) {
uint32_t ssrc = 12345;
uint16_t sequence_number = 1234;
int64_t capture_time_ms = 56789;
// Due to the multiplicative factor we can send 3 packets not 2 packets.
SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++,
capture_time_ms, 250, false);
TickTime::MillisecondTimestamp(), 250, false);
SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++,
capture_time_ms, 250, false);
TickTime::MillisecondTimestamp(), 250, false);
SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++,
capture_time_ms, 250, false);
TickTime::MillisecondTimestamp(), 250, false);
int64_t queued_packet_timestamp = TickTime::MillisecondTimestamp();
EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc,
sequence_number, capture_time_ms, 250, false));
sequence_number, queued_packet_timestamp, 250, false));
send_bucket_->Process();
EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0);
EXPECT_CALL(callback_,
TimeToSendPacket(ssrc, sequence_number, capture_time_ms, false)).Times(0);
TickTime::AdvanceFakeClock(4);
EXPECT_EQ(1, send_bucket_->TimeUntilNextProcess());
TickTime::AdvanceFakeClock(1);
EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
EXPECT_CALL(callback_, TimeToSendPacket(
ssrc, sequence_number++, capture_time_ms, false))
ssrc, sequence_number++, queued_packet_timestamp, false))
.Times(1)
.WillRepeatedly(Return(true));
send_bucket_->Process();
sequence_number++;
SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++,
capture_time_ms, 250, false);
TickTime::MillisecondTimestamp(), 250, false);
SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++,
capture_time_ms, 250, false);
TickTime::MillisecondTimestamp(), 250, false);
EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc,
sequence_number++, capture_time_ms, 250, false));
sequence_number++, TickTime::MillisecondTimestamp(), 250, false));
send_bucket_->Process();
}
TEST_F(PacedSenderTest, PaceQueuedPackets) {
uint32_t ssrc = 12345;
uint16_t sequence_number = 1234;
int64_t capture_time_ms = 56789;
// Due to the multiplicative factor we can send 3 packets not 2 packets.
for (int i = 0; i < 3; ++i) {
SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++,
capture_time_ms, 250, false);
TickTime::MillisecondTimestamp(), 250, false);
}
for (int j = 0; j < 30; ++j) {
EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc,
sequence_number++, capture_time_ms, 250, false));
sequence_number++, TickTime::MillisecondTimestamp(), 250, false));
}
send_bucket_->Process();
EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0);
@ -137,7 +134,7 @@ TEST_F(PacedSenderTest, PaceQueuedPackets) {
EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
TickTime::AdvanceFakeClock(5);
EXPECT_CALL(callback_,
TimeToSendPacket(ssrc, _, capture_time_ms, false))
TimeToSendPacket(ssrc, _, _, false))
.Times(3)
.WillRepeatedly(Return(true));
EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
@ -148,35 +145,34 @@ TEST_F(PacedSenderTest, PaceQueuedPackets) {
EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
EXPECT_EQ(0, send_bucket_->Process());
SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++,
capture_time_ms, 250, false);
TickTime::MillisecondTimestamp(), 250, false);
SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++,
capture_time_ms, 250, false);
TickTime::MillisecondTimestamp(), 250, false);
SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++,
capture_time_ms, 250, false);
TickTime::MillisecondTimestamp(), 250, false);
EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc,
sequence_number, capture_time_ms, 250, false));
sequence_number, TickTime::MillisecondTimestamp(), 250, false));
send_bucket_->Process();
}
TEST_F(PacedSenderTest, PaceQueuedPacketsWithDuplicates) {
uint32_t ssrc = 12345;
uint16_t sequence_number = 1234;
int64_t capture_time_ms = 56789;
uint16_t queued_sequence_number;
// Due to the multiplicative factor we can send 3 packets not 2 packets.
for (int i = 0; i < 3; ++i) {
SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++,
capture_time_ms, 250, false);
TickTime::MillisecondTimestamp(), 250, false);
}
queued_sequence_number = sequence_number;
for (int j = 0; j < 30; ++j) {
// Send in duplicate packets.
EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc,
sequence_number, capture_time_ms, 250, false));
sequence_number, TickTime::MillisecondTimestamp(), 250, false));
EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc,
sequence_number++, capture_time_ms, 250, false));
sequence_number++, TickTime::MillisecondTimestamp(), 250, false));
}
EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0);
send_bucket_->Process();
@ -186,7 +182,8 @@ TEST_F(PacedSenderTest, PaceQueuedPacketsWithDuplicates) {
for (int i = 0; i < 3; ++i) {
EXPECT_CALL(callback_, TimeToSendPacket(ssrc, queued_sequence_number++,
capture_time_ms, false))
_,
false))
.Times(1)
.WillRepeatedly(Return(true));
}
@ -198,29 +195,28 @@ TEST_F(PacedSenderTest, PaceQueuedPacketsWithDuplicates) {
EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
EXPECT_EQ(0, send_bucket_->Process());
SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++,
capture_time_ms, 250, false);
TickTime::MillisecondTimestamp(), 250, false);
SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++,
capture_time_ms, 250, false);
TickTime::MillisecondTimestamp(), 250, false);
SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++,
capture_time_ms, 250, false);
TickTime::MillisecondTimestamp(), 250, false);
EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc,
sequence_number++, capture_time_ms, 250, false));
sequence_number++, TickTime::MillisecondTimestamp(), 250, false));
send_bucket_->Process();
}
TEST_F(PacedSenderTest, Padding) {
uint32_t ssrc = 12345;
uint16_t sequence_number = 1234;
int64_t capture_time_ms = 56789;
send_bucket_->UpdateBitrate(kTargetBitrate, kTargetBitrate, kTargetBitrate);
// Due to the multiplicative factor we can send 3 packets not 2 packets.
SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++,
capture_time_ms, 250, false);
TickTime::MillisecondTimestamp(), 250, false);
SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++,
capture_time_ms, 250, false);
TickTime::MillisecondTimestamp(), 250, false);
SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++,
capture_time_ms, 250, false);
TickTime::MillisecondTimestamp(), 250, false);
// No padding is expected since we have sent too much already.
EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0);
EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
@ -492,5 +488,33 @@ TEST_F(PacedSenderTest, ResendPacket) {
EXPECT_EQ(0, send_bucket_->QueueInMs());
}
TEST_F(PacedSenderTest, MaxQueueLength) {
uint32_t ssrc = 12346;
uint16_t sequence_number = 1234;
EXPECT_EQ(0, send_bucket_->QueueInMs());
send_bucket_->UpdateBitrate(30, 0, 0);
for (int i = 0; i < 30; ++i) {
SendAndExpectPacket(PacedSender::kNormalPriority,
ssrc,
sequence_number++,
TickTime::MillisecondTimestamp(),
1200,
false);
}
TickTime::AdvanceFakeClock(2001);
SendAndExpectPacket(PacedSender::kNormalPriority,
ssrc,
sequence_number++,
TickTime::MillisecondTimestamp(),
1200,
false);
EXPECT_EQ(2001, send_bucket_->QueueInMs());
send_bucket_->Process();
EXPECT_EQ(0, send_bucket_->QueueInMs());
TickTime::AdvanceFakeClock(31);
send_bucket_->Process();
}
} // namespace test
} // namespace webrtc

View File

@ -846,10 +846,15 @@ void ViEEncoder::SetSenderBufferingMode(int target_delay_ms) {
// Disable external frame-droppers.
vcm_.EnableFrameDropper(false);
vpm_.EnableTemporalDecimation(false);
// We don't put any limits on the pacer queue when running in buffered mode
// since the encoder will be paused if the queue grow too large.
paced_sender_->set_max_queue_length_ms(-1);
} else {
// Real-time mode - enable frame droppers.
vpm_.EnableTemporalDecimation(true);
vcm_.EnableFrameDropper(true);
paced_sender_->set_max_queue_length_ms(
PacedSender::kDefaultMaxQueueLengthMs);
}
}