diff --git a/webrtc/modules/pacing/include/paced_sender.h b/webrtc/modules/pacing/include/paced_sender.h index 0eeb9921a..dc189154a 100644 --- a/webrtc/modules/pacing/include/paced_sender.h +++ b/webrtc/modules/pacing/include/paced_sender.h @@ -42,7 +42,8 @@ class PacedSender : public Module { // Note: packets sent as a result of a callback should not pass by this // module again. // Called when it's time to send a queued packet. - virtual void TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number, + // Returns false if packet cannot be sent. + virtual bool TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number, int64_t capture_time_ms) = 0; // Called when it's a good time to send a padding data. virtual int TimeToSendPadding(int bytes) = 0; @@ -90,15 +91,13 @@ class PacedSender : public Module { virtual int32_t Process(); private: - // Checks if next packet in line can be transmitted. Returns true on success. - bool GetNextPacket(uint32_t* ssrc, uint16_t* sequence_number, - int64_t* capture_time_ms, Priority* priority, - bool* last_packet); + // Return true if next packet in line should be transmitted. + // Return packet list that contains the next packet. + bool ShouldSendNextPacket(paced_sender::PacketList** packet_list); // Local helper function to GetNextPacket. void GetNextPacketFromList(paced_sender::PacketList* packets, - uint32_t* ssrc, uint16_t* sequence_number, int64_t* capture_time_ms, - bool* last_packet); + uint32_t* ssrc, uint16_t* sequence_number, int64_t* capture_time_ms); // Updates the number of bytes that can be sent for the next time interval. void UpdateBytesPerInterval(uint32_t delta_time_in_ms); diff --git a/webrtc/modules/pacing/paced_sender.cc b/webrtc/modules/pacing/paced_sender.cc index 3e5027513..2fdc5cfa4 100644 --- a/webrtc/modules/pacing/paced_sender.cc +++ b/webrtc/modules/pacing/paced_sender.cc @@ -181,7 +181,7 @@ bool PacedSender::SendPacket(Priority priority, uint32_t ssrc, if (capture_time_ms < 0) { capture_time_ms = TickTime::MillisecondTimestamp(); } - if (paused_ && priority == kNormalPriority && + if (priority != kHighPriority && capture_time_ms > capture_time_ms_last_queued_) { capture_time_ms_last_queued_ = capture_time_ms; TRACE_EVENT_ASYNC_BEGIN1("webrtc_rtp", "PacedSend", capture_time_ms, @@ -252,11 +252,25 @@ int32_t PacedSender::Process() { uint32_t ssrc; uint16_t sequence_number; int64_t capture_time_ms; - Priority priority; - bool last_packet; - while (GetNextPacket(&ssrc, &sequence_number, &capture_time_ms, - &priority, &last_packet)) { - if (priority == kNormalPriority) { + paced_sender::PacketList* packet_list; + while (ShouldSendNextPacket(&packet_list)) { + GetNextPacketFromList(packet_list, &ssrc, &sequence_number, + &capture_time_ms); + critsect_->Leave(); + + const bool success = callback_->TimeToSendPacket(ssrc, sequence_number, + capture_time_ms); + // If packet cannt be sent then keep it in packet list and exit early. + // There's no need to send more packets. + if (!success) { + return 0; + } + + critsect_->Enter(); + packet_list->pop_front(); + const bool last_packet = packet_list->empty() || + packet_list->front().capture_time_ms_ > capture_time_ms; + if (packet_list != high_priority_packets_.get()) { if (capture_time_ms > capture_time_ms_last_sent_) { capture_time_ms_last_sent_ = capture_time_ms; } else if (capture_time_ms == capture_time_ms_last_sent_ && @@ -264,9 +278,6 @@ int32_t PacedSender::Process() { TRACE_EVENT_ASYNC_END0("webrtc_rtp", "PacedSend", capture_time_ms); } } - critsect_->Leave(); - callback_->TimeToSendPacket(ssrc, sequence_number, capture_time_ms); - critsect_->Enter(); } if (high_priority_packets_->empty() && normal_priority_packets_->empty() && @@ -295,61 +306,45 @@ void PacedSender::UpdateBytesPerInterval(uint32_t delta_time_ms) { } // MUST have critsect_ when calling. -bool PacedSender::GetNextPacket(uint32_t* ssrc, uint16_t* sequence_number, - int64_t* capture_time_ms, Priority* priority, - bool* last_packet) { +bool PacedSender::ShouldSendNextPacket(paced_sender::PacketList** packet_list) { if (media_budget_->bytes_remaining() <= 0) { // All bytes consumed for this interval. // Check if we have not sent in a too long time. if ((TickTime::Now() - time_last_send_).Milliseconds() > kMaxQueueTimeWithoutSendingMs) { if (!high_priority_packets_->empty()) { - *priority = kHighPriority; - GetNextPacketFromList(high_priority_packets_.get(), ssrc, - sequence_number, capture_time_ms, last_packet); + *packet_list = high_priority_packets_.get(); return true; } if (!normal_priority_packets_->empty()) { - *priority = kNormalPriority; - GetNextPacketFromList(normal_priority_packets_.get(), ssrc, - sequence_number, capture_time_ms, last_packet); + *packet_list = normal_priority_packets_.get(); return true; } } return false; } if (!high_priority_packets_->empty()) { - *priority = kHighPriority; - GetNextPacketFromList(high_priority_packets_.get(), ssrc, sequence_number, - capture_time_ms, last_packet); + *packet_list = high_priority_packets_.get(); return true; } if (!normal_priority_packets_->empty()) { - *priority = kNormalPriority; - GetNextPacketFromList(normal_priority_packets_.get(), ssrc, - sequence_number, capture_time_ms, last_packet); + *packet_list = normal_priority_packets_.get(); return true; } if (!low_priority_packets_->empty()) { - *priority = kLowPriority; - GetNextPacketFromList(low_priority_packets_.get(), ssrc, sequence_number, - capture_time_ms, last_packet); + *packet_list = low_priority_packets_.get(); return true; } return false; } void PacedSender::GetNextPacketFromList(paced_sender::PacketList* packets, - uint32_t* ssrc, uint16_t* sequence_number, int64_t* capture_time_ms, - bool* last_packet) { + uint32_t* ssrc, uint16_t* sequence_number, int64_t* capture_time_ms) { paced_sender::Packet packet = packets->front(); UpdateMediaBytesSent(packet.bytes_); *sequence_number = packet.sequence_number_; *ssrc = packet.ssrc_; *capture_time_ms = packet.capture_time_ms_; - packets->pop_front(); - *last_packet = packets->empty() || - packets->front().capture_time_ms_ > *capture_time_ms; } // MUST have critsect_ when calling. diff --git a/webrtc/modules/pacing/paced_sender_unittest.cc b/webrtc/modules/pacing/paced_sender_unittest.cc index c8dcd9734..94a5c0bb1 100644 --- a/webrtc/modules/pacing/paced_sender_unittest.cc +++ b/webrtc/modules/pacing/paced_sender_unittest.cc @@ -25,7 +25,7 @@ static const float kPaceMultiplier = 1.5f; class MockPacedSenderCallback : public PacedSender::Callback { public: MOCK_METHOD3(TimeToSendPacket, - void(uint32_t ssrc, uint16_t sequence_number, int64_t capture_time_ms)); + bool(uint32_t ssrc, uint16_t sequence_number, int64_t capture_time_ms)); MOCK_METHOD1(TimeToSendPadding, int(int bytes)); }; @@ -34,8 +34,9 @@ class PacedSenderPadding : public PacedSender::Callback { public: PacedSenderPadding() : padding_sent_(0) {} - void TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number, + bool TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number, int64_t capture_time_ms) { + return true; } int TimeToSendPadding(int bytes) { @@ -68,7 +69,9 @@ class PacedSenderTest : public ::testing::Test { EXPECT_FALSE(send_bucket_->SendPacket(priority, ssrc, sequence_number, capture_time_ms, size)); EXPECT_CALL(callback_, TimeToSendPacket( - ssrc, sequence_number, capture_time_ms)).Times(1); + ssrc, sequence_number, capture_time_ms)) + .Times(1) + .WillRepeatedly(Return(true)); } MockPacedSenderCallback callback_; @@ -98,7 +101,9 @@ TEST_F(PacedSenderTest, QueuePacket) { TickTime::AdvanceFakeClock(1); EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); EXPECT_CALL(callback_, TimeToSendPacket( - ssrc, sequence_number++, capture_time_ms)).Times(1); + ssrc, sequence_number++, capture_time_ms)) + .Times(1) + .WillRepeatedly(Return(true)); send_bucket_->Process(); sequence_number++; SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, @@ -130,7 +135,9 @@ TEST_F(PacedSenderTest, PaceQueuedPackets) { EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); TickTime::AdvanceFakeClock(5); EXPECT_CALL(callback_, - TimeToSendPacket(ssrc, _, capture_time_ms)).Times(3); + TimeToSendPacket(ssrc, _, capture_time_ms)) + .Times(3) + .WillRepeatedly(Return(true)); EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); EXPECT_EQ(0, send_bucket_->Process()); } @@ -177,7 +184,9 @@ TEST_F(PacedSenderTest, PaceQueuedPacketsWithDuplicates) { for (int i = 0; i < 3; ++i) { EXPECT_CALL(callback_, TimeToSendPacket(ssrc, queued_sequence_number++, - capture_time_ms)).Times(1); + capture_time_ms)) + .Times(1) + .WillRepeatedly(Return(true)); } EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); EXPECT_EQ(0, send_bucket_->Process()); @@ -317,7 +326,9 @@ TEST_F(PacedSenderTest, Priority) { // Expect all high and normal priority to be sent out first. EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0); - EXPECT_CALL(callback_, TimeToSendPacket(ssrc, _, capture_time_ms)).Times(3); + EXPECT_CALL(callback_, TimeToSendPacket(ssrc, _, capture_time_ms)) + .Times(3) + .WillRepeatedly(Return(true)); EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); TickTime::AdvanceFakeClock(5); @@ -325,7 +336,9 @@ TEST_F(PacedSenderTest, Priority) { EXPECT_EQ(0, send_bucket_->Process()); EXPECT_CALL(callback_, TimeToSendPacket( - ssrc_low_priority, _, capture_time_ms_low_priority)).Times(1); + ssrc_low_priority, _, capture_time_ms_low_priority)) + .Times(1) + .WillRepeatedly(Return(true)); EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); TickTime::AdvanceFakeClock(5); @@ -378,8 +391,9 @@ TEST_F(PacedSenderTest, Pause) { } // Expect high prio packets to come out first followed by all packets in the // way they were added. - EXPECT_CALL(callback_, TimeToSendPacket(_, _, capture_time_ms)).Times(3); - + EXPECT_CALL(callback_, TimeToSendPacket(_, _, capture_time_ms)) + .Times(3) + .WillRepeatedly(Return(true)); send_bucket_->Resume(); EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); @@ -387,9 +401,9 @@ TEST_F(PacedSenderTest, Pause) { EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); EXPECT_EQ(0, send_bucket_->Process()); - EXPECT_CALL(callback_, - TimeToSendPacket(_, _, second_capture_time_ms)).Times(1); - + EXPECT_CALL(callback_, TimeToSendPacket(_, _, second_capture_time_ms)) + .Times(1) + .WillRepeatedly(Return(true)); EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); TickTime::AdvanceFakeClock(5); EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); @@ -397,5 +411,62 @@ TEST_F(PacedSenderTest, Pause) { EXPECT_EQ(0, send_bucket_->QueueInMs()); } +TEST_F(PacedSenderTest, ResendPacket) { + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + int64_t capture_time_ms = TickTime::MillisecondTimestamp(); + EXPECT_EQ(0, send_bucket_->QueueInMs()); + + EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, + ssrc, + sequence_number, + capture_time_ms, + 250)); + EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, + ssrc, + sequence_number + 1, + capture_time_ms + 1, + 250)); + TickTime::AdvanceFakeClock(10000); + EXPECT_EQ(TickTime::MillisecondTimestamp() - capture_time_ms, + send_bucket_->QueueInMs()); + // Fails to send first packet so only one call. + EXPECT_CALL(callback_, TimeToSendPacket( + ssrc, sequence_number, capture_time_ms)) + .Times(1) + .WillOnce(Return(false)); + TickTime::AdvanceFakeClock(10000); + send_bucket_->Process(); + + // Queue remains unchanged. + EXPECT_EQ(TickTime::MillisecondTimestamp() - capture_time_ms, + send_bucket_->QueueInMs()); + + // Fails to send second packet. + EXPECT_CALL(callback_, TimeToSendPacket( + ssrc, sequence_number, capture_time_ms)) + .Times(1) + .WillOnce(Return(true)); + EXPECT_CALL(callback_, TimeToSendPacket( + ssrc, sequence_number + 1, capture_time_ms + 1)) + .Times(1) + .WillOnce(Return(false)); + TickTime::AdvanceFakeClock(10000); + send_bucket_->Process(); + + // Queue is reduced by 1 packet. + EXPECT_EQ(TickTime::MillisecondTimestamp() - capture_time_ms - 1, + send_bucket_->QueueInMs()); + + // Send second packet and queue becomes empty. + EXPECT_CALL(callback_, TimeToSendPacket( + ssrc, sequence_number + 1, capture_time_ms + 1)) + .Times(1) + .WillOnce(Return(true)); + TickTime::AdvanceFakeClock(10000); + send_bucket_->Process(); + EXPECT_EQ(0, send_bucket_->QueueInMs()); +} + } // namespace test } // namespace webrtc diff --git a/webrtc/modules/rtp_rtcp/interface/rtp_rtcp.h b/webrtc/modules/rtp_rtcp/interface/rtp_rtcp.h index 21104293b..5d1144804 100644 --- a/webrtc/modules/rtp_rtcp/interface/rtp_rtcp.h +++ b/webrtc/modules/rtp_rtcp/interface/rtp_rtcp.h @@ -486,7 +486,7 @@ class RtpRtcp : public Module { const RTPFragmentationHeader* fragmentation = NULL, const RTPVideoHeader* rtpVideoHdr = NULL) = 0; - virtual void TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number, + virtual bool TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number, int64_t capture_time_ms) = 0; virtual int TimeToSendPadding(int bytes) = 0; diff --git a/webrtc/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h b/webrtc/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h index 34d3ca410..0d39f8b2b 100644 --- a/webrtc/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h +++ b/webrtc/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h @@ -156,7 +156,7 @@ class MockRtpRtcp : public RtpRtcp { const RTPFragmentationHeader* fragmentation, const RTPVideoHeader* rtpVideoHdr)); MOCK_METHOD3(TimeToSendPacket, - void(uint32_t ssrc, uint16_t sequence_number, int64_t capture_time_ms)); + bool(uint32_t ssrc, uint16_t sequence_number, int64_t capture_time_ms)); MOCK_METHOD1(TimeToSendPadding, int(int bytes)); MOCK_METHOD3(RegisterRtcpObservers, diff --git a/webrtc/modules/rtp_rtcp/source/rtp_rtcp_impl.cc b/webrtc/modules/rtp_rtcp/source/rtp_rtcp_impl.cc index c5defe804..99cadf263 100644 --- a/webrtc/modules/rtp_rtcp/source/rtp_rtcp_impl.cc +++ b/webrtc/modules/rtp_rtcp/source/rtp_rtcp_impl.cc @@ -967,7 +967,7 @@ int32_t ModuleRtpRtcpImpl::SendOutgoingData( return ret_val; } -void ModuleRtpRtcpImpl::TimeToSendPacket(uint32_t ssrc, +bool ModuleRtpRtcpImpl::TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number, int64_t capture_time_ms) { WEBRTC_TRACE( @@ -985,19 +985,21 @@ void ModuleRtpRtcpImpl::TimeToSendPacket(uint32_t ssrc, if (no_child_modules) { // Don't send from default module. if (SendingMedia() && ssrc == rtp_sender_.SSRC()) { - rtp_sender_.TimeToSendPacket(sequence_number, capture_time_ms); + return rtp_sender_.TimeToSendPacket(sequence_number, capture_time_ms); } } else { CriticalSectionScoped lock(critical_section_module_ptrs_.get()); std::list::iterator it = child_modules_.begin(); while (it != child_modules_.end()) { if ((*it)->SendingMedia() && ssrc == (*it)->rtp_sender_.SSRC()) { - (*it)->rtp_sender_.TimeToSendPacket(sequence_number, capture_time_ms); - return; + return (*it)->rtp_sender_.TimeToSendPacket(sequence_number, + capture_time_ms); } ++it; } } + // No RTP sender is interested in sending this packet. + return true; } int ModuleRtpRtcpImpl::TimeToSendPadding(int bytes) { diff --git a/webrtc/modules/rtp_rtcp/source/rtp_rtcp_impl.h b/webrtc/modules/rtp_rtcp/source/rtp_rtcp_impl.h index 891d5b83e..c70f7cbed 100644 --- a/webrtc/modules/rtp_rtcp/source/rtp_rtcp_impl.h +++ b/webrtc/modules/rtp_rtcp/source/rtp_rtcp_impl.h @@ -188,7 +188,7 @@ class ModuleRtpRtcpImpl : public RtpRtcp { const RTPFragmentationHeader* fragmentation = NULL, const RTPVideoHeader* rtp_video_hdr = NULL); - virtual void TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number, + virtual bool TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number, int64_t capture_time_ms); // Returns the number of padding bytes actually sent, which can be more or // less than |bytes|. diff --git a/webrtc/modules/rtp_rtcp/source/rtp_sender.cc b/webrtc/modules/rtp_rtcp/source/rtp_sender.cc index 07a3f1b9f..744bbe212 100644 --- a/webrtc/modules/rtp_rtcp/source/rtp_sender.cc +++ b/webrtc/modules/rtp_rtcp/source/rtp_sender.cc @@ -701,7 +701,7 @@ void RTPSender::UpdateNACKBitRate(const uint32_t bytes, } // Called from pacer when we can send the packet. -void RTPSender::TimeToSendPacket(uint16_t sequence_number, +bool RTPSender::TimeToSendPacket(uint16_t sequence_number, int64_t capture_time_ms) { StorageType type; uint16_t length = IP_PACKET_SIZE; @@ -709,11 +709,13 @@ void RTPSender::TimeToSendPacket(uint16_t sequence_number, int64_t stored_time_ms; if (packet_history_ == NULL) { - return; + // Packet cannot be found. Allow sending to continue. + return true; } if (!packet_history_->GetRTPPacket(sequence_number, 0, data_buffer, &length, &stored_time_ms, &type)) { - return; + // Packet cannot be found. Allow sending to continue. + return true; } assert(length > 0); @@ -736,7 +738,7 @@ void RTPSender::TimeToSendPacket(uint16_t sequence_number, rtp_header.sequenceNumber, rtp_header.headerLength); } - SendPacketToNetwork(data_buffer, length); + return SendPacketToNetwork(data_buffer, length); } int RTPSender::TimeToSendPadding(int bytes) { diff --git a/webrtc/modules/rtp_rtcp/source/rtp_sender.h b/webrtc/modules/rtp_rtcp/source/rtp_sender.h index 1efba85fa..61dc1c57e 100644 --- a/webrtc/modules/rtp_rtcp/source/rtp_sender.h +++ b/webrtc/modules/rtp_rtcp/source/rtp_sender.h @@ -165,7 +165,7 @@ class RTPSender : public Bitrate, public RTPSenderInterface { const RTPHeader &rtp_header, const int64_t now_ms) const; - void TimeToSendPacket(uint16_t sequence_number, int64_t capture_time_ms); + bool TimeToSendPacket(uint16_t sequence_number, int64_t capture_time_ms); int TimeToSendPadding(int bytes); // NACK. diff --git a/webrtc/video_engine/vie_encoder.cc b/webrtc/video_engine/vie_encoder.cc index 3bc4b064c..be079a497 100644 --- a/webrtc/video_engine/vie_encoder.cc +++ b/webrtc/video_engine/vie_encoder.cc @@ -88,9 +88,9 @@ class ViEPacedSenderCallback : public PacedSender::Callback { explicit ViEPacedSenderCallback(ViEEncoder* owner) : owner_(owner) { } - virtual void TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number, + virtual bool TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number, int64_t capture_time_ms) { - owner_->TimeToSendPacket(ssrc, sequence_number, capture_time_ms); + return owner_->TimeToSendPacket(ssrc, sequence_number, capture_time_ms); } virtual int TimeToSendPadding(int bytes) { return owner_->TimeToSendPadding(bytes); @@ -482,9 +482,10 @@ int32_t ViEEncoder::ScaleInputImage(bool enable) { return 0; } -void ViEEncoder::TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number, +bool ViEEncoder::TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number, int64_t capture_time_ms) { - default_rtp_rtcp_->TimeToSendPacket(ssrc, sequence_number, capture_time_ms); + return default_rtp_rtcp_->TimeToSendPacket(ssrc, sequence_number, + capture_time_ms); } int ViEEncoder::TimeToSendPadding(int bytes) { diff --git a/webrtc/video_engine/vie_encoder.h b/webrtc/video_engine/vie_encoder.h index 1209b31e4..4c4cd18cf 100644 --- a/webrtc/video_engine/vie_encoder.h +++ b/webrtc/video_engine/vie_encoder.h @@ -173,7 +173,7 @@ class ViEEncoder const uint32_t round_trip_time_ms); // Called by PacedSender. - void TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number, + bool TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number, int64_t capture_time_ms); int TimeToSendPadding(int bytes);