Measure pacer queue size based on when packets are inserted rather than captured.

R=mflodman@webrtc.org

Review URL: https://webrtc-codereview.appspot.com/5659004

git-svn-id: http://webrtc.googlecode.com/svn/trunk@5291 4adac7df-926f-26a2-2b94-8c16560cd09d
This commit is contained in:
stefan@webrtc.org 2013-12-13 22:03:27 +00:00
parent 167b6dfc73
commit dd393e7b9d
3 changed files with 64 additions and 46 deletions

View File

@ -92,7 +92,7 @@ class PacedSender : public Module {
// A negative queue size is interpreted as infinite. // A negative queue size is interpreted as infinite.
virtual void set_max_queue_length_ms(int max_queue_length_ms); virtual void set_max_queue_length_ms(int max_queue_length_ms);
// Returns the time since the oldest queued packet was captured. // Returns the time since the oldest queued packet was enqueued.
virtual int QueueInMs() const; virtual int QueueInMs() const;
// Returns the number of milliseconds until the module want a worker thread // Returns the number of milliseconds until the module want a worker thread
@ -108,9 +108,7 @@ class PacedSender : public Module {
bool ShouldSendNextPacket(paced_sender::PacketList** packet_list); bool ShouldSendNextPacket(paced_sender::PacketList** packet_list);
// Local helper function to GetNextPacket. // Local helper function to GetNextPacket.
void GetNextPacketFromList(paced_sender::PacketList* packets, paced_sender::Packet GetNextPacketFromList(paced_sender::PacketList* packets);
uint32_t* ssrc, uint16_t* sequence_number, int64_t* capture_time_ms,
bool* retransmission);
bool SendPacketFromList(paced_sender::PacketList* packet_list); bool SendPacketFromList(paced_sender::PacketList* packet_list);

View File

@ -36,16 +36,18 @@ namespace webrtc {
namespace paced_sender { namespace paced_sender {
struct Packet { struct Packet {
Packet(uint32_t ssrc, uint16_t seq_number, int64_t capture_time_ms, Packet(uint32_t ssrc, uint16_t seq_number, int64_t capture_time_ms,
int length_in_bytes, bool retransmission) int64_t enqueue_time_ms, int length_in_bytes, bool retransmission)
: ssrc_(ssrc), : ssrc_(ssrc),
sequence_number_(seq_number), sequence_number_(seq_number),
capture_time_ms_(capture_time_ms), capture_time_ms_(capture_time_ms),
enqueue_time_ms_(enqueue_time_ms),
bytes_(length_in_bytes), bytes_(length_in_bytes),
retransmission_(retransmission) { retransmission_(retransmission) {
} }
uint32_t ssrc_; uint32_t ssrc_;
uint16_t sequence_number_; uint16_t sequence_number_;
int64_t capture_time_ms_; int64_t capture_time_ms_;
int64_t enqueue_time_ms_;
int bytes_; int bytes_;
bool retransmission_; bool retransmission_;
}; };
@ -201,8 +203,11 @@ bool PacedSender::SendPacket(Priority priority, uint32_t ssrc,
packet_list = low_priority_packets_.get(); packet_list = low_priority_packets_.get();
break; break;
} }
packet_list->push_back(paced_sender::Packet(ssrc, sequence_number, packet_list->push_back(paced_sender::Packet(ssrc,
capture_time_ms, bytes, sequence_number,
capture_time_ms,
TickTime::MillisecondTimestamp(),
bytes,
retransmission)); retransmission));
return false; return false;
} }
@ -215,23 +220,23 @@ void PacedSender::set_max_queue_length_ms(int max_queue_length_ms) {
int PacedSender::QueueInMs() const { int PacedSender::QueueInMs() const {
CriticalSectionScoped cs(critsect_.get()); CriticalSectionScoped cs(critsect_.get());
int64_t now_ms = TickTime::MillisecondTimestamp(); int64_t now_ms = TickTime::MillisecondTimestamp();
int64_t oldest_packet_capture_time = now_ms; int64_t oldest_packet_enqueue_time = now_ms;
if (!high_priority_packets_->empty()) { if (!high_priority_packets_->empty()) {
oldest_packet_capture_time = std::min( oldest_packet_enqueue_time = std::min(
oldest_packet_capture_time, oldest_packet_enqueue_time,
high_priority_packets_->front().capture_time_ms_); high_priority_packets_->front().enqueue_time_ms_);
} }
if (!normal_priority_packets_->empty()) { if (!normal_priority_packets_->empty()) {
oldest_packet_capture_time = std::min( oldest_packet_enqueue_time = std::min(
oldest_packet_capture_time, oldest_packet_enqueue_time,
normal_priority_packets_->front().capture_time_ms_); normal_priority_packets_->front().enqueue_time_ms_);
} }
if (!low_priority_packets_->empty()) { if (!low_priority_packets_->empty()) {
oldest_packet_capture_time = std::min( oldest_packet_enqueue_time = std::min(
oldest_packet_capture_time, oldest_packet_enqueue_time,
low_priority_packets_->front().capture_time_ms_); low_priority_packets_->front().enqueue_time_ms_);
} }
return now_ms - oldest_packet_capture_time; return now_ms - oldest_packet_enqueue_time;
} }
int32_t PacedSender::TimeUntilNextProcess() { int32_t PacedSender::TimeUntilNextProcess() {
@ -286,17 +291,13 @@ int32_t PacedSender::Process() {
// MUST have critsect_ when calling. // MUST have critsect_ when calling.
bool PacedSender::SendPacketFromList(paced_sender::PacketList* packet_list) { bool PacedSender::SendPacketFromList(paced_sender::PacketList* packet_list) {
uint32_t ssrc; paced_sender::Packet packet = GetNextPacketFromList(packet_list);
uint16_t sequence_number;
int64_t capture_time_ms;
bool retransmission;
GetNextPacketFromList(packet_list, &ssrc, &sequence_number,
&capture_time_ms, &retransmission);
critsect_->Leave(); critsect_->Leave();
const bool success = callback_->TimeToSendPacket(ssrc, sequence_number, const bool success = callback_->TimeToSendPacket(packet.ssrc_,
capture_time_ms, packet.sequence_number_,
retransmission); packet.capture_time_ms_,
packet.retransmission_);
critsect_->Enter(); critsect_->Enter();
// If packet cannot be sent then keep it in packet list and exit early. // If packet cannot be sent then keep it in packet list and exit early.
// There's no need to send more packets. // There's no need to send more packets.
@ -305,13 +306,14 @@ bool PacedSender::SendPacketFromList(paced_sender::PacketList* packet_list) {
} }
packet_list->pop_front(); packet_list->pop_front();
const bool last_packet = packet_list->empty() || const bool last_packet = packet_list->empty() ||
packet_list->front().capture_time_ms_ > capture_time_ms; packet_list->front().capture_time_ms_ > packet.capture_time_ms_;
if (packet_list != high_priority_packets_.get()) { if (packet_list != high_priority_packets_.get()) {
if (capture_time_ms > capture_time_ms_last_sent_) { if (packet.capture_time_ms_ > capture_time_ms_last_sent_) {
capture_time_ms_last_sent_ = capture_time_ms; capture_time_ms_last_sent_ = packet.capture_time_ms_;
} else if (capture_time_ms == capture_time_ms_last_sent_ && } else if (packet.capture_time_ms_ == capture_time_ms_last_sent_ &&
last_packet) { last_packet) {
TRACE_EVENT_ASYNC_END0("webrtc_rtp", "PacedSend", capture_time_ms); TRACE_EVENT_ASYNC_END0("webrtc_rtp", "PacedSend",
packet.capture_time_ms_);
} }
} }
return true; return true;
@ -374,15 +376,11 @@ bool PacedSender::ShouldSendNextPacket(paced_sender::PacketList** packet_list) {
return false; return false;
} }
void PacedSender::GetNextPacketFromList(paced_sender::PacketList* packets, paced_sender::Packet PacedSender::GetNextPacketFromList(
uint32_t* ssrc, uint16_t* sequence_number, int64_t* capture_time_ms, paced_sender::PacketList* packets) {
bool* retransmission) {
paced_sender::Packet packet = packets->front(); paced_sender::Packet packet = packets->front();
UpdateMediaBytesSent(packet.bytes_); UpdateMediaBytesSent(packet.bytes_);
*sequence_number = packet.sequence_number_; return packet;
*ssrc = packet.ssrc_;
*capture_time_ms = packet.capture_time_ms_;
*retransmission = packet.retransmission_;
} }
// MUST have critsect_ when calling. // MUST have critsect_ when calling.

View File

@ -368,8 +368,6 @@ TEST_F(PacedSenderTest, Pause) {
uint32_t ssrc = 12346; uint32_t ssrc = 12346;
uint16_t sequence_number = 1234; uint16_t sequence_number = 1234;
int64_t capture_time_ms = TickTime::MillisecondTimestamp(); int64_t capture_time_ms = TickTime::MillisecondTimestamp();
TickTime::AdvanceFakeClock(10000);
int64_t second_capture_time_ms = TickTime::MillisecondTimestamp();
EXPECT_EQ(0, send_bucket_->QueueInMs()); EXPECT_EQ(0, send_bucket_->QueueInMs());
@ -384,10 +382,6 @@ TEST_F(PacedSenderTest, Pause) {
send_bucket_->Pause(); send_bucket_->Pause();
// Expect everything to be queued.
EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kLowPriority,
ssrc_low_priority, sequence_number++, second_capture_time_ms, 250,
false));
EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority,
ssrc, sequence_number++, capture_time_ms, 250, false)); ssrc, sequence_number++, capture_time_ms, 250, false));
EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority,
@ -395,6 +389,14 @@ TEST_F(PacedSenderTest, Pause) {
EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kHighPriority, EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kHighPriority,
ssrc, sequence_number++, capture_time_ms, 250, false)); ssrc, sequence_number++, capture_time_ms, 250, false));
TickTime::AdvanceFakeClock(10000);
int64_t second_capture_time_ms = TickTime::MillisecondTimestamp();
// Expect everything to be queued.
EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kLowPriority,
ssrc_low_priority, sequence_number++, second_capture_time_ms, 250,
false));
EXPECT_EQ(TickTime::MillisecondTimestamp() - capture_time_ms, EXPECT_EQ(TickTime::MillisecondTimestamp() - capture_time_ms,
send_bucket_->QueueInMs()); send_bucket_->QueueInMs());
@ -441,13 +443,14 @@ TEST_F(PacedSenderTest, ResendPacket) {
capture_time_ms, capture_time_ms,
250, 250,
false)); false));
TickTime::AdvanceFakeClock(1);
EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority,
ssrc, ssrc,
sequence_number + 1, sequence_number + 1,
capture_time_ms + 1, capture_time_ms + 1,
250, 250,
false)); false));
TickTime::AdvanceFakeClock(10000); TickTime::AdvanceFakeClock(9999);
EXPECT_EQ(TickTime::MillisecondTimestamp() - capture_time_ms, EXPECT_EQ(TickTime::MillisecondTimestamp() - capture_time_ms,
send_bucket_->QueueInMs()); send_bucket_->QueueInMs());
// Fails to send first packet so only one call. // Fails to send first packet so only one call.
@ -516,5 +519,24 @@ TEST_F(PacedSenderTest, MaxQueueLength) {
TickTime::AdvanceFakeClock(31); TickTime::AdvanceFakeClock(31);
send_bucket_->Process(); send_bucket_->Process();
} }
TEST_F(PacedSenderTest, QueueTimeGrowsOverTime) {
uint32_t ssrc = 12346;
uint16_t sequence_number = 1234;
EXPECT_EQ(0, send_bucket_->QueueInMs());
send_bucket_->UpdateBitrate(30, 0, 0);
SendAndExpectPacket(PacedSender::kNormalPriority,
ssrc,
sequence_number,
TickTime::MillisecondTimestamp(),
1200,
false);
TickTime::AdvanceFakeClock(500);
EXPECT_EQ(500, send_bucket_->QueueInMs());
send_bucket_->Process();
EXPECT_EQ(0, send_bucket_->QueueInMs());
}
} // namespace test } // namespace test
} // namespace webrtc } // namespace webrtc