Improve TCP implementation by adding ssthresh and make it possible to start it with an offset.

Add a propagation delay to tests and make the run-time configurable for the fairness tests.

Handle losses in-between feedback messages.

BUG=4549
R=pbos@webrtc.org

Review URL: https://webrtc-codereview.appspot.com/49819004

Cr-Commit-Position: refs/heads/master@{#9099}
This commit is contained in:
Stefan Holmer
2015-04-28 14:39:00 +02:00
parent 9d657cfd66
commit bcbcd84888
12 changed files with 165 additions and 77 deletions

View File

@@ -87,7 +87,7 @@ TEST_P(BweSimulation, Choke1000kbps500kbps1000kbpsBiDirectional) {
PacketReceiver receiver2(&downlink_, kFlowIds[1], GetParam(), true, false);
choke2.SetCapacity(500);
delay.SetDelay(0);
delay.SetDelayMs(0);
choke.SetCapacity(1000);
choke.SetMaxDelay(500);
@@ -240,13 +240,33 @@ TEST_P(BweSimulation, SelfFairnessTest) {
TEST_P(BweSimulation, PacedSelfFairnessTest) {
VerboseLogging(true);
srand(Clock::GetRealTimeClock()->TimeInMicroseconds());
RunFairnessTest(GetParam(), 4, 0, 2000);
RunFairnessTest(GetParam(), 4, 0, 1000, 3000, 50);
}
TEST_P(BweSimulation, PacedTcpFairnessTest) {
VerboseLogging(true);
srand(Clock::GetRealTimeClock()->TimeInMicroseconds());
RunFairnessTest(GetParam(), 1, 1, 3000);
RunFairnessTest(GetParam(), 4, 0, 1000, 3000, 500);
}
TEST_P(BweSimulation, PacedSelfFairness1000msTest) {
srand(Clock::GetRealTimeClock()->TimeInMicroseconds());
RunFairnessTest(GetParam(), 4, 0, 1000, 3000, 1000);
}
TEST_P(BweSimulation, TcpFairness50msTest) {
srand(Clock::GetRealTimeClock()->TimeInMicroseconds());
RunFairnessTest(GetParam(), 1, 1, 1000, 2000, 50);
}
TEST_P(BweSimulation, TcpFairness500msTest) {
srand(Clock::GetRealTimeClock()->TimeInMicroseconds());
RunFairnessTest(GetParam(), 1, 1, 1000, 2000, 500);
}
TEST_P(BweSimulation, TcpFairness1000msTest) {
srand(Clock::GetRealTimeClock()->TimeInMicroseconds());
RunFairnessTest(GetParam(), 1, 1, 1000, 2000, 1000);
}
#endif // BWE_TEST_LOGGING_COMPILE_TIME_ENABLE
} // namespace bwe

View File

@@ -65,7 +65,7 @@ TEST_P(DefaultBweTest, SteadyDelay) {
VideoSender sender(&uplink_, &source, GetParam());
DelayFilter delay(&uplink_, 0);
PacketReceiver receiver(&uplink_, 0, GetParam(), false, false);
delay.SetDelay(1000);
delay.SetDelayMs(1000);
RunFor(10 * 60 * 1000);
}
@@ -76,7 +76,7 @@ TEST_P(DefaultBweTest, IncreasingDelay1) {
PacketReceiver receiver(&uplink_, 0, GetParam(), false, false);
RunFor(10 * 60 * 1000);
for (int i = 0; i < 30 * 2; ++i) {
delay.SetDelay(i);
delay.SetDelayMs(i);
RunFor(10 * 1000);
}
RunFor(10 * 60 * 1000);
@@ -90,10 +90,10 @@ TEST_P(DefaultBweTest, IncreasingDelay2) {
PacketReceiver receiver(&uplink_, 0, GetParam(), false, false);
RunFor(1 * 60 * 1000);
for (int i = 1; i < 51; ++i) {
delay.SetDelay(10.0f * i);
delay.SetDelayMs(10.0f * i);
RunFor(10 * 1000);
}
delay.SetDelay(0.0f);
delay.SetDelayMs(0.0f);
RunFor(10 * 60 * 1000);
}
@@ -104,12 +104,12 @@ TEST_P(DefaultBweTest, JumpyDelay1) {
PacketReceiver receiver(&uplink_, 0, GetParam(), false, false);
RunFor(10 * 60 * 1000);
for (int i = 1; i < 200; ++i) {
delay.SetDelay((10 * i) % 500);
delay.SetDelayMs((10 * i) % 500);
RunFor(1000);
delay.SetDelay(1.0f);
delay.SetDelayMs(1.0f);
RunFor(1000);
}
delay.SetDelay(0.0f);
delay.SetDelayMs(0.0f);
RunFor(10 * 60 * 1000);
}
@@ -211,11 +211,11 @@ TEST_P(DefaultBweTest, Multi1) {
choke.SetCapacity(1000);
RunFor(1 * 60 * 1000);
for (int i = 1; i < 51; ++i) {
delay.SetDelay(100.0f * i);
delay.SetDelayMs(100.0f * i);
RunFor(10 * 1000);
}
RunFor(500 * 1000);
delay.SetDelay(0.0f);
delay.SetDelayMs(0.0f);
RunFor(5 * 60 * 1000);
}
@@ -254,7 +254,7 @@ INSTANTIATE_TEST_CASE_P(VideoSendersTest,
TEST_P(BweFeedbackTest, Choke1000kbps500kbps1000kbps) {
AdaptiveVideoSource source(0, 30, 300, 0, 0);
VideoSender sender(&uplink_, &source, GetParam());
PacedVideoSender sender(&uplink_, &source, GetParam());
ChokeFilter filter(&uplink_, 0);
RateCounterFilter counter(&uplink_, 0, "receiver_input");
PacketReceiver receiver(&uplink_, 0, GetParam(), false, false);
@@ -274,7 +274,7 @@ TEST_P(BweFeedbackTest, Choke1000kbps500kbps1000kbps) {
TEST_P(BweFeedbackTest, Choke200kbps30kbps200kbps) {
AdaptiveVideoSource source(0, 30, 300, 0, 0);
VideoSender sender(&uplink_, &source, GetParam());
PacedVideoSender sender(&uplink_, &source, GetParam());
ChokeFilter filter(&uplink_, 0);
RateCounterFilter counter(&uplink_, 0, "receiver_input");
PacketReceiver receiver(&uplink_, 0, GetParam(), false, false);
@@ -323,12 +323,27 @@ TEST_P(BweFeedbackTest, DISABLED_GoogleWifiTrace3Mbps) {
TEST_P(BweFeedbackTest, PacedSelfFairnessTest) {
srand(Clock::GetRealTimeClock()->TimeInMicroseconds());
RunFairnessTest(GetParam(), 4, 0, 3000);
RunFairnessTest(GetParam(), 4, 0, 300, 3000, 50);
}
TEST_P(BweFeedbackTest, TcpFairnessTest) {
TEST_P(BweFeedbackTest, PacedSelfFairness1000msTest) {
srand(Clock::GetRealTimeClock()->TimeInMicroseconds());
RunFairnessTest(GetParam(), 1, 1, 2000);
RunFairnessTest(GetParam(), 4, 0, 300, 3000, 1000);
}
TEST_P(BweFeedbackTest, TcpFairness50msTest) {
srand(Clock::GetRealTimeClock()->TimeInMicroseconds());
RunFairnessTest(GetParam(), 1, 1, 300, 2000, 50);
}
TEST_P(BweFeedbackTest, TcpFairness500msTest) {
srand(Clock::GetRealTimeClock()->TimeInMicroseconds());
RunFairnessTest(GetParam(), 1, 1, 300, 2000, 500);
}
TEST_P(BweFeedbackTest, TcpFairness1000msTest) {
srand(Clock::GetRealTimeClock()->TimeInMicroseconds());
RunFairnessTest(GetParam(), 1, 1, 300, 2000, 1000);
}
} // namespace bwe
} // namespace testing

View File

@@ -21,7 +21,7 @@ namespace testing {
namespace bwe {
const int kMinBitrateKbps = 150;
const int kMaxBitrateKbps = 2000;
const int kMaxBitrateKbps = 3000;
class BweSender : public Module {
public:

View File

@@ -192,7 +192,8 @@ void BweTest::PrintResults(
webrtc::test::PrintResultMeanAndError("BwePerformance", GetTestName(),
"Average delay", delay_ms.AsString(),
"ms", false);
double fairness_index = 0.0;
double fairness_index = 1.0;
if (!flow_throughput_kbps.empty()) {
double squared_bitrate_sum = 0.0;
for (Stats<double> flow : flow_throughput_kbps) {
squared_bitrate_sum += flow.GetMean() * flow.GetMean();
@@ -200,6 +201,7 @@ void BweTest::PrintResults(
}
fairness_index *= fairness_index;
fairness_index /= flow_throughput_kbps.size() * squared_bitrate_sum;
}
webrtc::test::PrintResult("BwePerformance", GetTestName(), "Fairness",
fairness_index * 100, "%", false);
}
@@ -207,7 +209,9 @@ void BweTest::PrintResults(
void BweTest::RunFairnessTest(BandwidthEstimatorType bwe_type,
size_t num_media_flows,
size_t num_tcp_flows,
int capacity_kbps) {
int64_t run_time_seconds,
int capacity_kbps,
int max_delay_ms) {
std::set<int> all_flow_ids;
std::set<int> media_flow_ids;
std::set<int> tcp_flow_ids;
@@ -230,17 +234,22 @@ void BweTest::RunFairnessTest(BandwidthEstimatorType bwe_type,
for (int media_flow : media_flow_ids) {
// Streams started 20 seconds apart to give them different advantage when
// competing for the bandwidth.
const int64_t kFlowStartOffsetMs = i++ * (rand() % 40000);
sources.push_back(new AdaptiveVideoSource(media_flow, 30, 300, 0,
i++ * (rand() % 40000)));
kFlowStartOffsetMs));
senders.push_back(new PacedVideoSender(&uplink_, sources.back(), bwe_type));
}
const int64_t kTcpStartOffsetMs = 20000;
for (int tcp_flow : tcp_flow_ids)
senders.push_back(new TcpSender(&uplink_, tcp_flow));
senders.push_back(new TcpSender(&uplink_, tcp_flow, kTcpStartOffsetMs));
ChokeFilter choke(&uplink_, all_flow_ids);
choke.SetCapacity(capacity_kbps);
choke.SetMaxDelay(1000);
choke.SetMaxDelay(max_delay_ms);
DelayFilter delay_uplink(&uplink_, all_flow_ids);
delay_uplink.SetDelayMs(25);
std::vector<RateCounterFilter*> rate_counters;
for (int flow : all_flow_ids) {
@@ -262,7 +271,10 @@ void BweTest::RunFairnessTest(BandwidthEstimatorType bwe_type,
new PacketReceiver(&uplink_, tcp_flow, kTcpEstimator, false, false));
}
RunFor(15 * 60 * 1000);
DelayFilter delay_downlink(&downlink_, all_flow_ids);
delay_downlink.SetDelayMs(25);
RunFor(run_time_seconds * 1000);
std::vector<Stats<double>> flow_throughput_kbps;
for (i = 0; i < all_flow_ids.size(); ++i)

View File

@@ -84,7 +84,9 @@ class BweTest {
void RunFairnessTest(BandwidthEstimatorType bwe_type,
size_t num_media_flows,
size_t num_tcp_flows,
int capacity_kbps);
int64_t run_time_seconds,
int capacity_kbps,
int max_delay_ms);
Link downlink_;
Link uplink_;

View File

@@ -341,7 +341,7 @@ DelayFilter::DelayFilter(PacketProcessorListener* listener,
last_send_time_us_(0) {
}
void DelayFilter::SetDelay(int64_t delay_ms) {
void DelayFilter::SetDelayMs(int64_t delay_ms) {
BWE_TEST_LOGGING_ENABLE(false);
BWE_TEST_LOGGING_LOG1("Delay", "%d ms", static_cast<int>(delay_ms));
assert(delay_ms >= 0);

View File

@@ -254,7 +254,7 @@ class DelayFilter : public PacketProcessor {
DelayFilter(PacketProcessorListener* listener, const FlowIds& flow_ids);
virtual ~DelayFilter() {}
void SetDelay(int64_t delay_ms);
void SetDelayMs(int64_t delay_ms);
virtual void RunFor(int64_t time_ms, Packets* in_out);
private:

View File

@@ -333,7 +333,7 @@ class BweTestFramework_DelayFilterTest : public ::testing::Test {
}
void TestDelayFilter(int64_t delay_ms) {
filter_.SetDelay(delay_ms);
filter_.SetDelayMs(delay_ms);
TestDelayFilter(1, 0, 0); // No input should yield no output
// Single packet
@@ -341,7 +341,7 @@ class BweTestFramework_DelayFilterTest : public ::testing::Test {
TestDelayFilter(delay_ms, 0, 0);
for (int i = 0; i < delay_ms; ++i) {
filter_.SetDelay(i);
filter_.SetDelayMs(i);
TestDelayFilter(1, 10, 10);
}
TestDelayFilter(0, 0, 0);
@@ -351,11 +351,11 @@ class BweTestFramework_DelayFilterTest : public ::testing::Test {
TestDelayFilter(delay_ms, 0, 0);
for (int i = 1; i < delay_ms + 1; ++i) {
filter_.SetDelay(i);
filter_.SetDelayMs(i);
TestDelayFilter(1, 5, 5);
}
TestDelayFilter(0, 0, 0);
filter_.SetDelay(2 * delay_ms);
filter_.SetDelayMs(2 * delay_ms);
TestDelayFilter(1, 0, 0);
TestDelayFilter(delay_ms, 13, 13);
TestDelayFilter(delay_ms, 0, 0);
@@ -364,11 +364,11 @@ class BweTestFramework_DelayFilterTest : public ::testing::Test {
TestDelayFilter(delay_ms, 0, 0);
for (int i = 0; i < 2 * delay_ms; ++i) {
filter_.SetDelay(2 * delay_ms - i - 1);
filter_.SetDelayMs(2 * delay_ms - i - 1);
TestDelayFilter(1, 5, 5);
}
TestDelayFilter(0, 0, 0);
filter_.SetDelay(0);
filter_.SetDelayMs(0);
TestDelayFilter(0, 7, 7);
ASSERT_TRUE(IsTimeSorted(accumulated_packets_));
@@ -389,7 +389,7 @@ TEST_F(BweTestFramework_DelayFilterTest, Delay0) {
TestDelayFilter(1, 0, 0); // No input should yield no output
TestDelayFilter(1, 10, 10); // Expect no delay (delay time is zero)
TestDelayFilter(1, 0, 0); // Check no packets are still in buffer
filter_.SetDelay(0);
filter_.SetDelayMs(0);
TestDelayFilter(1, 5, 5); // Expect no delay (delay time is zero)
TestDelayFilter(1, 0, 0); // Check no packets are still in buffer
}
@@ -416,7 +416,7 @@ TEST_F(BweTestFramework_DelayFilterTest, JumpToZeroDelay) {
Packets packets;
// Delay a bunch of packets, accumulate them to the 'acc' list.
delay.SetDelay(100.0f);
delay.SetDelayMs(100.0f);
for (uint32_t i = 0; i < 10; ++i) {
packets.push_back(new MediaPacket(i * 100, i));
}
@@ -427,7 +427,7 @@ TEST_F(BweTestFramework_DelayFilterTest, JumpToZeroDelay) {
// Drop delay to zero, send a few more packets through the delay, append them
// to the 'acc' list and verify that it is all sorted.
delay.SetDelay(0.0f);
delay.SetDelayMs(0.0f);
for (uint32_t i = 10; i < 50; ++i) {
packets.push_back(new MediaPacket(i * 100, i));
}
@@ -446,12 +446,12 @@ TEST_F(BweTestFramework_DelayFilterTest, IncreasingDelay) {
TestDelayFilter(i);
}
// Reach a steady state.
filter_.SetDelay(100);
filter_.SetDelayMs(100);
TestDelayFilter(1, 20, 20);
TestDelayFilter(2, 0, 0);
TestDelayFilter(99, 20, 20);
// Drop delay back down to zero.
filter_.SetDelay(0);
filter_.SetDelayMs(0);
TestDelayFilter(1, 100, 100);
TestDelayFilter(23010, 0, 0);
ASSERT_TRUE(IsTimeSorted(accumulated_packets_));

View File

@@ -11,6 +11,7 @@
#include "webrtc/modules/remote_bitrate_estimator/test/estimators/send_side.h"
#include "webrtc/base/logging.h"
#include "webrtc/modules/remote_bitrate_estimator/test/bwe_test_logging.h"
namespace webrtc {
namespace testing {
@@ -23,7 +24,9 @@ FullBweSender::FullBweSender(int kbps, BitrateObserver* observer, Clock* clock)
.Create(this, clock, kAimdControl, 1000 * kMinBitrateKbps)),
feedback_observer_(bitrate_controller_->CreateRtcpBandwidthObserver()),
clock_(clock),
send_time_history_(10000) {
send_time_history_(10000),
has_received_ack_(false),
last_acked_seq_num_(0) {
assert(kbps >= kMinBitrateKbps);
assert(kbps <= kMaxBitrateKbps);
bitrate_controller_->SetStartBitrate(1000 * kbps);
@@ -51,23 +54,32 @@ void FullBweSender::GiveFeedback(const FeedbackPacket& feedback) {
LOG(LS_WARNING) << "Ack arrived too late.";
}
}
rbe_->IncomingPacketFeedbackVector(packet_feedback_vector);
// TODO(holmer): Handle losses in between feedback packets.
if (has_received_ack_) {
int expected_packets = fb.packet_feedback_vector().back().sequence_number -
fb.packet_feedback_vector().front().sequence_number +
1;
last_acked_seq_num_;
// Assuming no reordering for now.
if (expected_packets <= 0)
return;
int lost_packets =
expected_packets - static_cast<int>(fb.packet_feedback_vector().size());
if (expected_packets > 0) {
int lost_packets = expected_packets -
static_cast<int>(fb.packet_feedback_vector().size());
report_block_.fractionLost = (lost_packets << 8) / expected_packets;
report_block_.cumulativeLost += lost_packets;
report_block_.extendedHighSeqNum =
packet_feedback_vector.back().sequence_number;
ReportBlockList report_blocks;
report_blocks.push_back(report_block_);
feedback_observer_->OnReceivedRtcpReceiverReport(
report_blocks, 0, clock_->TimeInMilliseconds());
}
bitrate_controller_->Process();
last_acked_seq_num_ = LatestSequenceNumber(
packet_feedback_vector.back().sequence_number, last_acked_seq_num_);
} else {
last_acked_seq_num_ = packet_feedback_vector.back().sequence_number;
has_received_ack_ = true;
}
}
void FullBweSender::OnPacketsSent(const Packets& packets) {

View File

@@ -42,6 +42,8 @@ class FullBweSender : public BweSender, public RemoteBitrateObserver {
Clock* const clock_;
RTCPReportBlock report_block_;
SendTimeHistory send_time_history_;
bool has_received_ack_;
uint16_t last_acked_seq_num_;
DISALLOW_IMPLICIT_CONSTRUCTORS(FullBweSender);
};

View File

@@ -138,8 +138,10 @@ void PacedVideoSender::RunFor(int64_t time_ms, Packets* in_out) {
int64_t time_until_process_ms = TimeUntilNextProcess(modules_);
int64_t time_until_feedback_ms = time_ms;
if (!feedbacks.empty())
time_until_feedback_ms = feedbacks.front()->send_time_us() / 1000 -
clock_.TimeInMilliseconds();
time_until_feedback_ms =
std::max<int64_t>(feedbacks.front()->send_time_us() / 1000 -
clock_.TimeInMilliseconds(),
0);
int64_t time_until_next_event_ms =
std::min(time_until_feedback_ms, time_until_process_ms);
@@ -265,6 +267,10 @@ void PacedVideoSender::OnNetworkChanged(uint32_t target_bitrate_bps,
}
void TcpSender::RunFor(int64_t time_ms, Packets* in_out) {
if (now_ms_ + time_ms < offset_ms_) {
now_ms_ += time_ms;
return;
}
BWE_TEST_LOGGING_CONTEXT("Sender");
BWE_TEST_LOGGING_CONTEXT(*flow_ids().begin());
std::list<FeedbackPacket*> feedbacks =
@@ -277,13 +283,6 @@ void TcpSender::RunFor(int64_t time_ms, Packets* in_out) {
SendPackets(in_out);
}
for (auto it = in_flight_.begin(); it != in_flight_.end();) {
if (it->time_ms < now_ms_ - 1000)
in_flight_.erase(it++);
else
++it;
}
SendPackets(in_out);
now_ms_ += time_ms;
}
@@ -291,6 +290,10 @@ void TcpSender::RunFor(int64_t time_ms, Packets* in_out) {
void TcpSender::SendPackets(Packets* in_out) {
int cwnd = ceil(cwnd_);
int packets_to_send = std::max(cwnd - static_cast<int>(in_flight_.size()), 0);
int timed_out = TriggerTimeouts();
if (timed_out > 0) {
HandleLoss();
}
if (packets_to_send > 0) {
Packets generated = GeneratePackets(packets_to_send);
for (Packet* packet : generated)
@@ -312,9 +315,8 @@ void TcpSender::UpdateCongestionControl(const FeedbackPacket* fb) {
in_flight_.erase(InFlight(ack_seq_num, now_ms_));
if (missing > 0) {
cwnd_ /= 2.0f;
in_slow_start_ = false;
} else if (in_slow_start_) {
HandleLoss();
} else if (cwnd_ <= ssthresh_) {
cwnd_ += tcp_fb->acked_packets().size();
} else {
cwnd_ += 1.0f / cwnd_;
@@ -324,6 +326,24 @@ void TcpSender::UpdateCongestionControl(const FeedbackPacket* fb) {
LatestSequenceNumber(tcp_fb->acked_packets().back(), last_acked_seq_num_);
}
int TcpSender::TriggerTimeouts() {
int timed_out = 0;
for (auto it = in_flight_.begin(); it != in_flight_.end();) {
if (it->time_ms < now_ms_ - 1000) {
in_flight_.erase(it++);
++timed_out;
} else {
++it;
}
}
return timed_out;
}
void TcpSender::HandleLoss() {
ssthresh_ = std::max(static_cast<int>(in_flight_.size() / 2), 2);
cwnd_ = ssthresh_;
}
Packets TcpSender::GeneratePackets(size_t num_packets) {
Packets generated;
for (size_t i = 0; i < num_packets; ++i) {

View File

@@ -12,6 +12,7 @@
#define WEBRTC_MODULES_REMOTE_BITRATE_ESTIMATOR_TEST_PACKET_SENDER_H_
#include <list>
#include <limits>
#include <string>
#include "webrtc/base/constructormagic.h"
@@ -104,14 +105,15 @@ class PacedVideoSender : public VideoSender, public PacedSender::Callback {
class TcpSender : public PacketSender {
public:
TcpSender(PacketProcessorListener* listener, int flow_id)
TcpSender(PacketProcessorListener* listener, int flow_id, int64_t offset_ms)
: PacketSender(listener, flow_id),
now_ms_(0),
in_slow_start_(false),
cwnd_(10),
ssthresh_(std::numeric_limits<int>::max()),
ack_received_(false),
last_acked_seq_num_(0),
next_sequence_number_(0) {}
next_sequence_number_(0),
offset_ms_(offset_ms) {}
virtual ~TcpSender() {}
@@ -140,15 +142,18 @@ class TcpSender : public PacketSender {
void SendPackets(Packets* in_out);
void UpdateCongestionControl(const FeedbackPacket* fb);
int TriggerTimeouts();
void HandleLoss();
Packets GeneratePackets(size_t num_packets);
int64_t now_ms_;
bool in_slow_start_;
float cwnd_;
int ssthresh_;
std::set<InFlight> in_flight_;
bool ack_received_;
uint16_t last_acked_seq_num_;
uint16_t next_sequence_number_;
int64_t offset_ms_;
};
} // namespace bwe
} // namespace testing