Introduced pause and resume to the pacer

Review URL: https://webrtc-codereview.appspot.com/1217007

git-svn-id: http://webrtc.googlecode.com/svn/trunk@3717 4adac7df-926f-26a2-2b94-8c16560cd09d
This commit is contained in:
pwestin@webrtc.org 2013-03-22 23:39:29 +00:00
parent 14c9909ef6
commit db4185664c
3 changed files with 160 additions and 40 deletions

View File

@ -28,6 +28,9 @@ class PacedSender : public Module {
kNormalPriority = 2, // Put in back of the line.
kLowPriority = 3, // Put in back of the low priority line.
};
// Low priority packets are mixed with the normal priority packets
// while we are paused.
class Callback {
public:
// Note: packets sent as a result of a callback should not pass by this
@ -47,6 +50,12 @@ class PacedSender : public Module {
// Enable/disable pacing.
void SetStatus(bool enable);
// Temporarily pause all sending.
void Pause();
// Resume sending packets.
void Resume();
// Current total estimated bitrate.
void UpdateBitrate(int target_bitrate_kbps);
@ -80,6 +89,10 @@ class PacedSender : public Module {
bool GetNextPacket(uint32_t* ssrc, uint16_t* sequence_number,
int64_t* capture_time_ms);
// Local helper function to GetNextPacket.
void GetNextPacketFromList(std::list<Packet>* list,
uint32_t* ssrc, uint16_t* sequence_number, int64_t* capture_time_ms);
// Updates the number of bytes that can be sent for the next time interval.
void UpdateBytesPerInterval(uint32_t delta_time_in_ms);
@ -88,6 +101,7 @@ class PacedSender : public Module {
Callback* callback_;
bool enable_;
bool paused_;
scoped_ptr<CriticalSectionWrapper> critsect_;
int target_bitrate_kbytes_per_s_;
int bytes_remaining_interval_;
@ -95,6 +109,7 @@ class PacedSender : public Module {
TickTime time_last_update_;
TickTime time_last_send_;
std::list<Packet> high_priority_packets_;
std::list<Packet> normal_priority_packets_;
std::list<Packet> low_priority_packets_;
};

View File

@ -39,6 +39,7 @@ namespace webrtc {
PacedSender::PacedSender(Callback* callback, int target_bitrate_kbps)
: callback_(callback),
enable_(false),
paused_(false),
critsect_(CriticalSectionWrapper::CreateCriticalSection()),
target_bitrate_kbytes_per_s_(target_bitrate_kbps >> 3), // Divide by 8.
bytes_remaining_interval_(0),
@ -48,10 +49,21 @@ PacedSender::PacedSender(Callback* callback, int target_bitrate_kbps)
}
PacedSender::~PacedSender() {
high_priority_packets_.clear();
normal_priority_packets_.clear();
low_priority_packets_.clear();
}
void PacedSender::Pause() {
CriticalSectionScoped cs(critsect_.get());
paused_ = true;
}
void PacedSender::Resume() {
CriticalSectionScoped cs(critsect_.get());
paused_ = false;
}
void PacedSender::SetStatus(bool enable) {
CriticalSectionScoped cs(critsect_.get());
enable_ = enable;
@ -70,12 +82,38 @@ bool PacedSender::SendPacket(Priority priority, uint32_t ssrc,
UpdateState(bytes);
return true; // We can send now.
}
if (paused_) {
// Queue all packets when we are paused.
switch (priority) {
case kHighPriority:
high_priority_packets_.push_back(
Packet(ssrc, sequence_number, capture_time_ms, bytes));
break;
case kNormalPriority:
case kLowPriority:
// Queue the low priority packets in the normal priority queue when we
// are paused to avoid starvation.
normal_priority_packets_.push_back(
Packet(ssrc, sequence_number, capture_time_ms, bytes));
break;
}
return false;
}
switch (priority) {
case kHighPriority:
UpdateState(bytes);
return true; // We can send now.
if (high_priority_packets_.empty() &&
bytes_remaining_interval_ > 0) {
UpdateState(bytes);
return true; // We can send now.
}
high_priority_packets_.push_back(
Packet(ssrc, sequence_number, capture_time_ms, bytes));
return false;
case kNormalPriority:
if (normal_priority_packets_.empty() && bytes_remaining_interval_ > 0) {
if (high_priority_packets_.empty() &&
normal_priority_packets_.empty() &&
bytes_remaining_interval_ > 0) {
UpdateState(bytes);
return true; // We can send now.
}
@ -83,7 +121,8 @@ bool PacedSender::SendPacket(Priority priority, uint32_t ssrc,
Packet(ssrc, sequence_number, capture_time_ms, bytes));
return false;
case kLowPriority:
if (normal_priority_packets_.empty() &&
if (high_priority_packets_.empty() &&
normal_priority_packets_.empty() &&
low_priority_packets_.empty() &&
bytes_remaining_interval_ > 0) {
UpdateState(bytes);
@ -114,7 +153,7 @@ int32_t PacedSender::Process() {
CriticalSectionScoped cs(critsect_.get());
int elapsed_time_ms = (now - time_last_update_).Milliseconds();
time_last_update_ = now;
if (elapsed_time_ms > 0) {
if (!paused_ && elapsed_time_ms > 0) {
uint32_t delta_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms);
UpdateBytesPerInterval(delta_time_ms);
uint32_t ssrc;
@ -125,7 +164,8 @@ int32_t PacedSender::Process() {
callback_->TimeToSendPacket(ssrc, sequence_number, capture_time_ms);
critsect_->Enter();
}
if (normal_priority_packets_.empty() &&
if (high_priority_packets_.empty() &&
normal_priority_packets_.empty() &&
low_priority_packets_.empty() &&
padding_bytes_remaining_interval_ > 0) {
critsect_->Leave();
@ -164,41 +204,49 @@ bool PacedSender::GetNextPacket(uint32_t* ssrc, uint16_t* sequence_number,
if (bytes_remaining_interval_ <= 0) {
// All bytes consumed for this interval.
// Check if we have not sent in a too long time.
if (!normal_priority_packets_.empty()) {
if ((TickTime::Now() - time_last_send_).Milliseconds() >
kMaxQueueTimeWithoutSendingMs) {
Packet packet = normal_priority_packets_.front();
UpdateState(packet.bytes_);
*sequence_number = packet.sequence_number_;
*ssrc = packet.ssrc_;
*capture_time_ms = packet.capture_time_ms_;
normal_priority_packets_.pop_front();
if ((TickTime::Now() - time_last_send_).Milliseconds() >
kMaxQueueTimeWithoutSendingMs) {
if (!high_priority_packets_.empty()) {
GetNextPacketFromList(&high_priority_packets_, ssrc, sequence_number,
capture_time_ms);
return true;
}
if (!normal_priority_packets_.empty()) {
GetNextPacketFromList(&normal_priority_packets_, ssrc, sequence_number,
capture_time_ms);
return true;
}
}
return false;
}
if (!high_priority_packets_.empty()) {
GetNextPacketFromList(&high_priority_packets_, ssrc, sequence_number,
capture_time_ms);
return true;
}
if (!normal_priority_packets_.empty()) {
Packet packet = normal_priority_packets_.front();
UpdateState(packet.bytes_);
*sequence_number = packet.sequence_number_;
*ssrc = packet.ssrc_;
*capture_time_ms = packet.capture_time_ms_;
normal_priority_packets_.pop_front();
GetNextPacketFromList(&normal_priority_packets_, ssrc, sequence_number,
capture_time_ms);
return true;
}
if (!low_priority_packets_.empty()) {
Packet packet = low_priority_packets_.front();
UpdateState(packet.bytes_);
*sequence_number = packet.sequence_number_;
*ssrc = packet.ssrc_;
*capture_time_ms = packet.capture_time_ms_;
low_priority_packets_.pop_front();
GetNextPacketFromList(&low_priority_packets_, ssrc, sequence_number,
capture_time_ms);
return true;
}
return false;
}
void PacedSender::GetNextPacketFromList(std::list<Packet>* list,
uint32_t* ssrc, uint16_t* sequence_number, int64_t* capture_time_ms) {
Packet packet = list->front();
UpdateState(packet.bytes_);
*sequence_number = packet.sequence_number_;
*ssrc = packet.ssrc_;
*capture_time_ms = packet.capture_time_ms_;
list->pop_front();
}
// MUST have critsect_ when calling.
void PacedSender::UpdateState(int num_bytes) {
time_last_send_ = TickTime::Now();

View File

@ -13,11 +13,12 @@
#include "webrtc/modules/pacing/include/paced_sender.h"
namespace {
const int kTargetBitrate = 800;
};
using testing::_;
namespace webrtc {
namespace test {
static const int kTargetBitrate = 800;
class MockPacedSenderCallback : public PacedSender::Callback {
public:
@ -54,7 +55,7 @@ TEST_F(PacedSenderTest, QueuePacket) {
EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc,
sequence_number, capture_time_ms, 250));
EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
EXPECT_CALL(callback_, TimeToSendPadding(testing::_)).Times(0);
EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0);
EXPECT_CALL(callback_,
TimeToSendPacket(ssrc, sequence_number, capture_time_ms)).Times(0);
TickTime::AdvanceFakeClock(4);
@ -87,12 +88,12 @@ TEST_F(PacedSenderTest, PaceQueuedPackets) {
EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc,
sequence_number++, capture_time_ms, 250));
}
EXPECT_CALL(callback_, TimeToSendPadding(testing::_)).Times(0);
EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0);
for (int k = 0; k < 10; ++k) {
EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
TickTime::AdvanceFakeClock(5);
EXPECT_CALL(callback_,
TimeToSendPacket(ssrc, testing::_, capture_time_ms)).Times(3);
TimeToSendPacket(ssrc, _, capture_time_ms)).Times(3);
EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
EXPECT_EQ(0, send_bucket_->Process());
}
@ -159,21 +160,20 @@ TEST_F(PacedSenderTest, Priority) {
ssrc, sequence_number++, capture_time_ms, 250));
EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority,
ssrc, sequence_number++, capture_time_ms, 250));
EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kHighPriority,
EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kHighPriority,
ssrc, sequence_number++, capture_time_ms, 250));
// Expect all normal priority to be sent out first.
EXPECT_CALL(callback_, TimeToSendPadding(testing::_)).Times(0);
EXPECT_CALL(callback_,
TimeToSendPacket(ssrc, testing::_, capture_time_ms)).Times(2);
// Expect all high and normal priority to be sent out first.
EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0);
EXPECT_CALL(callback_, TimeToSendPacket(ssrc, _, capture_time_ms)).Times(3);
EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
TickTime::AdvanceFakeClock(5);
EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
EXPECT_EQ(0, send_bucket_->Process());
EXPECT_CALL(callback_, TimeToSendPacket(ssrc_low_priority,
testing::_, capture_time_ms_low_priority)).Times(1);
EXPECT_CALL(callback_, TimeToSendPacket(
ssrc_low_priority, _, capture_time_ms_low_priority)).Times(1);
EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
TickTime::AdvanceFakeClock(5);
@ -181,4 +181,61 @@ TEST_F(PacedSenderTest, Priority) {
EXPECT_EQ(0, send_bucket_->Process());
}
TEST_F(PacedSenderTest, Pause) {
uint32_t ssrc_low_priority = 12345;
uint32_t ssrc = 12346;
uint16_t sequence_number = 1234;
int64_t capture_time_ms = 56789;
int64_t second_capture_time_ms = 67890;
// Due to the multiplicative factor we can send 3 packets not 2 packets.
EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kLowPriority,
ssrc_low_priority, sequence_number++, capture_time_ms, 250));
EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kNormalPriority,
ssrc, sequence_number++, capture_time_ms, 250));
EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kNormalPriority,
ssrc, sequence_number++, capture_time_ms, 250));
send_bucket_->Pause();
// Expect everything to be queued.
EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kLowPriority,
ssrc_low_priority, sequence_number++, capture_time_ms, 250));
EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority,
ssrc, sequence_number++, capture_time_ms, 250));
EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority,
ssrc, sequence_number++, second_capture_time_ms, 250));
EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kHighPriority,
ssrc, sequence_number++, capture_time_ms, 250));
// Expect no packet to come out while paused.
EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0);
EXPECT_CALL(callback_, TimeToSendPacket(_, _, _)).Times(0);
for (int i = 0; i < 10; ++i) {
TickTime::AdvanceFakeClock(5);
EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
EXPECT_EQ(0, send_bucket_->Process());
}
// Expect high prio packets to come out first followed by all packets in the
// way they were added.
EXPECT_CALL(callback_, TimeToSendPacket(_, _, capture_time_ms)).Times(3);
send_bucket_->Resume();
EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
TickTime::AdvanceFakeClock(5);
EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
EXPECT_EQ(0, send_bucket_->Process());
EXPECT_CALL(callback_,
TimeToSendPacket(_, _, second_capture_time_ms)).Times(1);
EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
TickTime::AdvanceFakeClock(5);
EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
EXPECT_EQ(0, send_bucket_->Process());
}
} // namespace test
} // namespace webrtc