Run loopback tests with network thread.

Running with a network thread provides a more realistic simulation. Like
a real network, packets are handed off to a socket, or buffer, and then
the call returns. This prevents weird scenarios when both the sending
side and receiving side are on the call stack simultaneously, which can
cause deadlocks as locks could otherwise be taken simultaneously in both
the sender and receiver order by the same thread.

BUG=
R=stefan@webrtc.org

Review URL: https://webrtc-codereview.appspot.com/2000005

git-svn-id: http://webrtc.googlecode.com/svn/trunk@4522 4adac7df-926f-26a2-2b94-8c16560cd09d
This commit is contained in:
pbos@webrtc.org 2013-08-12 12:59:04 +00:00
parent ecbe0aa543
commit 9668467d87
6 changed files with 152 additions and 38 deletions

View File

@ -0,0 +1,91 @@
/*
* Copyright (c) 2013 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "webrtc/video_engine/test/common/direct_transport.h"
#include "testing/gtest/include/gtest/gtest.h"
#include "webrtc/video_engine/new_include/video_engine.h"
namespace webrtc {
namespace test {
DirectTransport::DirectTransport()
: lock_(CriticalSectionWrapper::CreateCriticalSection()),
packet_event_(EventWrapper::Create()),
thread_(ThreadWrapper::CreateThread(NetworkProcess, this)),
receiver_(NULL) {
unsigned int thread_id;
EXPECT_TRUE(thread_->Start(thread_id));
}
DirectTransport::~DirectTransport() { StopSending(); }
void DirectTransport::StopSending() { EXPECT_TRUE(thread_->Stop()); }
void DirectTransport::SetReceiver(newapi::PacketReceiver* receiver) {
receiver_ = receiver;
}
bool DirectTransport::SendRTP(const uint8_t* data, size_t length) {
QueuePacket(data, length);
return true;
}
bool DirectTransport::SendRTCP(const uint8_t* data, size_t length) {
QueuePacket(data, length);
return true;
}
DirectTransport::Packet::Packet() : length(0) {}
DirectTransport::Packet::Packet(const uint8_t* data, size_t length)
: length(length) {
EXPECT_LE(length, sizeof(this->data));
memcpy(this->data, data, length);
}
void DirectTransport::QueuePacket(const uint8_t* data, size_t length) {
CriticalSectionScoped crit(lock_.get());
EXPECT_TRUE(receiver_ != NULL);
packet_queue_.push_back(Packet(data, length));
packet_event_->Set();
}
bool DirectTransport::NetworkProcess(void* transport) {
return static_cast<DirectTransport*>(transport)->SendPackets();
}
bool DirectTransport::SendPackets() {
while (true) {
Packet p;
{
webrtc::CriticalSectionScoped crit(lock_.get());
if (packet_queue_.empty())
break;
p = packet_queue_.front();
packet_queue_.pop_front();
}
receiver_->DeliverPacket(p.data, p.length);
}
switch (packet_event_->Wait(10)) {
case kEventSignaled:
packet_event_->Reset();
break;
case kEventTimeout:
break;
case kEventError:
// TODO(pbos): Log a warning here?
return true;
}
return true;
}
} // namespace test
} // namespace webrtc

View File

@ -12,32 +12,55 @@
#include <assert.h> #include <assert.h>
#include <deque>
#include "webrtc/system_wrappers/interface/critical_section_wrapper.h"
#include "webrtc/system_wrappers/interface/event_wrapper.h"
#include "webrtc/system_wrappers/interface/scoped_ptr.h"
#include "webrtc/system_wrappers/interface/thread_wrapper.h"
#include "webrtc/video_engine/new_include/transport.h" #include "webrtc/video_engine/new_include/transport.h"
namespace webrtc { namespace webrtc {
namespace newapi {
class PacketReceiver;
} // namespace newapi
namespace test { namespace test {
class DirectTransport : public newapi::Transport { class DirectTransport : public newapi::Transport {
public: public:
explicit DirectTransport(newapi::PacketReceiver* receiver) DirectTransport();
: receiver_(receiver) {} ~DirectTransport();
void SetReceiver(newapi::PacketReceiver* receiver) { receiver_ = receiver; } virtual void StopSending();
virtual void SetReceiver(newapi::PacketReceiver* receiver);
virtual bool SendRTP(const uint8_t* data, size_t length) OVERRIDE { virtual bool SendRTP(const uint8_t* data, size_t length) OVERRIDE;
assert(receiver_ != NULL); virtual bool SendRTCP(const uint8_t* data, size_t length) OVERRIDE;
return receiver_->DeliverPacket(data, length);
}
virtual bool SendRTCP(const uint8_t* data, size_t length) OVERRIDE {
assert(receiver_ != NULL);
return SendRTP(data, length);
}
private: private:
struct Packet {
Packet();
Packet(const uint8_t* data, size_t length);
uint8_t data[1500];
size_t length;
};
void QueuePacket(const uint8_t* data, size_t length);
static bool NetworkProcess(void* transport);
bool SendPackets();
scoped_ptr<CriticalSectionWrapper> lock_;
scoped_ptr<EventWrapper> packet_event_;
scoped_ptr<ThreadWrapper> thread_;
std::deque<Packet> packet_queue_;
newapi::PacketReceiver* receiver_; newapi::PacketReceiver* receiver_;
}; };
} // test } // namespace test
} // webrtc } // namespace webrtc
#endif // WEBRTC_VIDEO_ENGINE_TEST_COMMON_DIRECT_TRANSPORT_H_ #endif // WEBRTC_VIDEO_ENGINE_TEST_COMMON_DIRECT_TRANSPORT_H_

View File

@ -17,6 +17,7 @@
#include "webrtc/system_wrappers/interface/scoped_ptr.h" #include "webrtc/system_wrappers/interface/scoped_ptr.h"
#include "webrtc/system_wrappers/interface/event_wrapper.h" #include "webrtc/system_wrappers/interface/event_wrapper.h"
#include "webrtc/video_engine/new_include/video_engine.h" #include "webrtc/video_engine/new_include/video_engine.h"
#include "webrtc/video_engine/test/common/direct_transport.h"
#include "webrtc/video_engine/test/common/frame_generator.h" #include "webrtc/video_engine/test/common/frame_generator.h"
#include "webrtc/video_engine/test/common/frame_generator_capturer.h" #include "webrtc/video_engine/test/common/frame_generator_capturer.h"
#include "webrtc/video_engine/test/common/generate_ssrcs.h" #include "webrtc/video_engine/test/common/generate_ssrcs.h"
@ -25,12 +26,11 @@ namespace webrtc {
class NackObserver { class NackObserver {
public: public:
class SenderTransport : public newapi::Transport { class SenderTransport : public test::DirectTransport {
public: public:
explicit SenderTransport(NackObserver* observer) explicit SenderTransport(NackObserver* observer) : observer_(observer) {}
: receiver_(NULL), observer_(observer) {}
bool SendRTP(const uint8_t* packet, size_t length) { virtual bool SendRTP(const uint8_t* packet, size_t length) OVERRIDE {
{ {
CriticalSectionScoped lock(observer_->crit_.get()); CriticalSectionScoped lock(observer_->crit_.get());
if (observer_->DropSendPacket(packet, length)) if (observer_->DropSendPacket(packet, length))
@ -38,25 +38,15 @@ class NackObserver {
++observer_->sent_rtp_packets_; ++observer_->sent_rtp_packets_;
} }
return receiver_->DeliverPacket(packet, length); return test::DirectTransport::SendRTP(packet, length);
} }
bool SendRTCP(const uint8_t* packet, size_t length) {
return receiver_->DeliverPacket(packet, length);
}
newapi::PacketReceiver* receiver_;
NackObserver* observer_; NackObserver* observer_;
} sender_transport_; } sender_transport_;
class ReceiverTransport : public newapi::Transport { class ReceiverTransport : public test::DirectTransport {
public: public:
explicit ReceiverTransport(NackObserver* observer) explicit ReceiverTransport(NackObserver* observer) : observer_(observer) {}
: receiver_(NULL), observer_(observer) {}
bool SendRTP(const uint8_t* packet, size_t length) {
return receiver_->DeliverPacket(packet, length);
}
bool SendRTCP(const uint8_t* packet, size_t length) { bool SendRTCP(const uint8_t* packet, size_t length) {
{ {
@ -80,10 +70,9 @@ class NackObserver {
observer_->RtcpWithoutNack(); observer_->RtcpWithoutNack();
} }
} }
return receiver_->DeliverPacket(packet, length); return DirectTransport::SendRTCP(packet, length);
} }
newapi::PacketReceiver* receiver_;
NackObserver* observer_; NackObserver* observer_;
} receiver_transport_; } receiver_transport_;
@ -102,6 +91,11 @@ class NackObserver {
return received_all_retransmissions_->Wait(2 * 60 * 1000); return received_all_retransmissions_->Wait(2 * 60 * 1000);
} }
void StopSending() {
sender_transport_.StopSending();
receiver_transport_.StopSending();
}
private: private:
// Decides whether a current packet should be dropped or not. A retransmitted // Decides whether a current packet should be dropped or not. A retransmitted
// packet will never be dropped. Packets are dropped in short bursts. When // packet will never be dropped. Packets are dropped in short bursts. When
@ -244,9 +238,8 @@ TEST_P(EngineTest, ReceivesAndRetransmitsNack) {
scoped_ptr<newapi::VideoCall> receiver_call( scoped_ptr<newapi::VideoCall> receiver_call(
CreateTestCall(&observer.receiver_transport_)); CreateTestCall(&observer.receiver_transport_));
observer.receiver_transport_.receiver_ = sender_call->Receiver(); observer.receiver_transport_.SetReceiver(sender_call->Receiver());
observer.sender_transport_.receiver_ = receiver_call->Receiver(); observer.sender_transport_.SetReceiver(receiver_call->Receiver());
newapi::VideoSendStream::Config send_config = newapi::VideoSendStream::Config send_config =
CreateTestSendConfig(sender_call.get(), params); CreateTestSendConfig(sender_call.get(), params);
@ -278,6 +271,7 @@ TEST_P(EngineTest, ReceivesAndRetransmitsNack) {
receiver_call->DestroyReceiveStream(receive_stream); receiver_call->DestroyReceiveStream(receive_stream);
receiver_call->DestroySendStream(send_stream); receiver_call->DestroySendStream(send_stream);
observer.StopSending();
} }
INSTANTIATE_TEST_CASE_P(EngineTest, EngineTest, ::testing::Values(video_vga)); INSTANTIATE_TEST_CASE_P(EngineTest, EngineTest, ::testing::Values(video_vga));

View File

@ -273,7 +273,7 @@ TEST_P(FullStackTest, NoPacketLoss) {
scoped_ptr<newapi::VideoEngine> video_engine( scoped_ptr<newapi::VideoEngine> video_engine(
newapi::VideoEngine::Create(newapi::VideoEngineConfig())); newapi::VideoEngine::Create(newapi::VideoEngineConfig()));
test::DirectTransport transport(NULL); test::DirectTransport transport;
VideoAnalyzer analyzer( VideoAnalyzer analyzer(
NULL, NULL,
&transport, &transport,
@ -342,6 +342,8 @@ TEST_P(FullStackTest, NoPacketLoss) {
call->DestroyReceiveStream(receive_stream); call->DestroyReceiveStream(receive_stream);
call->DestroySendStream(send_stream); call->DestroySendStream(send_stream);
transport.StopSending();
} }
INSTANTIATE_TEST_CASE_P(FullStack, INSTANTIATE_TEST_CASE_P(FullStack,

View File

@ -42,7 +42,7 @@ TEST_F(LoopbackTest, Test) {
scoped_ptr<newapi::VideoEngine> video_engine( scoped_ptr<newapi::VideoEngine> video_engine(
newapi::VideoEngine::Create(webrtc::newapi::VideoEngineConfig())); newapi::VideoEngine::Create(webrtc::newapi::VideoEngineConfig()));
test::DirectTransport transport(NULL); test::DirectTransport transport;
newapi::VideoCall::Config call_config; newapi::VideoCall::Config call_config;
call_config.send_transport = &transport; call_config.send_transport = &transport;
call_config.overuse_detection = true; call_config.overuse_detection = true;
@ -98,5 +98,7 @@ TEST_F(LoopbackTest, Test) {
call->DestroyReceiveStream(receive_stream); call->DestroyReceiveStream(receive_stream);
call->DestroySendStream(send_stream); call->DestroySendStream(send_stream);
transport.StopSending();
} }
} // webrtc } // webrtc

View File

@ -12,6 +12,8 @@
'target_name': 'video_tests_common', 'target_name': 'video_tests_common',
'type': 'static_library', 'type': 'static_library',
'sources': [ 'sources': [
'common/direct_transport.cc',
'common/direct_transport.h',
'common/file_capturer.cc', 'common/file_capturer.cc',
'common/file_capturer.h', 'common/file_capturer.h',
'common/flags.cc', 'common/flags.cc',