diff --git a/webrtc/modules/pacing/include/paced_sender.h b/webrtc/modules/pacing/include/paced_sender.h index d3034466b..d7efb8ea1 100644 --- a/webrtc/modules/pacing/include/paced_sender.h +++ b/webrtc/modules/pacing/include/paced_sender.h @@ -27,7 +27,7 @@ class CriticalSectionWrapper; namespace paced_sender { class IntervalBudget; struct Packet; -class PacketList; +class PacketQueue; } // namespace paced_sender class PacedSender : public Module { @@ -105,15 +105,15 @@ class PacedSender : public Module { int bytes, bool retransmission); - // Sets the max length of the pacer queue in milliseconds. - // A negative queue size is interpreted as infinite. - virtual void set_max_queue_length_ms(int max_queue_length_ms); - // Returns the time since the oldest queued packet was enqueued. virtual int QueueInMs() const; virtual size_t QueueSizePackets() const; + // Returns the number of milliseconds it will take to send the current + // packets in the queue, given the current size and bitrate, ignoring prio. + virtual int ExpectedQueueTimeMs() const; + // Returns the number of milliseconds until the module want a worker thread // to call Process. virtual int32_t TimeUntilNextProcess() OVERRIDE; @@ -125,24 +125,13 @@ class PacedSender : public Module { virtual bool ProbingExperimentIsEnabled() const; private: - // Return true if next packet in line should be transmitted. - // Return packet list that contains the next packet. - bool ShouldSendNextPacket(paced_sender::PacketList** packet_list, bool probe) - EXCLUSIVE_LOCKS_REQUIRED(critsect_); - - // Local helper function to GetNextPacket. - paced_sender::Packet GetNextPacketFromList(paced_sender::PacketList* packets) - EXCLUSIVE_LOCKS_REQUIRED(critsect_); - - bool SendPacketFromList(paced_sender::PacketList* packet_list) - EXCLUSIVE_LOCKS_REQUIRED(critsect_); - // Updates the number of bytes that can be sent for the next time interval. void UpdateBytesPerInterval(uint32_t delta_time_in_ms) EXCLUSIVE_LOCKS_REQUIRED(critsect_); - // Updates the buffers with the number of bytes that we sent. - void UpdateMediaBytesSent(int num_bytes) EXCLUSIVE_LOCKS_REQUIRED(critsect_); + bool SendPacket(const paced_sender::Packet& packet) + EXCLUSIVE_LOCKS_REQUIRED(critsect_); + void SendPadding(int padding_needed) EXCLUSIVE_LOCKS_REQUIRED(critsect_); Clock* const clock_; Callback* const callback_; @@ -150,7 +139,6 @@ class PacedSender : public Module { scoped_ptr critsect_; bool enabled_ GUARDED_BY(critsect_); bool paused_ GUARDED_BY(critsect_); - int max_queue_length_ms_ GUARDED_BY(critsect_); // This is the media budget, keeping track of how many bits of media // we can pace out during the current interval. scoped_ptr media_budget_ GUARDED_BY(critsect_); @@ -164,17 +152,9 @@ class PacedSender : public Module { int bitrate_bps_ GUARDED_BY(critsect_); int64_t time_last_update_us_ GUARDED_BY(critsect_); - // Only accessed via process thread. - int64_t time_last_media_send_us_; - int64_t capture_time_ms_last_queued_ GUARDED_BY(critsect_); - int64_t capture_time_ms_last_sent_ GUARDED_BY(critsect_); - scoped_ptr high_priority_packets_ - GUARDED_BY(critsect_); - scoped_ptr normal_priority_packets_ - GUARDED_BY(critsect_); - scoped_ptr low_priority_packets_ - GUARDED_BY(critsect_); + scoped_ptr packets_ GUARDED_BY(critsect_); + uint64_t packet_counter_ GUARDED_BY(critsect_); }; } // namespace webrtc #endif // WEBRTC_MODULES_PACING_INCLUDE_PACED_SENDER_H_ diff --git a/webrtc/modules/pacing/paced_sender.cc b/webrtc/modules/pacing/paced_sender.cc index 64b3eb1e3..a071ffcc8 100644 --- a/webrtc/modules/pacing/paced_sender.cc +++ b/webrtc/modules/pacing/paced_sender.cc @@ -13,6 +13,7 @@ #include #include +#include #include #include "webrtc/modules/interface/module_common_types.h" @@ -31,80 +32,140 @@ const int kMinPacketLimitMs = 5; // time. const int kMaxIntervalTimeMs = 30; -// Max time that the first packet in the queue can sit in the queue if no -// packets are sent, regardless of buffer state. In practice only in effect at -// low bitrates (less than 320 kbits/s). -const int kMaxQueueTimeWithoutSendingUs = 30000; - } // namespace namespace webrtc { namespace paced_sender { struct Packet { - Packet(uint32_t ssrc, + Packet(PacedSender::Priority priority, + uint32_t ssrc, uint16_t seq_number, int64_t capture_time_ms, int64_t enqueue_time_ms, int length_in_bytes, - bool retransmission) - : ssrc(ssrc), + bool retransmission, + uint64_t enqueue_order) + : priority(priority), + ssrc(ssrc), sequence_number(seq_number), capture_time_ms(capture_time_ms), enqueue_time_ms(enqueue_time_ms), bytes(length_in_bytes), - retransmission(retransmission) {} + retransmission(retransmission), + enqueue_order(enqueue_order) {} + + PacedSender::Priority priority; uint32_t ssrc; uint16_t sequence_number; int64_t capture_time_ms; int64_t enqueue_time_ms; int bytes; bool retransmission; + uint64_t enqueue_order; + std::list::iterator this_it; }; -// STL list style class which prevents duplicates in the list. -class PacketList { +// Used by priority queue to sort packets. +struct Comparator { + bool operator()(const Packet* first, const Packet* second) { + // Highest prio = 0. + if (first->priority != second->priority) + return first->priority > second->priority; + + // Retransmissions go first. + if (second->retransmission && !first->retransmission) + return true; + + // Older frames have higher prio. + if (first->capture_time_ms != second->capture_time_ms) + return first->capture_time_ms > second->capture_time_ms; + + return first->enqueue_order > second->enqueue_order; + } +}; + +// Class encapsulating a priority queue with some extensions. +class PacketQueue { public: - PacketList() {}; + PacketQueue() : bytes_(0) {} + virtual ~PacketQueue() {} - bool empty() const { - return packet_list_.empty(); + void Push(const Packet& packet) { + if (!AddToDupeSet(packet)) + return; + + // Store packet in list, use pointers in priority queue for cheaper moves. + // Packets have a handle to its own iterator in the list, for easy removal + // when popping from queue. + packet_list_.push_front(packet); + std::list::iterator it = packet_list_.begin(); + it->this_it = it; // Handle for direct removal from list. + prio_queue_.push(&(*it)); // Pointer into list. + bytes_ += packet.bytes; } - Packet front() const { - return packet_list_.front(); + const Packet& BeginPop() { + const Packet& packet = *prio_queue_.top(); + prio_queue_.pop(); + return packet; } - size_t size() const { - size_t sum = 0; - for (std::map >::const_iterator it = - sequence_number_set_.begin(); - it != sequence_number_set_.end(); - ++it) { - sum += it->second.size(); - } - return sum; + void CancelPop(const Packet& packet) { prio_queue_.push(&(*packet.this_it)); } + + void FinalizePop(const Packet& packet) { + RemoveFromDupeSet(packet); + bytes_ -= packet.bytes; + packet_list_.erase(packet.this_it); } - void pop_front() { - Packet& packet = packet_list_.front(); - uint16_t sequence_number = packet.sequence_number; - uint32_t ssrc = packet.ssrc; - packet_list_.pop_front(); - sequence_number_set_[ssrc].erase(sequence_number); - } + bool Empty() const { return prio_queue_.empty(); } - void push_back(const Packet& packet) { - if (sequence_number_set_[packet.ssrc].find(packet.sequence_number) == - sequence_number_set_[packet.ssrc].end()) { - // Don't insert duplicates. - packet_list_.push_back(packet); - sequence_number_set_[packet.ssrc].insert(packet.sequence_number); - } + size_t SizeInPackets() const { return prio_queue_.size(); } + + uint32_t SizeInBytes() const { return bytes_; } + + int64_t OldestEnqueueTime() const { + std::list::const_reverse_iterator it = packet_list_.rbegin(); + if (it == packet_list_.rend()) + return 0; + return it->enqueue_time_ms; } private: + // Try to add a packet to the set of ssrc/seqno identifiers currently in the + // queue. Return true if inserted, false if this is a duplicate. + bool AddToDupeSet(const Packet& packet) { + SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc); + if (it == dupe_map_.end()) { + // First for this ssrc, just insert. + dupe_map_[packet.ssrc].insert(packet.sequence_number); + return true; + } + + // Insert returns a pair, where second is a bool set to true if new element. + return it->second.insert(packet.sequence_number).second; + } + + void RemoveFromDupeSet(const Packet& packet) { + SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc); + assert(it != dupe_map_.end()); + it->second.erase(packet.sequence_number); + if (it->second.empty()) { + dupe_map_.erase(it); + } + } + + // List of packets, in the order the were enqueued. Since dequeueing may + // occur out of order, use list instead of vector. std::list packet_list_; - std::map > sequence_number_set_; + // Priority queue of the packets, sorted according to Comparator. + // Use pointers into list, to avoid moving whole struct within heap. + std::priority_queue, Comparator> prio_queue_; + // Total number of bytes in the queue. + uint64_t bytes_; + // Map >, for checking duplicates. + typedef std::map > SsrcSeqNoMap; + SsrcSeqNoMap dupe_map_; }; class IntervalBudget { @@ -135,6 +196,8 @@ class IntervalBudget { int bytes_remaining() const { return bytes_remaining_; } + int target_rate_kbps() const { return target_rate_kbps_; } + private: int target_rate_kbps_; int bytes_remaining_; @@ -153,18 +216,13 @@ PacedSender::PacedSender(Clock* clock, critsect_(CriticalSectionWrapper::CreateCriticalSection()), enabled_(true), paused_(false), - max_queue_length_ms_(kDefaultMaxQueueLengthMs), media_budget_(new paced_sender::IntervalBudget(max_bitrate_kbps)), padding_budget_(new paced_sender::IntervalBudget(min_bitrate_kbps)), prober_(new BitrateProber()), bitrate_bps_(1000 * bitrate_kbps), time_last_update_us_(clock->TimeInMicroseconds()), - time_last_media_send_us_(-1), - capture_time_ms_last_queued_(0), - capture_time_ms_last_sent_(0), - high_priority_packets_(new paced_sender::PacketList), - normal_priority_packets_(new paced_sender::PacketList), - low_priority_packets_(new paced_sender::PacketList) { + packets_(new paced_sender::PacketQueue()), + packet_counter_(0) { UpdateBytesPerInterval(kMinPacketLimitMs); } @@ -216,64 +274,33 @@ bool PacedSender::SendPacket(Priority priority, uint32_t ssrc, if (capture_time_ms < 0) { capture_time_ms = clock_->TimeInMilliseconds(); } - if (priority != kHighPriority && - capture_time_ms > capture_time_ms_last_queued_) { - capture_time_ms_last_queued_ = capture_time_ms; - TRACE_EVENT_ASYNC_BEGIN1("webrtc_rtp", "PacedSend", capture_time_ms, - "capture_time_ms", capture_time_ms); - } - paced_sender::PacketList* packet_list = NULL; - switch (priority) { - case kHighPriority: - packet_list = high_priority_packets_.get(); - break; - case kNormalPriority: - packet_list = normal_priority_packets_.get(); - break; - case kLowPriority: - packet_list = low_priority_packets_.get(); - break; - } - packet_list->push_back(paced_sender::Packet(ssrc, - sequence_number, - capture_time_ms, - clock_->TimeInMilliseconds(), - bytes, - retransmission)); + + packets_->Push(paced_sender::Packet( + priority, ssrc, sequence_number, capture_time_ms, + clock_->TimeInMilliseconds(), bytes, retransmission, packet_counter_++)); return false; } -void PacedSender::set_max_queue_length_ms(int max_queue_length_ms) { +int PacedSender::ExpectedQueueTimeMs() const { CriticalSectionScoped cs(critsect_.get()); - max_queue_length_ms_ = max_queue_length_ms; -} - -int PacedSender::QueueInMs() const { - CriticalSectionScoped cs(critsect_.get()); - int64_t now_ms = clock_->TimeInMilliseconds(); - int64_t oldest_packet_enqueue_time = now_ms; - if (!high_priority_packets_->empty()) { - oldest_packet_enqueue_time = - std::min(oldest_packet_enqueue_time, - high_priority_packets_->front().enqueue_time_ms); - } - if (!normal_priority_packets_->empty()) { - oldest_packet_enqueue_time = - std::min(oldest_packet_enqueue_time, - normal_priority_packets_->front().enqueue_time_ms); - } - if (!low_priority_packets_->empty()) { - oldest_packet_enqueue_time = - std::min(oldest_packet_enqueue_time, - low_priority_packets_->front().enqueue_time_ms); - } - return now_ms - oldest_packet_enqueue_time; + int target_rate = media_budget_->target_rate_kbps(); + assert(target_rate > 0); + return packets_->SizeInBytes() * 8 / target_rate; } size_t PacedSender::QueueSizePackets() const { CriticalSectionScoped cs(critsect_.get()); - return low_priority_packets_->size() + normal_priority_packets_->size() + - high_priority_packets_->size(); + return packets_->SizeInPackets(); +} + +int PacedSender::QueueInMs() const { + CriticalSectionScoped cs(critsect_.get()); + + int64_t oldest_packet = packets_->OldestEnqueueTime(); + if (oldest_packet == 0) + return 0; + + return clock_->TimeInMilliseconds() - oldest_packet; } int32_t PacedSender::TimeUntilNextProcess() { @@ -303,57 +330,61 @@ int32_t PacedSender::Process() { uint32_t delta_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms); UpdateBytesPerInterval(delta_time_ms); } - paced_sender::PacketList* packet_list; - while (ShouldSendNextPacket(&packet_list, prober_->IsProbing())) { - if (!SendPacketFromList(packet_list)) + + while (!packets_->Empty()) { + if (media_budget_->bytes_remaining() <= 0 && !prober_->IsProbing()) return 0; - // Send one packet per Process() call when probing, so that we have - // better control over the delta between packets. - if (prober_->IsProbing()) + + // Since we need to release the lock in order to send, we first pop the + // element from the priority queue but keep it in storage, so that we can + // reinsert it if send fails. + const paced_sender::Packet& packet = packets_->BeginPop(); + if (SendPacket(packet)) { + // Send succeeded, remove it from the queue. + packets_->FinalizePop(packet); + if (prober_->IsProbing()) + return 0; + } else { + // Send failed, put it back into the queue. + packets_->CancelPop(packet); return 0; + } } - if (high_priority_packets_->empty() && normal_priority_packets_->empty() && - low_priority_packets_->empty() && - padding_budget_->bytes_remaining() > 0) { - int padding_needed = padding_budget_->bytes_remaining(); - critsect_->Leave(); - int bytes_sent = callback_->TimeToSendPadding(padding_needed); - critsect_->Enter(); - media_budget_->UseBudget(bytes_sent); - padding_budget_->UseBudget(bytes_sent); + + int padding_needed = padding_budget_->bytes_remaining(); + if (padding_needed > 0) { + SendPadding(padding_needed); } } return 0; } -bool PacedSender::SendPacketFromList(paced_sender::PacketList* packet_list) - EXCLUSIVE_LOCKS_REQUIRED(critsect_.get()) { - paced_sender::Packet packet = GetNextPacketFromList(packet_list); +bool PacedSender::SendPacket(const paced_sender::Packet& packet) { critsect_->Leave(); - const bool success = callback_->TimeToSendPacket(packet.ssrc, packet.sequence_number, packet.capture_time_ms, packet.retransmission); critsect_->Enter(); - // If packet cannot be sent then keep it in packet list and exit early. - // There's no need to send more packets. - if (!success) { - return false; + + if (success) { + // Update media bytes sent. + prober_->PacketSent(clock_->TimeInMilliseconds(), packet.bytes); + media_budget_->UseBudget(packet.bytes); + padding_budget_->UseBudget(packet.bytes); } - packet_list->pop_front(); - const bool last_packet = - packet_list->empty() || - packet_list->front().capture_time_ms > packet.capture_time_ms; - if (packet_list != high_priority_packets_.get()) { - if (packet.capture_time_ms > capture_time_ms_last_sent_) { - capture_time_ms_last_sent_ = packet.capture_time_ms; - } else if (packet.capture_time_ms == capture_time_ms_last_sent_ && - last_packet) { - TRACE_EVENT_ASYNC_END0("webrtc_rtp", "PacedSend", packet.capture_time_ms); - } - } - return true; + + return success; +} + +void PacedSender::SendPadding(int padding_needed) { + critsect_->Leave(); + int bytes_sent = callback_->TimeToSendPadding(padding_needed); + critsect_->Enter(); + + // Update padding bytes sent. + media_budget_->UseBudget(bytes_sent); + padding_budget_->UseBudget(bytes_sent); } void PacedSender::UpdateBytesPerInterval(uint32_t delta_time_ms) { @@ -361,71 +392,6 @@ void PacedSender::UpdateBytesPerInterval(uint32_t delta_time_ms) { padding_budget_->IncreaseBudget(delta_time_ms); } -bool PacedSender::ShouldSendNextPacket(paced_sender::PacketList** packet_list, - bool probe) { - *packet_list = NULL; - if (!probe && media_budget_->bytes_remaining() <= 0) { - // All bytes consumed for this interval. - // Check if we have not sent in a too long time. - if (clock_->TimeInMicroseconds() - time_last_media_send_us_ > - kMaxQueueTimeWithoutSendingUs) { - if (!high_priority_packets_->empty()) { - *packet_list = high_priority_packets_.get(); - return true; - } - if (!normal_priority_packets_->empty()) { - *packet_list = normal_priority_packets_.get(); - return true; - } - } - // Send any old packets to avoid queuing for too long. - if (max_queue_length_ms_ >= 0 && QueueInMs() > max_queue_length_ms_) { - int64_t high_priority_capture_time = -1; - if (!high_priority_packets_->empty()) { - high_priority_capture_time = - high_priority_packets_->front().capture_time_ms; - *packet_list = high_priority_packets_.get(); - } - if (!normal_priority_packets_->empty() && - (high_priority_capture_time == -1 || - high_priority_capture_time > - normal_priority_packets_->front().capture_time_ms)) { - *packet_list = normal_priority_packets_.get(); - } - if (*packet_list) - return true; - } - return false; - } - if (!high_priority_packets_->empty()) { - *packet_list = high_priority_packets_.get(); - return true; - } - if (!normal_priority_packets_->empty()) { - *packet_list = normal_priority_packets_.get(); - return true; - } - if (!low_priority_packets_->empty()) { - *packet_list = low_priority_packets_.get(); - return true; - } - return false; -} - -paced_sender::Packet PacedSender::GetNextPacketFromList( - paced_sender::PacketList* packets) { - paced_sender::Packet packet = packets->front(); - UpdateMediaBytesSent(packet.bytes); - return packet; -} - -void PacedSender::UpdateMediaBytesSent(int num_bytes) { - prober_->PacketSent(clock_->TimeInMilliseconds(), num_bytes); - time_last_media_send_us_ = clock_->TimeInMicroseconds(); - media_budget_->UseBudget(num_bytes); - padding_budget_->UseBudget(num_bytes); -} - bool PacedSender::ProbingExperimentIsEnabled() const { return webrtc::field_trial::FindFullName("WebRTC-BitrateProbing") == "Enabled"; diff --git a/webrtc/modules/pacing/paced_sender_unittest.cc b/webrtc/modules/pacing/paced_sender_unittest.cc index f8028a919..34787d16b 100644 --- a/webrtc/modules/pacing/paced_sender_unittest.cc +++ b/webrtc/modules/pacing/paced_sender_unittest.cc @@ -639,34 +639,40 @@ TEST_F(PacedSenderTest, ResendPacket) { EXPECT_EQ(0, send_bucket_->QueueInMs()); } -TEST_F(PacedSenderTest, MaxQueueLength) { +TEST_F(PacedSenderTest, ExpectedQueueTimeMs) { uint32_t ssrc = 12346; uint16_t sequence_number = 1234; - EXPECT_EQ(0, send_bucket_->QueueInMs()); + const int32_t kNumPackets = 60; + const int32_t kPacketSize = 1200; + const int32_t kMaxBitrate = kPaceMultiplier * 30; + EXPECT_EQ(0, send_bucket_->ExpectedQueueTimeMs()); - send_bucket_->UpdateBitrate(30, kPaceMultiplier * 30, 0); - for (int i = 0; i < 30; ++i) { - SendAndExpectPacket(PacedSender::kNormalPriority, - ssrc, - sequence_number++, - clock_.TimeInMilliseconds(), - 1200, - false); + send_bucket_->UpdateBitrate(30, kMaxBitrate, 0); + for (int i = 0; i < kNumPackets; ++i) { + SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), kPacketSize, false); } - clock_.AdvanceTimeMilliseconds(2001); - SendAndExpectPacket(PacedSender::kNormalPriority, - ssrc, - sequence_number++, - clock_.TimeInMilliseconds(), - 1200, - false); - EXPECT_EQ(2001, send_bucket_->QueueInMs()); - send_bucket_->Process(); - EXPECT_EQ(0, send_bucket_->QueueInMs()); - clock_.AdvanceTimeMilliseconds(31); + // Queue in ms = 1000 * (bytes in queue) / (kbit per second * 1000 / 8) + int32_t queue_in_ms = kNumPackets * kPacketSize * 8 / kMaxBitrate; + EXPECT_EQ(queue_in_ms, send_bucket_->ExpectedQueueTimeMs()); - send_bucket_->Process(); + int64_t time_start = clock_.TimeInMilliseconds(); + while (send_bucket_->QueueSizePackets() > 0) { + int time_until_process = send_bucket_->TimeUntilNextProcess(); + if (time_until_process <= 0) { + send_bucket_->Process(); + } else { + clock_.AdvanceTimeMilliseconds(time_until_process); + } + } + int64_t duration = clock_.TimeInMilliseconds() - time_start; + + EXPECT_EQ(0, send_bucket_->ExpectedQueueTimeMs()); + + // Allow for aliasing, duration should be in [expected(n - 1), expected(n)]. + EXPECT_LE(duration, queue_in_ms); + EXPECT_GE(duration, queue_in_ms - (kPacketSize * 8 / kMaxBitrate)); } TEST_F(PacedSenderTest, QueueTimeGrowsOverTime) { @@ -738,5 +744,79 @@ TEST_F(PacedSenderTest, ProbingWithInitialFrame) { } } } + +TEST_F(PacedSenderTest, PriorityInversion) { + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + const int32_t kPacketSize = 1200; + + EXPECT_FALSE(send_bucket_->SendPacket( + PacedSender::kHighPriority, ssrc, sequence_number + 3, + clock_.TimeInMilliseconds() + 33, kPacketSize, true)); + + EXPECT_FALSE(send_bucket_->SendPacket( + PacedSender::kHighPriority, ssrc, sequence_number + 2, + clock_.TimeInMilliseconds() + 33, kPacketSize, true)); + + EXPECT_FALSE(send_bucket_->SendPacket( + PacedSender::kHighPriority, ssrc, sequence_number, + clock_.TimeInMilliseconds(), kPacketSize, true)); + + EXPECT_FALSE(send_bucket_->SendPacket( + PacedSender::kHighPriority, ssrc, sequence_number + 1, + clock_.TimeInMilliseconds(), kPacketSize, true)); + + // Packets from earlier frames should be sent first. + { + ::testing::InSequence sequence; + EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number, + clock_.TimeInMilliseconds(), true)) + .WillOnce(Return(true)); + EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number + 1, + clock_.TimeInMilliseconds(), true)) + .WillOnce(Return(true)); + EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number + 3, + clock_.TimeInMilliseconds() + 33, + true)).WillOnce(Return(true)); + EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number + 2, + clock_.TimeInMilliseconds() + 33, + true)).WillOnce(Return(true)); + + while (send_bucket_->QueueSizePackets() > 0) { + int time_until_process = send_bucket_->TimeUntilNextProcess(); + if (time_until_process <= 0) { + send_bucket_->Process(); + } else { + clock_.AdvanceTimeMilliseconds(time_until_process); + } + } + } +} + +TEST_F(PacedSenderTest, PaddingOveruse) { + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + const int32_t kPacketSize = 1200; + + // Min bitrate 0 => no padding, padding budget will stay at 0. + send_bucket_->UpdateBitrate(60, 90, 0); + SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), kPacketSize, false); + send_bucket_->Process(); + + // Add 30kbit padding. When increasing budget, media budget will increase from + // negative (overuse) while padding budget will increase form 0. + clock_.AdvanceTimeMilliseconds(5); + send_bucket_->UpdateBitrate(60, 90, 30); + + EXPECT_FALSE(send_bucket_->SendPacket( + PacedSender::kHighPriority, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), kPacketSize, false)); + + // Don't send padding if queue is non-empty, even if padding budget > 0. + EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0); + send_bucket_->Process(); +} + } // namespace test } // namespace webrtc diff --git a/webrtc/modules/rtp_rtcp/source/rtp_sender.cc b/webrtc/modules/rtp_rtcp/source/rtp_sender.cc index d54126689..0438b9f7d 100644 --- a/webrtc/modules/rtp_rtcp/source/rtp_sender.cc +++ b/webrtc/modules/rtp_rtcp/source/rtp_sender.cc @@ -50,12 +50,17 @@ RTPSender::RTPSender(const int32_t id, FrameCountObserver* frame_count_observer, SendSideDelayObserver* send_side_delay_observer) : clock_(clock), + // TODO(holmer): Remove this conversion when we remove the use of + // TickTime. + clock_delta_ms_(clock_->TimeInMilliseconds() - + TickTime::MillisecondTimestamp()), bitrate_sent_(clock, this), id_(id), audio_configured_(audio), audio_(NULL), video_(NULL), paced_sender_(paced_sender), + last_capture_time_ms_sent_(0), send_critsect_(CriticalSectionWrapper::CreateCriticalSection()), transport_(transport), sending_media_(true), // Default to sending media. @@ -622,15 +627,10 @@ int32_t RTPSender::ReSendPacket(uint16_t packet_id, uint32_t min_resend_time) { } // Convert from TickTime to Clock since capture_time_ms is based on // TickTime. - // TODO(holmer): Remove this conversion when we remove the use of TickTime. - int64_t clock_delta_ms = clock_->TimeInMilliseconds() - - TickTime::MillisecondTimestamp(); - if (!paced_sender_->SendPacket(PacedSender::kHighPriority, - header.ssrc, - header.sequenceNumber, - capture_time_ms + clock_delta_ms, - length - header.headerLength, - true)) { + int64_t corrected_capture_tims_ms = capture_time_ms + clock_delta_ms_; + if (!paced_sender_->SendPacket( + PacedSender::kHighPriority, header.ssrc, header.sequenceNumber, + corrected_capture_tims_ms, length - header.headerLength, true)) { // We can't send the packet right now. // We will be called when it is time. return length; @@ -819,6 +819,10 @@ bool RTPSender::PrepareAndSendPacket(uint8_t* buffer, RtpUtility::RtpHeaderParser rtp_parser(buffer, length); RTPHeader rtp_header; rtp_parser.Parse(rtp_header); + if (!is_retransmit && rtp_header.markerBit) { + TRACE_EVENT_ASYNC_END0("webrtc_rtp", "PacedSend", capture_time_ms); + } + TRACE_EVENT_INSTANT2("webrtc_rtp", "PrepareAndSendPacket", "timestamp", rtp_header.timestamp, "seqnum", rtp_header.sequenceNumber); @@ -937,12 +941,18 @@ int32_t RTPSender::SendToNetwork( } if (paced_sender_ && storage != kDontStore) { - int64_t clock_delta_ms = clock_->TimeInMilliseconds() - - TickTime::MillisecondTimestamp(); + // Correct offset between implementations of millisecond time stamps in + // TickTime and Clock. + int64_t corrected_time_ms = capture_time_ms + clock_delta_ms_; if (!paced_sender_->SendPacket(priority, rtp_header.ssrc, - rtp_header.sequenceNumber, - capture_time_ms + clock_delta_ms, + rtp_header.sequenceNumber, corrected_time_ms, payload_length, false)) { + if (last_capture_time_ms_sent_ == 0 || + corrected_time_ms > last_capture_time_ms_sent_) { + last_capture_time_ms_sent_ = corrected_time_ms; + TRACE_EVENT_ASYNC_BEGIN1("webrtc_rtp", "PacedSend", corrected_time_ms, + "capture_time_ms", corrected_time_ms); + } // We can't send the packet right now. // We will be called when it is time. return 0; diff --git a/webrtc/modules/rtp_rtcp/source/rtp_sender.h b/webrtc/modules/rtp_rtcp/source/rtp_sender.h index b2f2e0c42..780baa1fd 100644 --- a/webrtc/modules/rtp_rtcp/source/rtp_sender.h +++ b/webrtc/modules/rtp_rtcp/source/rtp_sender.h @@ -336,6 +336,7 @@ class RTPSender : public RTPSenderInterface, public Bitrate::Observer { bool IsFecPacket(const uint8_t* buffer, const RTPHeader& header) const; Clock* clock_; + int64_t clock_delta_ms_; Bitrate bitrate_sent_; int32_t id_; @@ -344,6 +345,7 @@ class RTPSender : public RTPSenderInterface, public Bitrate::Observer { RTPSenderVideo *video_; PacedSender *paced_sender_; + int64_t last_capture_time_ms_sent_; CriticalSectionWrapper *send_critsect_; Transport *transport_; diff --git a/webrtc/video_engine/vie_encoder.cc b/webrtc/video_engine/vie_encoder.cc index 9d6da977e..3cb0ae704 100644 --- a/webrtc/video_engine/vie_encoder.cc +++ b/webrtc/video_engine/vie_encoder.cc @@ -470,9 +470,31 @@ bool ViEEncoder::EncoderPaused() const { std::max(static_cast(target_delay_ms_ * kEncoderPausePacerMargin), kMinPacingDelayMs); } + if (paced_sender_->ExpectedQueueTimeMs() > + PacedSender::kDefaultMaxQueueLengthMs) { + // Too much data in pacer queue, drop frame. + return true; + } return !network_is_transmitting_; } +void ViEEncoder::TraceFrameDropStart() { + // Start trace event only on the first frame after encoder is paused. + if (!encoder_paused_and_dropped_frame_) { + TRACE_EVENT_ASYNC_BEGIN0("webrtc", "EncoderPaused", this); + } + encoder_paused_and_dropped_frame_ = true; + return; +} + +void ViEEncoder::TraceFrameDropEnd() { + // End trace event on first frame after encoder resumes, if frame was dropped. + if (encoder_paused_and_dropped_frame_) { + TRACE_EVENT_ASYNC_END0("webrtc", "EncoderPaused", this); + } + encoder_paused_and_dropped_frame_ = false; +} + RtpRtcp* ViEEncoder::SendRtpRtcpModule() { return default_rtp_rtcp_.get(); } @@ -489,16 +511,10 @@ void ViEEncoder::DeliverFrame(int id, CriticalSectionScoped cs(data_cs_.get()); time_of_last_incoming_frame_ms_ = TickTime::MillisecondTimestamp(); if (EncoderPaused()) { - if (!encoder_paused_and_dropped_frame_) { - TRACE_EVENT_ASYNC_BEGIN0("webrtc", "EncoderPaused", this); - } - encoder_paused_and_dropped_frame_ = true; + TraceFrameDropStart(); return; } - if (encoder_paused_and_dropped_frame_) { - TRACE_EVENT_ASYNC_END0("webrtc", "EncoderPaused", this); - } - encoder_paused_and_dropped_frame_ = false; + TraceFrameDropEnd(); } // Convert render time, in ms, to RTP timestamp. @@ -702,15 +718,10 @@ void ViEEncoder::SetSenderBufferingMode(int target_delay_ms) { // Disable external frame-droppers. vcm_.EnableFrameDropper(false); vpm_.EnableTemporalDecimation(false); - // We don't put any limits on the pacer queue when running in buffered mode - // since the encoder will be paused if the queue grow too large. - paced_sender_->set_max_queue_length_ms(-1); } else { // Real-time mode - enable frame droppers. vpm_.EnableTemporalDecimation(true); vcm_.EnableFrameDropper(true); - paced_sender_->set_max_queue_length_ms( - PacedSender::kDefaultMaxQueueLengthMs); } } diff --git a/webrtc/video_engine/vie_encoder.h b/webrtc/video_engine/vie_encoder.h index 36f87faad..1e358def1 100644 --- a/webrtc/video_engine/vie_encoder.h +++ b/webrtc/video_engine/vie_encoder.h @@ -192,6 +192,8 @@ class ViEEncoder int TimeToSendPadding(int bytes); private: bool EncoderPaused() const EXCLUSIVE_LOCKS_REQUIRED(data_cs_); + void TraceFrameDropStart() EXCLUSIVE_LOCKS_REQUIRED(data_cs_); + void TraceFrameDropEnd() EXCLUSIVE_LOCKS_REQUIRED(data_cs_); void UpdateHistograms();