This patch hooks up PeerConnectionImpl to PeerConnectionSignaling.

Implements
virtual bool ProcessSignalingMessage(const std::string& msg);
virtual scoped_refptr<StreamCollection> remote_streams();
virtual void CommitStreamChanges();

BUG=
TEST=

Review URL: http://webrtc-codereview.appspot.com/187001

git-svn-id: http://webrtc.googlecode.com/svn/trunk@669 4adac7df-926f-26a2-2b94-8c16560cd09d
This commit is contained in:
perkj@webrtc.org 2011-09-29 22:27:54 +00:00
parent 78083bf750
commit e804ee1a80
8 changed files with 269 additions and 81 deletions

View File

@ -81,12 +81,8 @@ class PeerConnectionObserver {
class PeerConnection : public RefCount {
public:
// Start Negotiation. Negotiation is based on if
// SignalingMessage and AddStream have been called prior to this function.
virtual bool StartNegotiation() = 0;
// SignalingMessage in json format
virtual bool SignalingMessage(const std::string& msg) = 0;
virtual bool ProcessSignalingMessage(const std::string& msg) = 0;
// Sends the msg over a data stream.
virtual bool Send(const std::string& msg) = 0;
@ -155,12 +151,14 @@ class PeerConnectionManager : public RefCount {
// remain in scope for the lifetime of the PeerConnectionManager.
static scoped_refptr<PeerConnectionManager> Create(
talk_base::Thread* worker_thread,
talk_base::Thread* signaling_thread,
PcNetworkManager* network_manager,
PcPacketSocketFactory* packet_socket_factory,
AudioDeviceModule* default_adm);
virtual scoped_refptr<PeerConnection> CreatePeerConnection(
const std::string& config) = 0;
const std::string& config,
PeerConnectionObserver* observer) = 0;
protected:
// Dtor protected as objects shouldn't be deleted via this interface.

View File

@ -1,4 +1,3 @@
/*
* libjingle
* Copyright 2011, Google Inc.
@ -33,44 +32,171 @@
#include "talk/app/webrtc_dev/scoped_refptr_msg.h"
#include "talk/app/webrtc_dev/streamcollectionimpl.h"
#include "talk/base/logging.h"
#include "talk/p2p/client/basicportallocator.h"
#include "talk/session/phone/channelmanager.h"
namespace {
// The number of the tokens in the config string.
static const size_t kConfigTokens = 2;
static const int kServiceCount = 5;
// The default stun port.
static const int kDefaultPort = 3478;
// NOTE: Must be in the same order as the ServiceType enum.
static const char* kValidServiceTypes[kServiceCount] = {
"STUN", "STUNS", "TURN", "TURNS", "INVALID" };
static const char kUserAgent[] = "PeerConnection User Agent";
enum ServiceType {
STUN, // Indicates a STUN server.
STUNS, // Indicates a STUN server used with a TLS session.
TURN, // Indicates a TURN server
TURNS, // Indicates a TURN server used with a TLS session.
INVALID, // Unknown.
};
enum {
MSG_COMMITSTREAMCHANGES = 1,
MSG_PROCESSSIGNALINGMESSAGE = 2,
MSG_RETURNREMOTEMEDIASTREAMS = 3
};
bool static ParseConfigString(const std::string& config,
talk_base::SocketAddress* addr,
ServiceType* service_type) {
std::vector<std::string> tokens;
talk_base::tokenize(config, ' ', &tokens);
if (tokens.size() != kConfigTokens) {
LOG(WARNING) << "Invalid config string";
return false;
}
*service_type = INVALID;
const std::string& type = tokens[0];
for (size_t i = 0; i < kServiceCount; ++i) {
if (type.compare(kValidServiceTypes[i]) == 0) {
*service_type = static_cast<ServiceType>(i);
break;
}
}
if (*service_type == INVALID) {
LOG(WARNING) << "Invalid service type: " << type;
return false;
}
std::string service_address = tokens[1];
int port;
tokens.clear();
talk_base::tokenize(service_address, ':', &tokens);
if (tokens.size() != kConfigTokens) {
port = kDefaultPort;
} else {
port = atoi(tokens[1].c_str());
if (port <= 0 || port > 0xffff) {
LOG(WARNING) << "Invalid port: " << tokens[1];
return false;
}
}
addr->SetIP(service_address);
addr->SetPort(port);
return true;
}
struct SignalingParams : public talk_base::MessageData {
SignalingParams(const std::string& msg,
webrtc::StreamCollection* local_streams)
: msg(msg),
local_streams(local_streams) {}
const std::string msg;
scoped_refptr<webrtc::StreamCollection> local_streams;
};
} // namespace
namespace webrtc {
PeerConnectionImpl::PeerConnectionImpl(
cricket::ChannelManager* channel_manager,
talk_base::Thread* worker_thread,
talk_base::Thread* signaling_thread,
PcNetworkManager* network_manager,
PcPacketSocketFactory* socket_factory)
: observer_(NULL),
local_media_streams_(StreamCollectionImpl::Create()),
worker_thread_(worker_thread),
signaling_thread_(signaling_thread),
channel_manager_(channel_manager),
network_manager_(network_manager),
socket_factory_(socket_factory),
port_allocator_(new cricket::BasicPortAllocator(
port_allocator_(new cricket::HttpPortAllocator(
network_manager->network_manager(),
socket_factory->socket_factory())) {
socket_factory->socket_factory(),
std::string(kUserAgent))),
signaling_(new PeerConnectionSignaling(channel_manager,
signaling_thread)) {
signaling_->SignalNewPeerConnectionMessage.connect(
this, &PeerConnectionImpl::OnNewPeerConnectionMessage);
signaling_->SignalRemoteStreamAdded.connect(
this, &PeerConnectionImpl::OnRemoteStreamAdded);
signaling_->SignalRemoteStreamRemoved.connect(
this, &PeerConnectionImpl::OnRemoteStreamRemoved);
}
PeerConnectionImpl::~PeerConnectionImpl() {
worker_thread_->Clear(this);
signaling_thread_->Clear(this);
}
bool PeerConnectionImpl::Initialize(const std::string& configuration) {
// TODO(perkj): More initialization code?
return true;
}
void PeerConnectionImpl::RegisterObserver(PeerConnectionObserver* observer) {
bool PeerConnectionImpl::Initialize(const std::string& configuration,
PeerConnectionObserver* observer) {
ASSERT(observer);
if (!observer)
return false;
observer_ = observer;
talk_base::SocketAddress address;
ServiceType service;
if (!ParseConfigString(configuration, &address, &service))
return false;
switch (service) {
case STUN: {
std::vector<talk_base::SocketAddress> address_vector;
address_vector.push_back(address);
port_allocator_->SetStunHosts(address_vector);
break;
}
case TURN: {
std::vector<std::string> address_vector;
address_vector.push_back(address.ToString());
port_allocator_->SetRelayHosts(address_vector);
break;
}
default:
ASSERT(!"NOT SUPPORTED");
return false;
}
return true;
}
scoped_refptr<StreamCollection> PeerConnectionImpl::local_streams() {
return local_media_streams_;
}
scoped_refptr<StreamCollection> PeerConnectionImpl::remote_streams() {
ScopedRefMessageData<StreamCollection>* msg =
new ScopedRefMessageData<StreamCollection>(NULL);
signaling_thread_->Send(this, MSG_RETURNREMOTEMEDIASTREAMS, msg);
return msg->data();
}
bool PeerConnectionImpl::ProcessSignalingMessage(const std::string& msg) {
SignalingParams* parameter(new SignalingParams(
msg, StreamCollectionImpl::Create(local_media_streams_)));
signaling_thread_->Post(this, MSG_PROCESSSIGNALINGMESSAGE, parameter);
}
void PeerConnectionImpl::AddStream(LocalMediaStream* local_stream) {
local_media_streams_->AddStream(local_stream);
}
@ -80,25 +206,59 @@ void PeerConnectionImpl::RemoveStream(LocalMediaStream* remove_stream) {
}
void PeerConnectionImpl::CommitStreamChanges() {
ScopedRefMessageData<StreamCollectionImpl>* msg =
new ScopedRefMessageData<StreamCollectionImpl> (
StreamCollectionImpl::Create(local_media_streams_));
worker_thread_->Post(this, MSG_COMMITSTREAMCHANGES, msg);
ScopedRefMessageData<StreamCollection>* msg =
new ScopedRefMessageData<StreamCollection> (
StreamCollectionImpl::Create(local_media_streams_));
signaling_thread_->Post(this, MSG_COMMITSTREAMCHANGES, msg);
}
void PeerConnectionImpl::OnMessage(talk_base::Message* msg) {
talk_base::MessageData* data = msg->pdata;
switch (msg->message_id) {
case MSG_COMMITSTREAMCHANGES: {
// TODO(perkj): Here is where necessary signaling
// and creation of channels should happen. Also removing of channels.
// The media streams are in the LocalStreamCollection in data.
// The collection is a copy of the local_media_streams_ and only
// accessible in this thread context.
ScopedRefMessageData<StreamCollection>* param(
static_cast<ScopedRefMessageData<StreamCollection>*> (data));
signaling_->CreateOffer(param->data());
delete data; // Because it is Posted.
break;
}
case MSG_PROCESSSIGNALINGMESSAGE: {
SignalingParams* params(static_cast<SignalingParams*> (data));
// TODO(perkj) Deserialize params->msg into a PeerConnection Message.
signaling_->ProcessSignalingMessage(NULL, params->local_streams);
delete data; // Because it is Posted.
break;
}
case MSG_RETURNREMOTEMEDIASTREAMS: {
ScopedRefMessageData<StreamCollection>* param(
static_cast<ScopedRefMessageData<StreamCollection>*> (data));
param->data() = StreamCollectionImpl::Create(remote_media_streams_);
break;
}
}
delete data; // because it is Posted
}
void PeerConnectionImpl::OnNewPeerConnectionMessage(
PeerConnectionMessage* message) {
// TODO(perkj): serialize the message.
std::string msg;
observer_->OnSignalingMessage(msg);
}
void PeerConnectionImpl::OnRemoteStreamAdded(MediaStream* remote_stream) {
// TODO(perkj): add function in pc signaling to return a collection of
// remote streams.
// This way we can avoid keeping a separate list of remote_media_streams_.
remote_media_streams_->AddStream(remote_stream);
observer_->OnAddStream(remote_stream);
}
void PeerConnectionImpl::OnRemoteStreamRemoved(MediaStream* remote_stream) {
// TODO(perkj): add function in pc signaling to return a collection of
// remote streams.
// This way we can avoid keeping a separate list of remote_media_streams_.
remote_media_streams_->RemoveStream(remote_stream);
observer_->OnRemoveStream(remote_stream);
}
} // namespace webrtc

View File

@ -32,8 +32,9 @@
#include <string>
#include "talk/app/webrtc_dev/peerconnection.h"
#include "talk/app/webrtc_dev/peerconnectionsignaling.h"
#include "talk/base/scoped_ptr.h"
#include "talk/p2p/base/portallocator.h"
#include "talk/p2p/client/httpportallocator.h"
namespace cricket {
class ChannelManager;
@ -42,69 +43,53 @@ class ChannelManager;
namespace webrtc {
class StreamCollectionImpl;
// PeerConnectionImpl implements the PeerConnection interface.
// It uses PeerConnectionSignaling and WebRtcSession to implement
// the PeerConnection functionality.
class PeerConnectionImpl : public PeerConnection,
public talk_base::MessageHandler {
public talk_base::MessageHandler,
public sigslot::has_slots<> {
public:
enum Error {
ERROR_NONE = 0, // Good
ERROR_TIMEOUT = 1, // No Candidates generated for X amount of time
ERROR_AUDIO_DEVICE = 2, // DeviceManager audio device error
ERROR_VIDEO_DEVICE = 3, // DeviceManager video device error
ERROR_NETWORK = 4, // Transport errors
ERROR_MEDIADESCRIPTION = 5, // SignalingMessage error
ERROR_MEDIA = 6, // Related to Engines
ERROR_UNKNOWN = 10, // Everything else
};
PeerConnectionImpl(cricket::ChannelManager* channel_manager,
talk_base::Thread* worker_thread,
talk_base::Thread* signaling_thread,
PcNetworkManager* network_manager,
PcPacketSocketFactory* socket_factory);
bool Initialize(const std::string& configuration);
bool Initialize(const std::string& configuration,
PeerConnectionObserver* observer);
virtual ~PeerConnectionImpl();
// Interfaces from PeerConnection
virtual bool StartNegotiation() {
// TODO(perkj): implement
ASSERT(false);
}
virtual bool SignalingMessage(const std::string& msg) {
// TODO(perkj): implement
ASSERT(false);
}
virtual bool ProcessSignalingMessage(const std::string& msg);
virtual bool Send(const std::string& msg) {
// TODO(perkj): implement
ASSERT(false);
}
virtual scoped_refptr<StreamCollection> local_streams();
virtual scoped_refptr<StreamCollection> remote_streams() {
// TODO(perkj): implement
ASSERT(false);
}
virtual scoped_refptr<StreamCollection> remote_streams();
virtual void AddStream(LocalMediaStream* stream);
virtual void RemoveStream(LocalMediaStream* stream);
virtual void CommitStreamChanges();
void RegisterObserver(PeerConnectionObserver* observer);
private:
// Implement talk_base::MessageHandler.
void OnMessage(talk_base::Message* msg);
private:
enum {
MSG_COMMITSTREAMCHANGES = 3
};
// Signals from PeerConnectionSignaling.
void OnNewPeerConnectionMessage(PeerConnectionMessage* message);
void OnRemoteStreamAdded(MediaStream* remote_stream);
void OnRemoteStreamRemoved(MediaStream* remote_stream);
PeerConnectionObserver* observer_;
scoped_refptr<StreamCollectionImpl> local_media_streams_;
scoped_refptr<StreamCollectionImpl> remote_media_streams_;
talk_base::Thread* worker_thread_; // Weak ref from PeerConnectionManager.
talk_base::Thread* signaling_thread_; // Weak ref from PeerConnectionManager.
cricket::ChannelManager* channel_manager_;
scoped_refptr<PcNetworkManager> network_manager_;
scoped_refptr<PcPacketSocketFactory> socket_factory_;
talk_base::scoped_ptr<cricket::PortAllocator> port_allocator_;
talk_base::scoped_ptr<cricket::HttpPortAllocator> port_allocator_;
talk_base::scoped_ptr<PeerConnectionSignaling> signaling_;
};
} // namespace webrtc

View File

@ -35,21 +35,33 @@
#include "talk/base/thread.h"
static const char kStreamLabel1[] = "local_stream_1";
static const char kStunConfiguration[] = "STUN stun.l.google.com:19302";
namespace webrtc {
class MockPeerConnectionObserver : public PeerConnectionObserver {
public:
virtual void OnError() {}
virtual void OnMessage(const std::string& msg) {}
virtual void OnSignalingMessage(const std::string& msg) {}
virtual void OnStateChange(Readiness state) {}
virtual void OnAddStream(MediaStream* stream) {}
virtual void OnRemoveStream(MediaStream* stream) {}
};
class PeerConnectionImplTest : public testing::Test {
public:
protected:
virtual void SetUp() {
pc_factory_ = webrtc::PeerConnectionManager::Create();
ASSERT_TRUE(pc_factory_.get() != NULL);
pc_ = pc_factory_->CreatePeerConnection("");
pc_ = pc_factory_->CreatePeerConnection(kStunConfiguration, &observer_);
ASSERT_TRUE(pc_.get() != NULL);
}
scoped_refptr<webrtc::PeerConnectionManager> pc_factory_;
scoped_refptr<PeerConnection> pc_;
MockPeerConnectionObserver observer_;
};
TEST_F(PeerConnectionImplTest, AddRemoveStream) {

View File

@ -43,16 +43,31 @@
#endif
static const char kAudioDeviceLabel[] = "dummy_audio_device";
static const char kStunConfiguration[] = "STUN stun.l.google.com:19302";
namespace webrtc {
class MockPeerConnectionObserver : public PeerConnectionObserver {
public:
virtual void OnError() {}
virtual void OnMessage(const std::string& msg) {}
virtual void OnSignalingMessage(const std::string& msg) {}
virtual void OnStateChange(Readiness state) {}
virtual void OnAddStream(MediaStream* stream) {}
virtual void OnRemoveStream(MediaStream* stream) {}
};
TEST(PeerConnectionManager, CreatePCUsingInternalModules) {
MockPeerConnectionObserver observer;
scoped_refptr<PeerConnectionManager> manager(PeerConnectionManager::Create());
ASSERT_TRUE(manager.get() != NULL);
scoped_refptr<PeerConnection> pc1(manager->CreatePeerConnection(""));
EXPECT_TRUE(pc1.get() != NULL);
scoped_refptr<PeerConnection> pc1(manager->CreatePeerConnection("",
&observer));
EXPECT_TRUE(pc1.get() == NULL);
scoped_refptr<PeerConnection> pc2(manager->CreatePeerConnection(
kStunConfiguration, &observer));
scoped_refptr<webrtc::PeerConnection> pc2(manager->CreatePeerConnection(""));
EXPECT_TRUE(pc2.get() != NULL);
}
@ -71,16 +86,21 @@ TEST(PeerConnectionManager, CreatePCUsingExternalModules) {
PcPacketSocketFactory::Create(new talk_base::BasicPacketSocketFactory));
scoped_refptr<PeerConnectionManager> manager =
PeerConnectionManager::Create(w_thread.get(),
PeerConnectionManager::Create(talk_base::Thread::Current(),
talk_base::Thread::Current(),
network_manager,
socket_factory,
audio_device);
ASSERT_TRUE(manager.get() != NULL);
scoped_refptr<webrtc::PeerConnection> pc1(manager->CreatePeerConnection(""));
EXPECT_TRUE(pc1.get() != NULL);
MockPeerConnectionObserver observer;
scoped_refptr<webrtc::PeerConnection> pc1(manager->CreatePeerConnection(
"", &observer));
scoped_refptr<webrtc::PeerConnection> pc2(manager->CreatePeerConnection(""));
EXPECT_TRUE(pc1.get() == NULL);
scoped_refptr<PeerConnection> pc2(manager->CreatePeerConnection(
kStunConfiguration, &observer));
EXPECT_TRUE(pc2.get() != NULL);
}

View File

@ -94,11 +94,13 @@ scoped_refptr<PeerConnectionManager> PeerConnectionManager::Create() {
scoped_refptr<PeerConnectionManager> PeerConnectionManager::Create(
talk_base::Thread* worker_thread,
talk_base::Thread* signaling_thread,
PcNetworkManager* network_manager,
PcPacketSocketFactory* socket_factory,
AudioDeviceModule* default_adm) {
RefCountImpl<PeerConnectionManagerImpl>* pc_manager =
new RefCountImpl<PeerConnectionManagerImpl>(worker_thread,
signaling_thread,
network_manager,
socket_factory,
default_adm);
@ -111,23 +113,28 @@ scoped_refptr<PeerConnectionManager> PeerConnectionManager::Create(
PeerConnectionManagerImpl::PeerConnectionManagerImpl()
: worker_thread_(new talk_base::Thread),
signaling_thread_(new talk_base::Thread),
network_manager_(PcNetworkManager::Create(
new talk_base::BasicNetworkManager())),
socket_factory_(PcPacketSocketFactory::Create(
new talk_base::BasicPacketSocketFactory)) {
worker_thread_ptr_ = worker_thread_.get();
signaling_thread_ptr_ = signaling_thread_.get();
}
PeerConnectionManagerImpl::PeerConnectionManagerImpl(
talk_base::Thread* worker_thread,
talk_base::Thread* signaling_thread,
PcNetworkManager* network_manager,
PcPacketSocketFactory* socket_factory,
AudioDeviceModule* default_adm)
: worker_thread_ptr_(worker_thread),
signaling_thread_ptr_(signaling_thread),
network_manager_(network_manager),
socket_factory_(socket_factory),
default_adm_(default_adm) {
ASSERT(worker_thread);
ASSERT(signaling_thread);
ASSERT(network_manager->network_manager());
ASSERT(socket_factory->socket_factory());
ASSERT(default_adm);
@ -139,6 +146,8 @@ PeerConnectionManagerImpl::~PeerConnectionManagerImpl() {
bool PeerConnectionManagerImpl::Initialize() {
if (worker_thread_.get() && !worker_thread_->Start())
return false;
if (signaling_thread_.get() && !signaling_thread_->Start())
return false;
cricket::DeviceManager* device_manager(new WebRtcDeviceManager());
cricket::WebRtcMediaEngine* webrtc_media_engine = NULL;
@ -158,13 +167,14 @@ bool PeerConnectionManagerImpl::Initialize() {
}
scoped_refptr<PeerConnection> PeerConnectionManagerImpl::CreatePeerConnection(
const std::string& configuration) {
const std::string& configuration,
PeerConnectionObserver* observer) {
RefCountImpl<PeerConnectionImpl>* pc =
new RefCountImpl<PeerConnectionImpl>(channel_manager_.get(),
worker_thread_ptr_,
signaling_thread_ptr_,
network_manager_,
socket_factory_);
if (!pc->Initialize(configuration)) {
if (!pc->Initialize(configuration, observer)) {
delete pc;
pc = NULL;
}

View File

@ -38,21 +38,25 @@ namespace webrtc {
class PeerConnectionManagerImpl : public PeerConnectionManager {
public:
scoped_refptr<PeerConnection> CreatePeerConnection(const std::string& config);
scoped_refptr<PeerConnection> CreatePeerConnection(
const std::string& config,
PeerConnectionObserver* observer);
bool Initialize();
protected:
PeerConnectionManagerImpl();
PeerConnectionManagerImpl(talk_base::Thread* worker_thread,
talk_base::Thread* signaling_thread,
PcNetworkManager* network_manager,
PcPacketSocketFactory* socket_factory,
AudioDeviceModule* default_adm);
virtual ~PeerConnectionManagerImpl();
private:
// Channel manager worker thread. Only used if the external thread is not set.
talk_base::scoped_ptr<talk_base::Thread> worker_thread_;
talk_base::Thread* worker_thread_ptr_;
talk_base::scoped_ptr<talk_base::Thread> signaling_thread_;
talk_base::Thread* signaling_thread_ptr_;
scoped_refptr<PcNetworkManager> network_manager_;
scoped_refptr<PcPacketSocketFactory> socket_factory_;
talk_base::scoped_ptr<cricket::ChannelManager> channel_manager_;

View File

@ -45,9 +45,9 @@ class StreamCollectionImpl : public StreamCollection {
}
static scoped_refptr<StreamCollectionImpl> Create(
StreamCollectionImpl* local_streams) {
StreamCollectionImpl* streams) {
RefCountImpl<StreamCollectionImpl>* implementation =
new RefCountImpl<StreamCollectionImpl>(local_streams);
new RefCountImpl<StreamCollectionImpl>(streams);
return implementation;
}
@ -88,7 +88,6 @@ class StreamCollectionImpl : public StreamCollection {
}
}
protected:
StreamCollectionImpl() {}
explicit StreamCollectionImpl(StreamCollectionImpl* original)