Adding pacing module, will replace the transmission_bucket in the RTP module.

TESTED=unittest
Review URL: https://webrtc-codereview.appspot.com/930015

git-svn-id: http://webrtc.googlecode.com/svn/trunk@3073 4adac7df-926f-26a2-2b94-8c16560cd09d
This commit is contained in:
pwestin@webrtc.org 2012-11-09 20:56:23 +00:00
parent f875fd22c0
commit b518017e71
6 changed files with 553 additions and 0 deletions

View File

@ -23,6 +23,7 @@
'audio_processing/audio_processing.gypi',
'bitrate_controller/bitrate_controller.gypi',
'media_file/source/media_file.gypi',
'pacing/pacing.gypi',
'remote_bitrate_estimator/remote_bitrate_estimator.gypi',
'rtp_rtcp/source/rtp_rtcp.gypi',
'udp_transport/source/udp_transport.gypi',

View File

@ -0,0 +1,4 @@
pwestin@webrtc.org
stefan@webrtc.org
mflodman@webrtc.org
asapersson@webrtc.org

View File

@ -0,0 +1,102 @@
/*
* Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef WEBRTC_MODULES_PACED_SENDER_H_
#define WEBRTC_MODULES_PACED_SENDER_H_
#include <list>
#include "webrtc/modules/interface/module.h"
#include "webrtc/system_wrappers/interface/scoped_ptr.h"
#include "webrtc/system_wrappers/interface/tick_util.h"
#include "webrtc/typedefs.h"
namespace webrtc {
class CriticalSectionWrapper;
class PacedSender : public Module {
public:
enum Priority {
kHighPriority = 0, // Pass through; will be sent immediately.
kNormalPriority = 2, // Put in back of the line.
kLowPriority = 3, // Put in back of the low priority line.
};
class Callback {
public:
// Note: packets sent as a result of a callback should not pass by this
// module again.
// Called when it's time to send a queued packet.
virtual void TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number,
int64_t capture_time_ms) = 0;
// Called when it's a good time to send a padding data.
virtual void TimeToSendPadding(int bytes) = 0;
protected:
virtual ~Callback() {}
};
PacedSender(Callback* callback, int target_bitrate_kbps);
virtual ~PacedSender();
// Enable/disable pacing.
void SetStatus(bool enable);
// Current total estimated bitrate.
void UpdateBitrate(int target_bitrate_kbps);
// Returns true if we send the packet now, else it will add the packet
// information to the queue and call TimeToSendPacket when it's time to send.
bool SendPacket(Priority priority, uint32_t ssrc, uint16_t sequence_number,
int64_t capture_time_ms, int bytes);
// Returns the number of milliseconds until the module want a worker thread
// to call Process.
virtual int32_t TimeUntilNextProcess();
// Process any pending packets in the queue(s).
virtual int32_t Process();
private:
struct Packet {
Packet(uint32_t ssrc, uint16_t seq_number, int64_t capture_time_ms,
int length_in_bytes)
: ssrc_(ssrc),
sequence_number_(seq_number),
capture_time_ms_(capture_time_ms),
bytes_(length_in_bytes) {
}
uint32_t ssrc_;
uint16_t sequence_number_;
int64_t capture_time_ms_;
int bytes_;
};
// Checks if next packet in line can be transmitted. Returns true on success.
bool GetNextPacket(uint32_t* ssrc, uint16_t* sequence_number,
int64_t* capture_time_ms);
// Updates the number of bytes that can be sent for the next time interval.
void UpdateBytesPerInterval(uint32_t delta_time_in_ms);
// Updates the buffers with the number of bytes that we sent.
void UpdateState(int num_bytes);
Callback* callback_;
bool enable_;
scoped_ptr<CriticalSectionWrapper> critsect_;
int target_bitrate_kbytes_per_s_;
int bytes_remaining_interval_;
int padding_bytes_remaining_interval_;
TickTime time_last_update_;
TickTime time_last_send_;
std::list<Packet> normal_priority_packets_;
std::list<Packet> low_priority_packets_;
};
} // namespace webrtc
#endif // WEBRTC_MODULES_PACED_SENDER_H_

View File

@ -0,0 +1,209 @@
/*
* Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "webrtc/modules/pacing/include/paced_sender.h"
#include <assert.h>
#include "webrtc/system_wrappers/interface/critical_section_wrapper.h"
namespace {
// Multiplicative factor that is applied to the target bitrate to calculate the
// number of bytes that can be transmitted per interval.
// Increasing this factor will result in lower delays in cases of bitrate
// overshoots from the encoder.
const float kBytesPerIntervalMargin = 1.5f;
// Time limit in milliseconds between packet bursts.
const int kMinPacketLimitMs = 5;
// Upper cap on process interval, in case process has not been called in a long
// time.
const int kMaxIntervalTimeMs = 30;
// Max time that the first packet in the queue can sit in the queue if no
// packets are sent, regardless of buffer state. In practice only in effect at
// low bitrates (less than 320 kbits/s).
const int kMaxQueueTimeWithoutSendingMs = 30;
} // namespace
namespace webrtc {
PacedSender::PacedSender(Callback* callback, int target_bitrate_kbps)
: callback_(callback),
enable_(false),
critsect_(CriticalSectionWrapper::CreateCriticalSection()),
target_bitrate_kbytes_per_s_(target_bitrate_kbps >> 3), // Divide by 8.
bytes_remaining_interval_(0),
padding_bytes_remaining_interval_(0),
time_last_update_(TickTime::Now()) {
UpdateBytesPerInterval(kMinPacketLimitMs);
}
PacedSender::~PacedSender() {
normal_priority_packets_.clear();
low_priority_packets_.clear();
}
void PacedSender::SetStatus(bool enable) {
CriticalSectionScoped cs(critsect_.get());
enable_ = enable;
}
void PacedSender::UpdateBitrate(int target_bitrate_kbps) {
CriticalSectionScoped cs(critsect_.get());
target_bitrate_kbytes_per_s_ = target_bitrate_kbps >> 3; // Divide by 8.
}
bool PacedSender::SendPacket(Priority priority, uint32_t ssrc,
uint16_t sequence_number, int64_t capture_time_ms, int bytes) {
CriticalSectionScoped cs(critsect_.get());
if (!enable_) {
UpdateState(bytes);
return true; // We can send now.
}
switch (priority) {
case kHighPriority:
UpdateState(bytes);
return true; // We can send now.
case kNormalPriority:
if (normal_priority_packets_.empty() && bytes_remaining_interval_ > 0) {
UpdateState(bytes);
return true; // We can send now.
}
normal_priority_packets_.push_back(
Packet(ssrc, sequence_number, capture_time_ms, bytes));
return false;
case kLowPriority:
if (normal_priority_packets_.empty() &&
low_priority_packets_.empty() &&
bytes_remaining_interval_ > 0) {
UpdateState(bytes);
return true; // We can send now.
}
low_priority_packets_.push_back(
Packet(ssrc, sequence_number, capture_time_ms, bytes));
return false;
}
return false;
}
int32_t PacedSender::TimeUntilNextProcess() {
CriticalSectionScoped cs(critsect_.get());
int64_t elapsed_time_ms =
(TickTime::Now() - time_last_update_).Milliseconds();
if (elapsed_time_ms <= 0) {
return kMinPacketLimitMs;
}
if (elapsed_time_ms >= kMinPacketLimitMs) {
return 0;
}
return kMinPacketLimitMs - elapsed_time_ms;
}
int32_t PacedSender::Process() {
TickTime now = TickTime::Now();
CriticalSectionScoped cs(critsect_.get());
int elapsed_time_ms = (now - time_last_update_).Milliseconds();
time_last_update_ = now;
if (elapsed_time_ms > 0) {
uint32_t delta_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms);
UpdateBytesPerInterval(delta_time_ms);
uint32_t ssrc;
uint16_t sequence_number;
int64_t capture_time_ms;
while (GetNextPacket(&ssrc, &sequence_number, &capture_time_ms)) {
critsect_->Leave();
callback_->TimeToSendPacket(ssrc, sequence_number, capture_time_ms);
critsect_->Enter();
}
if (normal_priority_packets_.empty() &&
low_priority_packets_.empty() &&
padding_bytes_remaining_interval_ > 0) {
critsect_->Leave();
callback_->TimeToSendPadding(padding_bytes_remaining_interval_);
critsect_->Enter();
padding_bytes_remaining_interval_ = 0;
bytes_remaining_interval_ -= padding_bytes_remaining_interval_;
}
}
return 0;
}
// MUST have critsect_ when calling.
void PacedSender::UpdateBytesPerInterval(uint32_t delta_time_ms) {
uint32_t bytes_per_interval = target_bitrate_kbytes_per_s_ * delta_time_ms;
if (bytes_remaining_interval_ < 0) {
// We overused last interval, compensate this interval.
bytes_remaining_interval_ += kBytesPerIntervalMargin * bytes_per_interval;
} else {
// If we underused last interval we can't use it this interval.
bytes_remaining_interval_ = kBytesPerIntervalMargin * bytes_per_interval;
}
if (padding_bytes_remaining_interval_ < 0) {
// We overused last interval, compensate this interval.
padding_bytes_remaining_interval_ += bytes_per_interval;
} else {
// If we underused last interval we can't use it this interval.
padding_bytes_remaining_interval_ = bytes_per_interval;
}
}
// MUST have critsect_ when calling.
bool PacedSender::GetNextPacket(uint32_t* ssrc, uint16_t* sequence_number,
int64_t* capture_time_ms) {
if (bytes_remaining_interval_ <= 0) {
// All bytes consumed for this interval.
// Check if we have not sent in a too long time.
if (!normal_priority_packets_.empty()) {
if ((TickTime::Now() - time_last_send_).Milliseconds() >
kMaxQueueTimeWithoutSendingMs) {
Packet packet = normal_priority_packets_.front();
UpdateState(packet.bytes_);
*sequence_number = packet.sequence_number_;
*ssrc = packet.ssrc_;
*capture_time_ms = packet.capture_time_ms_;
normal_priority_packets_.pop_front();
return true;
}
}
return false;
}
if (!normal_priority_packets_.empty()) {
Packet packet = normal_priority_packets_.front();
UpdateState(packet.bytes_);
*sequence_number = packet.sequence_number_;
*ssrc = packet.ssrc_;
*capture_time_ms = packet.capture_time_ms_;
normal_priority_packets_.pop_front();
return true;
}
if (!low_priority_packets_.empty()) {
Packet packet = low_priority_packets_.front();
UpdateState(packet.bytes_);
*sequence_number = packet.sequence_number_;
*ssrc = packet.ssrc_;
*capture_time_ms = packet.capture_time_ms_;
low_priority_packets_.pop_front();
return true;
}
return false;
}
// MUST have critsect_ when calling.
void PacedSender::UpdateState(int num_bytes) {
time_last_send_ = TickTime::Now();
bytes_remaining_interval_ -= num_bytes;
padding_bytes_remaining_interval_ -= num_bytes;
}
} // namespace webrtc

View File

@ -0,0 +1,184 @@
/*
* Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "webrtc/modules/pacing/include/paced_sender.h"
namespace {
const int kTargetBitrate = 800;
};
namespace webrtc {
class MockPacedSenderCallback : public PacedSender::Callback {
public:
MOCK_METHOD3(TimeToSendPacket,
void(uint32_t ssrc, uint16_t sequence_number, int64_t capture_time_ms));
MOCK_METHOD1(TimeToSendPadding,
void(int bytes));
};
class PacedSenderTest : public ::testing::Test {
protected:
PacedSenderTest() {
TickTime::UseFakeClock(123456);
// Need to initialize PacedSender after we initialize clock.
send_bucket_.reset(new PacedSender(&callback_, kTargetBitrate));
send_bucket_->SetStatus(true);
}
MockPacedSenderCallback callback_;
scoped_ptr<PacedSender> send_bucket_;
};
TEST_F(PacedSenderTest, QueuePacket) {
uint32_t ssrc = 12345;
uint16_t sequence_number = 1234;
int64_t capture_time_ms = 56789;
// Due to the multiplicative factor we can send 3 packets not 2 packets.
EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc,
sequence_number++, capture_time_ms, 250));
EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc,
sequence_number++, capture_time_ms, 250));
EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc,
sequence_number++, capture_time_ms, 250));
EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc,
sequence_number, capture_time_ms, 250));
EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
EXPECT_CALL(callback_, TimeToSendPadding(testing::_)).Times(0);
EXPECT_CALL(callback_,
TimeToSendPacket(ssrc, sequence_number, capture_time_ms)).Times(0);
TickTime::AdvanceFakeClock(4);
EXPECT_EQ(1, send_bucket_->TimeUntilNextProcess());
EXPECT_CALL(callback_,
TimeToSendPacket(ssrc, sequence_number, capture_time_ms)).Times(1);
TickTime::AdvanceFakeClock(1);
EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
EXPECT_EQ(0, send_bucket_->Process());
sequence_number++;
EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc,
sequence_number++, capture_time_ms, 250));
EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc,
sequence_number++, capture_time_ms, 250));
EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc,
sequence_number++, capture_time_ms, 250));
}
TEST_F(PacedSenderTest, PaceQueuedPackets) {
uint32_t ssrc = 12345;
uint16_t sequence_number = 1234;
int64_t capture_time_ms = 56789;
// Due to the multiplicative factor we can send 3 packets not 2 packets.
for (int i = 0; i < 3; ++i) {
EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc,
sequence_number++, capture_time_ms, 250));
}
for (int j = 0; j < 30; ++j) {
EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc,
sequence_number++, capture_time_ms, 250));
}
EXPECT_CALL(callback_, TimeToSendPadding(testing::_)).Times(0);
for (int k = 0; k < 10; ++k) {
EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
TickTime::AdvanceFakeClock(5);
EXPECT_CALL(callback_,
TimeToSendPacket(ssrc, testing::_, capture_time_ms)).Times(3);
EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
EXPECT_EQ(0, send_bucket_->Process());
}
EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
TickTime::AdvanceFakeClock(5);
EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
EXPECT_EQ(0, send_bucket_->Process());
EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc,
sequence_number++, capture_time_ms, 250));
EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc,
sequence_number++, capture_time_ms, 250));
EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc,
sequence_number++, capture_time_ms, 250));
EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc,
sequence_number++, capture_time_ms, 250));
}
TEST_F(PacedSenderTest, Padding) {
uint32_t ssrc = 12345;
uint16_t sequence_number = 1234;
int64_t capture_time_ms = 56789;
// Due to the multiplicative factor we can send 3 packets not 2 packets.
EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc,
sequence_number++, capture_time_ms, 250));
EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc,
sequence_number++, capture_time_ms, 250));
EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc,
sequence_number++, capture_time_ms, 250));
EXPECT_CALL(callback_, TimeToSendPadding(250)).Times(1);
EXPECT_CALL(callback_,
TimeToSendPacket(ssrc, sequence_number, capture_time_ms)).Times(0);
EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
TickTime::AdvanceFakeClock(5);
EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
EXPECT_EQ(0, send_bucket_->Process());
EXPECT_CALL(callback_, TimeToSendPadding(500)).Times(1);
EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
TickTime::AdvanceFakeClock(5);
EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
EXPECT_EQ(0, send_bucket_->Process());
}
TEST_F(PacedSenderTest, Priority) {
uint32_t ssrc_low_priority = 12345;
uint32_t ssrc = 12346;
uint16_t sequence_number = 1234;
int64_t capture_time_ms = 56789;
int64_t capture_time_ms_low_priority = 1234567;
// Due to the multiplicative factor we can send 3 packets not 2 packets.
EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kLowPriority,
ssrc_low_priority, sequence_number++, capture_time_ms_low_priority, 250));
EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kNormalPriority,
ssrc, sequence_number++, capture_time_ms, 250));
EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kNormalPriority,
ssrc, sequence_number++, capture_time_ms, 250));
// Expect normal and low priority to be queued and high to pass through.
EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kLowPriority,
ssrc_low_priority, sequence_number++, capture_time_ms_low_priority, 250));
EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority,
ssrc, sequence_number++, capture_time_ms, 250));
EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority,
ssrc, sequence_number++, capture_time_ms, 250));
EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kHighPriority,
ssrc, sequence_number++, capture_time_ms, 250));
// Expect all normal priority to be sent out first.
EXPECT_CALL(callback_, TimeToSendPadding(testing::_)).Times(0);
EXPECT_CALL(callback_,
TimeToSendPacket(ssrc, testing::_, capture_time_ms)).Times(2);
EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
TickTime::AdvanceFakeClock(5);
EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
EXPECT_EQ(0, send_bucket_->Process());
EXPECT_CALL(callback_, TimeToSendPacket(ssrc_low_priority,
testing::_, capture_time_ms_low_priority)).Times(1);
EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
TickTime::AdvanceFakeClock(5);
EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
EXPECT_EQ(0, send_bucket_->Process());
}
} // namespace webrtc

View File

@ -0,0 +1,53 @@
# Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
#
# Use of this source code is governed by a BSD-style license
# that can be found in the LICENSE file in the root of the source
# tree. An additional intellectual property rights grant can be found
# in the file PATENTS. All contributing project authors may
# be found in the AUTHORS file in the root of the source tree.
{
'targets': [
{
'target_name': 'paced_sender',
'type': '<(library)',
'dependencies': [
'<(webrtc_root)/system_wrappers/source/system_wrappers.gyp:system_wrappers',
],
'include_dirs': [
'include',
],
'sources': [
'include/paced_sender.h',
'paced_sender.cc',
],
},
], # targets
'conditions': [
['include_tests==1', {
'targets' : [
{
'target_name': 'paced_sender_unittests',
'type': 'executable',
'dependencies': [
'paced_sender',
'<(webrtc_root)/test/test.gyp:test_support_main',
'<(DEPTH)/testing/gmock.gyp:gmock',
'<(DEPTH)/testing/gtest.gyp:gtest',
],
'sources': [
'paced_sender_unittest.cc',
],
},
], # targets
}], # include_tests
], # conditions
}
# Local Variables:
# tab-width:2
# indent-tabs-mode:nil
# End:
# vim: set expandtab tabstop=2 shiftwidth=2