Moving creation of sessiondescriptions to webrtcsession.

Fixing defect durin close down in peerconnectionmanager.

BUG=
TEST=

Review URL: http://webrtc-codereview.appspot.com/193004

git-svn-id: http://webrtc.googlecode.com/svn/trunk@693 4adac7df-926f-26a2-2b94-8c16560cd09d
This commit is contained in:
perkj@webrtc.org 2011-10-05 17:15:36 +00:00
parent bf39ff4271
commit 487e401a27
15 changed files with 356 additions and 177 deletions

View File

@ -702,6 +702,7 @@
'<(libjingle_mods)/source/talk/app/webrtc_dev/peerconnectionsignaling.cc',
'<(libjingle_mods)/source/talk/app/webrtc_dev/peerconnectionsignaling.h',
'<(libjingle_mods)/source/talk/app/webrtc_dev/ref_count.h',
'<(libjingle_mods)/source/talk/app/webrtc_dev/sessiondescriptionprovider.h'
'<(libjingle_mods)/source/talk/app/webrtc_dev/streamcollectionimpl.h',
'<(libjingle_mods)/source/talk/app/webrtc_dev/videorendererimpl.cc',
'<(libjingle_mods)/source/talk/app/webrtc_dev/videotrackimpl.cc',
@ -710,6 +711,7 @@
'<(libjingle_mods)/source/talk/app/webrtc_dev/webrtc_devicemanager.cc',
'<(libjingle_mods)/source/talk/app/webrtc_dev/webrtcjson.cc',
'<(libjingle_mods)/source/talk/app/webrtc_dev/webrtcjson.h',
'<(libjingle_mods)/source/talk/app/webrtc_dev/webrtcsessionobserver',
'<(libjingle_mods)/source/talk/app/webrtc_dev/webrtcsession.cc',
'<(libjingle_mods)/source/talk/app/webrtc_dev/webrtcsession.h',
],

View File

@ -60,7 +60,8 @@ enum ServiceType {
enum {
MSG_COMMITSTREAMCHANGES = 1,
MSG_PROCESSSIGNALINGMESSAGE = 2,
MSG_RETURNREMOTEMEDIASTREAMS = 3
MSG_RETURNREMOTEMEDIASTREAMS = 3,
MSG_TERMINATE = 4
};
bool static ParseConfigString(const std::string& config,
@ -137,10 +138,10 @@ PeerConnectionImpl::PeerConnectionImpl(
network_manager->network_manager(),
socket_factory->socket_factory(),
std::string(kUserAgent))),
signaling_(new PeerConnectionSignaling(channel_manager,
signaling_thread)),
session_(new WebRtcSession(channel_manager, signaling_thread,
worker_thread, port_allocator_.get(), signaling_.get())),
worker_thread, port_allocator_.get())),
signaling_(new PeerConnectionSignaling(signaling_thread,
session_.get())),
stream_handler_(new MediaStreamHandlers(session_.get())) {
signaling_->SignalNewPeerConnectionMessage.connect(
this, &PeerConnectionImpl::OnNewPeerConnectionMessage);
@ -152,6 +153,15 @@ PeerConnectionImpl::PeerConnectionImpl(
PeerConnectionImpl::~PeerConnectionImpl() {
signaling_thread_->Clear(this);
signaling_thread_->Send(this, MSG_TERMINATE);
}
// Clean up what needs to be cleaned up on the signaling thread.
void PeerConnectionImpl::Terminate_s() {
stream_handler_.reset();
signaling_.reset();
session_.reset();
port_allocator_.reset();
}
bool PeerConnectionImpl::Initialize(const std::string& configuration,
@ -182,6 +192,7 @@ bool PeerConnectionImpl::Initialize(const std::string& configuration,
ASSERT(!"NOT SUPPORTED");
return false;
}
// Initialize the WebRtcSession. It creates transport channels etc.
return session_->Initialize();
}
@ -241,6 +252,10 @@ void PeerConnectionImpl::OnMessage(talk_base::Message* msg) {
param->data() = StreamCollectionImpl::Create(remote_media_streams_);
break;
}
case MSG_TERMINATE: {
Terminate_s();
break;
}
}
}

View File

@ -84,6 +84,8 @@ class PeerConnectionImpl : public PeerConnection,
void OnRemoteStreamAdded(MediaStream* remote_stream);
void OnRemoteStreamRemoved(MediaStream* remote_stream);
void Terminate_s();
PeerConnectionObserver* observer_;
scoped_refptr<StreamCollectionImpl> local_media_streams_;
scoped_refptr<StreamCollectionImpl> remote_media_streams_;
@ -93,8 +95,8 @@ class PeerConnectionImpl : public PeerConnection,
scoped_refptr<PcNetworkManager> network_manager_;
scoped_refptr<PcPacketSocketFactory> socket_factory_;
talk_base::scoped_ptr<cricket::HttpPortAllocator> port_allocator_;
talk_base::scoped_ptr<PeerConnectionSignaling> signaling_;
talk_base::scoped_ptr<WebRtcSession> session_;
talk_base::scoped_ptr<PeerConnectionSignaling> signaling_;
talk_base::scoped_ptr<MediaStreamHandlers> stream_handler_;
};

View File

@ -165,7 +165,7 @@ PeerConnectionManagerImpl::~PeerConnectionManagerImpl() {
bool PeerConnectionManagerImpl::Initialize() {
InitMessageData result(false);
signaling_thread_->Send(this, MSG_INIT_MANAGER, &result);
signaling_thread_ptr_->Send(this, MSG_INIT_MANAGER, &result);
return result.data();
}
@ -214,7 +214,7 @@ scoped_refptr<PeerConnection> PeerConnectionManagerImpl::CreatePeerConnection(
const std::string& configuration,
PeerConnectionObserver* observer) {
CreatePeerConnectionParams params(configuration, observer);
signaling_thread_->Send(this, MSG_CREATE_PEERCONNECTION, &params);
signaling_thread_ptr_->Send(this, MSG_CREATE_PEERCONNECTION, &params);
return params.peerconnection;
}

View File

@ -34,30 +34,29 @@
namespace webrtc {
scoped_refptr<PeerConnectionMessage> PeerConnectionMessage::Create(
PeerConnectionMessage* PeerConnectionMessage::Create(
PeerConnectionMessageType type,
cricket::SessionDescription* desc,
const cricket::SessionDescription* desc,
const std::vector<cricket::Candidate>& candidates) {
return new RefCountImpl<PeerConnectionMessage> (type, desc, candidates);
return new PeerConnectionMessage(type, desc, candidates);
}
scoped_refptr<PeerConnectionMessage> PeerConnectionMessage::Create(
PeerConnectionMessage* PeerConnectionMessage::Create(
const std::string& message) {
scoped_refptr<PeerConnectionMessage>pc_message(new
RefCountImpl<PeerConnectionMessage> ());
PeerConnectionMessage* pc_message(new PeerConnectionMessage());
if (!pc_message->Deserialize(message))
return NULL;
return pc_message;
}
scoped_refptr<PeerConnectionMessage> PeerConnectionMessage::CreateErrorMessage(
PeerConnectionMessage* PeerConnectionMessage::CreateErrorMessage(
ErrorCode error) {
return new RefCountImpl<PeerConnectionMessage> (error);
return new PeerConnectionMessage(error);
}
PeerConnectionMessage::PeerConnectionMessage(
PeerConnectionMessageType type,
cricket::SessionDescription* desc,
const cricket::SessionDescription* desc,
const std::vector<cricket::Candidate>& candidates)
: type_(type),
error_code_(kNoError),
@ -68,7 +67,7 @@ PeerConnectionMessage::PeerConnectionMessage(
PeerConnectionMessage::PeerConnectionMessage()
: type_(kOffer),
error_code_(kNoError),
desc_(new cricket::SessionDescription()) {
desc_(NULL) {
}
PeerConnectionMessage::PeerConnectionMessage(ErrorCode error)
@ -78,12 +77,19 @@ PeerConnectionMessage::PeerConnectionMessage(ErrorCode error)
}
std::string PeerConnectionMessage::Serialize() {
return JsonSerialize(type_, error_code_, desc_.get(), candidates_);
return JsonSerialize(type_, error_code_, desc_, candidates_);
}
bool PeerConnectionMessage::Deserialize(std::string message) {
return JsonDeserialize(&type_, &error_code_, desc_.get(),
&candidates_, message);
cricket::SessionDescription* desc(new cricket::SessionDescription());
bool result = JsonDeserialize(&type_, &error_code_, desc,
&candidates_, message);
if(!result) {
delete desc;
desc = NULL;
}
desc_ = desc;
return result;
}
} // namespace webrtc

View File

@ -46,8 +46,8 @@ namespace webrtc {
// PeerConnectionMessage represent an SDP offer or an answer.
// Instances of this class can be serialized / deserialized and are used for
// signaling between PeerConnection objects.
// Each instance has a type, a sequence number and a session description.
class PeerConnectionMessage : public RefCount {
// Each instance has a type and a session description.
class PeerConnectionMessage {
public:
enum PeerConnectionMessageType {
kOffer,
@ -63,37 +63,37 @@ class PeerConnectionMessage : public RefCount {
kMessageNotDeliverable = 40 // The signaling channel is broken.
};
static scoped_refptr<PeerConnectionMessage> Create(
static PeerConnectionMessage* Create(
PeerConnectionMessageType type,
cricket::SessionDescription* desc,
const cricket::SessionDescription* desc,
const std::vector<cricket::Candidate>& candidates);
static scoped_refptr<PeerConnectionMessage> Create(
static PeerConnectionMessage* Create(
const std::string& message);
static scoped_refptr<PeerConnectionMessage> CreateErrorMessage(
static PeerConnectionMessage* CreateErrorMessage(
ErrorCode error);
PeerConnectionMessageType type() {return type_;}
ErrorCode error() {return error_code_;}
const cricket::SessionDescription* desc() {return desc_.get();}
const cricket::SessionDescription* desc() {return desc_;}
std::string Serialize();
std::vector<cricket::Candidate>& candidates() { return candidates_; }
protected:
PeerConnectionMessage(PeerConnectionMessageType type,
cricket::SessionDescription* desc,
const cricket::SessionDescription* desc,
const std::vector<cricket::Candidate>& candidates);
PeerConnectionMessage();
explicit PeerConnectionMessage(ErrorCode error);
PeerConnectionMessage();
bool Deserialize(std::string message);
private:
PeerConnectionMessageType type_;
ErrorCode error_code_;
talk_base::scoped_ptr<cricket::SessionDescription> desc_;
const cricket::SessionDescription* desc_; // Weak ref.
std::vector<cricket::Candidate> candidates_;
};

View File

@ -101,27 +101,27 @@ class PeerConnectionMessageTest: public testing::Test {
TEST_F(PeerConnectionMessageTest, Serialize) {
std::string message;
scoped_refptr<PeerConnectionMessage> pc_message;
talk_base::scoped_ptr<PeerConnectionMessage> pc_message;
// Offer
cricket::SessionDescription* offer =
session_description_factory_->CreateOffer(options_);
pc_message = PeerConnectionMessage::Create(PeerConnectionMessage::kOffer,
offer, candidates_);
talk_base::scoped_ptr<const cricket::SessionDescription> offer(
session_description_factory_->CreateOffer(options_));
pc_message.reset(PeerConnectionMessage::Create(PeerConnectionMessage::kOffer,
offer.get(), candidates_));
message = pc_message->Serialize();
LOG(LS_INFO) << message;
// Answer
cricket::SessionDescription* answer =
session_description_factory_->CreateAnswer(offer, options_);
pc_message = PeerConnectionMessage::Create(PeerConnectionMessage::kAnswer,
answer, candidates_);
talk_base::scoped_ptr<const cricket::SessionDescription> answer(
session_description_factory_->CreateAnswer(offer.get(), options_));
pc_message.reset(PeerConnectionMessage::Create(PeerConnectionMessage::kAnswer,
answer.get(), candidates_));
message = pc_message->Serialize();
LOG(LS_INFO) << message;
// Error
pc_message = PeerConnectionMessage::CreateErrorMessage(
PeerConnectionMessage::kParseError);
pc_message.reset(PeerConnectionMessage::CreateErrorMessage(
PeerConnectionMessage::kParseError));
message = pc_message->Serialize();
LOG(LS_INFO) << message;
@ -131,44 +131,44 @@ TEST_F(PeerConnectionMessageTest, Serialize) {
TEST_F(PeerConnectionMessageTest, Deserialize) {
std::string message_ref;
std::string message_result;
scoped_refptr<PeerConnectionMessage> pc_message;
cricket::SessionDescription* offer =
session_description_factory_->CreateOffer(options_);
cricket::SessionDescription* answer =
session_description_factory_->CreateAnswer(offer, options_);
talk_base::scoped_ptr<PeerConnectionMessage> pc_message;
talk_base::scoped_ptr<cricket::SessionDescription> offer(
session_description_factory_->CreateOffer(options_));
talk_base::scoped_ptr<cricket::SessionDescription> answer(
session_description_factory_->CreateAnswer(offer.get(), options_));
// Offer
pc_message = PeerConnectionMessage::Create(PeerConnectionMessage::kOffer,
offer, candidates_);
pc_message.reset(PeerConnectionMessage::Create(PeerConnectionMessage::kOffer,
offer.get(), candidates_));
message_ref = pc_message->Serialize();
LOG(LS_INFO) << "The reference message: " << message_ref;
// Deserialize Offer
pc_message = PeerConnectionMessage::Create(message_ref);
pc_message.reset(PeerConnectionMessage::Create(message_ref));
message_result = pc_message->Serialize();
LOG(LS_INFO) << "The result message: " << message_result;
EXPECT_EQ(message_ref, message_result);
// Answer
pc_message = PeerConnectionMessage::Create(PeerConnectionMessage::kAnswer,
answer, candidates_);
pc_message.reset(PeerConnectionMessage::Create(PeerConnectionMessage::kAnswer,
answer.get(), candidates_));
message_ref = pc_message->Serialize();
LOG(LS_INFO) << "The reference message: " << message_ref;
// Deserialize Answer
pc_message = PeerConnectionMessage::Create(message_ref);
pc_message.reset(PeerConnectionMessage::Create(message_ref));
message_result = pc_message->Serialize();
LOG(LS_INFO) << "The result message: " << message_result;
EXPECT_EQ(message_ref, message_result);
// Error
pc_message = PeerConnectionMessage::CreateErrorMessage(
PeerConnectionMessage::kParseError);
pc_message.reset(PeerConnectionMessage::CreateErrorMessage(
PeerConnectionMessage::kParseError));
message_ref = pc_message->Serialize();
LOG(LS_INFO) << "The reference message: " << message_ref;
// Deserialize Error
pc_message = PeerConnectionMessage::Create(message_ref);
pc_message.reset(PeerConnectionMessage::Create(message_ref));
message_result = pc_message->Serialize();
LOG(LS_INFO) << "The result message: " << message_result;
EXPECT_EQ(message_ref, message_result);

View File

@ -32,6 +32,7 @@
#include "talk/app/webrtc_dev/audiotrackimpl.h"
#include "talk/app/webrtc_dev/mediastreamimpl.h"
#include "talk/app/webrtc_dev/videotrackimpl.h"
#include "talk/app/webrtc_dev/sessiondescriptionprovider.h"
#include "talk/base/helpers.h"
#include "talk/base/messagequeue.h"
#include "talk/session/phone/channelmanager.h"
@ -74,18 +75,30 @@ static bool VerifyAnswer(const cricket::SessionDescription* answer_desc) {
}
PeerConnectionSignaling::PeerConnectionSignaling(
cricket::ChannelManager* channel_manager,
talk_base::Thread* signaling_thread)
talk_base::Thread* signaling_thread,
SessionDescriptionProvider* provider)
: signaling_thread_(signaling_thread),
provider_(provider),
state_(kInitializing),
ssrc_counter_(0),
session_description_factory_(channel_manager) {
ssrc_counter_(0) {
}
PeerConnectionSignaling::~PeerConnectionSignaling() {
// Cleanup the queued_received_offer_ if this object is
// deleted before the offer can be processed.
// That means we have parsed an offer and created the remote
// session description but we have not transfered the ownership
// to the provider yet.
if (queued_received_offer_.first) {
const cricket::SessionDescription* remote_desc =
queued_received_offer_.first->desc();
delete remote_desc;
delete queued_received_offer_.first;
}
}
void PeerConnectionSignaling::Initialize(
void PeerConnectionSignaling::OnCandidatesReady(
const cricket::Candidates& candidates) {
ASSERT(state_ == kInitializing);
if (state_ != kInitializing)
@ -110,18 +123,18 @@ void PeerConnectionSignaling::ProcessSignalingMessage(
StreamCollection* local_streams) {
ASSERT(talk_base::Thread::Current() == signaling_thread_);
scoped_refptr<PeerConnectionMessage> signaling_message =
PeerConnectionMessage::Create(message);
talk_base::scoped_ptr<PeerConnectionMessage> signaling_message(
PeerConnectionMessage::Create(message));
if (!signaling_message.get()) {
signaling_message = PeerConnectionMessage::CreateErrorMessage(
PeerConnectionMessage::kParseError);
signaling_message.reset(PeerConnectionMessage::CreateErrorMessage(
PeerConnectionMessage::kParseError));
SignalNewPeerConnectionMessage(signaling_message->Serialize());
}
switch (signaling_message->type()) {
case PeerConnectionMessage::kOffer: {
queued_received_offer_ =
RemoteOfferPair(signaling_message, local_streams);
RemoteOfferPair(signaling_message.release(), local_streams);
// If we are still Initializing we need to wait before we can handle
// the offer. Queue it and handle it when the state change.
if (state_ == kInitializing) {
@ -135,9 +148,9 @@ void PeerConnectionSignaling::ProcessSignalingMessage(
talk_base::CreateRandomId() % kGlareWaitIntervall;
signaling_thread_->PostDelayed(
timeout, this, MSG_SEND_QUEUED_OFFER, NULL);
scoped_refptr<PeerConnectionMessage> msg =
talk_base::scoped_ptr<PeerConnectionMessage> msg(
PeerConnectionMessage::CreateErrorMessage(
PeerConnectionMessage::kWrongState);
PeerConnectionMessage::kWrongState));
SignalNewPeerConnectionMessage(msg->Serialize());
break;
}
@ -153,14 +166,15 @@ void PeerConnectionSignaling::ProcessSignalingMessage(
ASSERT(state_ != PeerConnectionSignaling::kIdle);
if (state_ == PeerConnectionSignaling::kIdle)
return;
// Signal the resulting local and remote session description.
SignalUpdateSessionDescription(last_send_offer_->desc(),
signaling_message->desc(),
signaling_message->candidates());
UpdateRemoteStreams(signaling_message->desc());
const cricket::SessionDescription* remote_desc =
provider_->SetRemoteSessionDescription(
signaling_message->desc(),
signaling_message->candidates());
provider_->NegotiationDone();
UpdateRemoteStreams(remote_desc);
scoped_refptr<StreamCollection> streams(queued_offers_.front());
queued_offers_.pop_front();
UpdateSendingLocalStreams(signaling_message->desc(), streams);
UpdateSendingLocalStreams(remote_desc, streams);
// Check if we have more offers waiting in the queue.
if (queued_offers_.size() > 0) {
// Send the next offer.
@ -212,18 +226,13 @@ void PeerConnectionSignaling::CreateOffer_s() {
cricket::MediaSessionOptions options;
InitMediaSessionOptions(&options, local_streams);
talk_base::scoped_ptr<cricket::SessionDescription> offer(
session_description_factory_.CreateOffer(options));
const cricket::SessionDescription* local_desc =
provider_->ProvideOffer(options);
scoped_refptr<PeerConnectionMessage> offer_message =
talk_base::scoped_ptr<PeerConnectionMessage> offer_message(
PeerConnectionMessage::Create(PeerConnectionMessage::kOffer,
offer.release(),
candidates_);
// If we are updating with new streams we need to get an answer
// before we can handle a remote offer.
// We also need the response before we are allowed to start send media.
last_send_offer_ = offer_message;
local_desc,
candidates_));
SignalNewPeerConnectionMessage(offer_message->Serialize());
}
@ -232,8 +241,9 @@ PeerConnectionSignaling::State PeerConnectionSignaling::GetState() {
}
void PeerConnectionSignaling::CreateAnswer_s() {
scoped_refptr<PeerConnectionMessage> message(
queued_received_offer_.first.release());
talk_base::scoped_ptr<PeerConnectionMessage> message(
queued_received_offer_.first);
queued_received_offer_.first = NULL;
scoped_refptr<StreamCollection> local_streams(
queued_received_offer_.second.release());
@ -241,37 +251,40 @@ void PeerConnectionSignaling::CreateAnswer_s() {
signaling_thread_->Clear(this, MSG_SEND_QUEUED_OFFER, NULL);
queued_offers_.clear();
// Let the provider know about the remote offer.
// The provider takes ownership and return a pointer for us to use.
const cricket::SessionDescription* remote_desc =
provider_->SetRemoteSessionDescription(message->desc(),
message->candidates());
// Create a MediaSessionOptions object with the sources we want to send.
cricket::MediaSessionOptions options;
InitMediaSessionOptions(&options, local_streams);
// Create an local session description based on this.
const cricket::SessionDescription* local_desc =
provider_->ProvideAnswer(options);
// Use the MediaSessionFactory to create an SDP answer.
talk_base::scoped_ptr<cricket::SessionDescription> answer(
session_description_factory_.CreateAnswer(message->desc(), options));
talk_base::scoped_ptr<PeerConnectionMessage> answer_message;
if (!VerifyAnswer(local_desc)) {
answer_message.reset(PeerConnectionMessage::CreateErrorMessage(
PeerConnectionMessage::kOfferNotAcceptable));
scoped_refptr<PeerConnectionMessage> answer_message;
if (VerifyAnswer(answer.get())) {
answer_message = PeerConnectionMessage::Create(
PeerConnectionMessage::kAnswer, answer.release(), candidates_);
} else {
answer_message = PeerConnectionMessage::CreateErrorMessage(
PeerConnectionMessage::kOfferNotAcceptable);
// Signal that the new answer is ready to be sent.
SignalNewPeerConnectionMessage(answer_message->Serialize());
return;
}
// Signal the resulting local and remote session description.
SignalUpdateSessionDescription(answer.get(),
message->desc(),
message->candidates()); // remote candidates
answer_message.reset(PeerConnectionMessage::Create(
PeerConnectionMessage::kAnswer, local_desc, candidates_));
// Let the provider know the negotiation is done.
provider_->NegotiationDone();
SignalNewPeerConnectionMessage(answer_message->Serialize());
UpdateRemoteStreams(message->desc());
// Signal that the new answer is ready to be sent.
SignalNewPeerConnectionMessage(answer_message->Serialize());
// Start send the local streams.
// TODO(perkj): Defer the start of sending local media so the remote peer
// have time to receive the signaling message before media arrives?
// This is under debate.
// Update the state of the local streams.
UpdateSendingLocalStreams(answer_message->desc(), local_streams);
}

View File

@ -42,6 +42,8 @@
#include "talk/app/webrtc_dev/peerconnectionmessage.h"
#include "talk/app/webrtc_dev/ref_count.h"
#include "talk/app/webrtc_dev/scoped_refptr.h"
#include "talk/app/webrtc_dev/sessiondescriptionprovider.h"
#include "talk/app/webrtc_dev/webrtcsessionobserver.h"
#include "talk/base/basictypes.h"
#include "talk/base/messagehandler.h"
#include "talk/base/scoped_ptr.h"
@ -70,7 +72,8 @@ namespace webrtc {
// Before PeerConnectionSignaling can process an answer or create an offer,
// Initialize have to be called. The last request to create an offer or process
// an answer will be processed after Initialize have been called.
class PeerConnectionSignaling : public talk_base::MessageHandler {
class PeerConnectionSignaling : public WebRtcSessionObserver,
public talk_base::MessageHandler {
public:
enum State {
// Awaiting the local candidates.
@ -85,12 +88,10 @@ class PeerConnectionSignaling : public talk_base::MessageHandler {
kGlare
};
PeerConnectionSignaling(cricket::ChannelManager* channel_manager,
talk_base::Thread* signaling_thread);
PeerConnectionSignaling(talk_base::Thread* signaling_thread,
SessionDescriptionProvider* provider);
~PeerConnectionSignaling();
void Initialize(const cricket::Candidates& candidates);
// Process a received offer/answer from the remote peer.
void ProcessSignalingMessage(const std::string& message,
StreamCollection* local_streams);
@ -118,17 +119,13 @@ class PeerConnectionSignaling : public talk_base::MessageHandler {
// Remote PeerConnection sent an error message.
sigslot::signal1<PeerConnectionMessage::ErrorCode> SignalErrorMessageReceived;
// Informs that a new Offer/Answer have been exchanged.
// The parameters are local session description,
// remote session_description,
// local StreamCollection.
sigslot::signal3<const cricket::SessionDescription*,
const cricket::SessionDescription*,
const cricket::Candidates&> SignalUpdateSessionDescription;
// Implements WebRtcSessionObserver
virtual void OnCandidatesReady(const cricket::Candidates& candidates);
// Implements talk_base::MessageHandler.
virtual void OnMessage(talk_base::Message* msg);
private:
// Implement talk_base::MessageHandler.
virtual void OnMessage(talk_base::Message* msg);
void CreateOffer_s();
void CreateAnswer_s();
@ -143,11 +140,12 @@ class PeerConnectionSignaling : public talk_base::MessageHandler {
typedef std::list<scoped_refptr<StreamCollection> > StreamCollectionList;
StreamCollectionList queued_offers_;
typedef std::pair<scoped_refptr<PeerConnectionMessage>,
typedef std::pair<PeerConnectionMessage*,
scoped_refptr<StreamCollection> > RemoteOfferPair;
RemoteOfferPair queued_received_offer_;
talk_base::Thread* signaling_thread_;
SessionDescriptionProvider* provider_;
State state_;
uint32 ssrc_counter_;
@ -157,9 +155,6 @@ class PeerConnectionSignaling : public talk_base::MessageHandler {
typedef std::map<std::string, scoped_refptr<MediaStream> >
LocalStreamMap;
LocalStreamMap local_streams_;
cricket::MediaSessionDescriptionFactory session_description_factory_;
scoped_refptr<PeerConnectionMessage> last_send_offer_;
cricket::Candidates candidates_;
};

View File

@ -82,8 +82,7 @@ class MockMediaStreamObserver : public webrtc::Observer {
class MockSignalingObserver : public sigslot::has_slots<> {
public:
MockSignalingObserver()
: update_session_description_counter_(0),
remote_peer_(NULL) {
: remote_peer_(NULL) {
}
// New remote stream have been discovered.
@ -108,20 +107,14 @@ class MockSignalingObserver : public sigslot::has_slots<> {
// the message.
talk_base::Thread::Current()->ProcessMessages(1);
}
scoped_refptr<PeerConnectionMessage> message;
message = PeerConnectionMessage::Create(smessage);
talk_base::scoped_ptr<PeerConnectionMessage> message(
PeerConnectionMessage::Create(smessage));
if (message.get() != NULL &&
message->type() != PeerConnectionMessage::kError) {
last_message = smessage;
}
}
void OnUpdateSessionDescription(const cricket::SessionDescription* local,
const cricket::SessionDescription* remote,
const cricket::Candidates& candidates) {
update_session_description_counter_++;
}
// Tell this object to answer the remote_peer.
// remote_local_collection is the local collection the remote peer want to
// send in an answer.
@ -146,40 +139,80 @@ class MockSignalingObserver : public sigslot::has_slots<> {
virtual ~MockSignalingObserver() {}
std::string last_message;
int update_session_description_counter_;
private:
MediaStreamMap remote_media_streams_;
scoped_refptr<StreamCollectionImpl> remote_local_collection_;
PeerConnectionSignaling* remote_peer_;
};
class MockSessionDescriptionProvider : public SessionDescriptionProvider {
public:
MockSessionDescriptionProvider(cricket::ChannelManager* channel_manager)
: update_session_description_counter_(0),
session_description_factory_(
new cricket::MediaSessionDescriptionFactory(channel_manager)) {
}
virtual const cricket::SessionDescription* ProvideOffer(
const cricket::MediaSessionOptions& options) {
offer_.reset(session_description_factory_->CreateOffer(options));
return offer_.get();
}
// Transfer ownership of remote_offer.
virtual const cricket::SessionDescription* SetRemoteSessionDescription(
const cricket::SessionDescription* remote_offer,
const cricket::Candidates& remote_candidates) {
remote_desc_.reset(remote_offer);
return remote_desc_.get();
}
virtual const cricket::SessionDescription* ProvideAnswer(
const cricket::MediaSessionOptions& options) {
answer_.reset(session_description_factory_->CreateAnswer(remote_desc_.get(),
options));
return answer_.get();
}
virtual void NegotiationDone() {
++update_session_description_counter_;
}
int update_session_description_counter_;
protected:
talk_base::scoped_ptr<cricket::MediaSessionDescriptionFactory>
session_description_factory_;
talk_base::scoped_ptr<const cricket::SessionDescription> offer_;
talk_base::scoped_ptr<const cricket::SessionDescription> answer_;
talk_base::scoped_ptr<const cricket::SessionDescription> remote_desc_;
};
class PeerConnectionSignalingTest: public testing::Test {
protected:
virtual void SetUp() {
channel_manager_.reset(new cricket::ChannelManager(
talk_base::Thread::Current()));
EXPECT_TRUE(channel_manager_->Init());
provider1_.reset(new MockSessionDescriptionProvider(
channel_manager_.get()));
provider2_.reset(new MockSessionDescriptionProvider(
channel_manager_.get()));
signaling1_.reset(new PeerConnectionSignaling(
channel_manager_.get(), talk_base::Thread::Current()));
talk_base::Thread::Current(), provider1_.get()));
observer1_.reset(new MockSignalingObserver());
signaling1_->SignalNewPeerConnectionMessage.connect(
observer1_.get(), &MockSignalingObserver::OnSignalingMessage);
signaling1_->SignalUpdateSessionDescription.connect(
observer1_.get(), &MockSignalingObserver::OnUpdateSessionDescription);
signaling1_->SignalRemoteStreamAdded.connect(
observer1_.get(), &MockSignalingObserver::OnRemoteStreamAdded);
signaling1_->SignalRemoteStreamRemoved.connect(
observer1_.get(), &MockSignalingObserver::OnRemoteStreamRemoved);
signaling2_.reset(new PeerConnectionSignaling(
channel_manager_.get(), talk_base::Thread::Current()));
talk_base::Thread::Current(), provider2_.get()));
observer2_.reset(new MockSignalingObserver());
signaling2_->SignalNewPeerConnectionMessage.connect(
observer2_.get(), &MockSignalingObserver::OnSignalingMessage);
signaling2_->SignalUpdateSessionDescription.connect(
observer2_.get(), &MockSignalingObserver::OnUpdateSessionDescription);
signaling2_->SignalRemoteStreamAdded.connect(
observer2_.get(), &MockSignalingObserver::OnRemoteStreamAdded);
signaling2_->SignalRemoteStreamRemoved.connect(
@ -189,6 +222,8 @@ class PeerConnectionSignalingTest: public testing::Test {
cricket::Candidates candidates_;
talk_base::scoped_ptr<MockSignalingObserver> observer1_;
talk_base::scoped_ptr<MockSignalingObserver> observer2_;
talk_base::scoped_ptr<MockSessionDescriptionProvider> provider1_;
talk_base::scoped_ptr<MockSessionDescriptionProvider> provider2_;
talk_base::scoped_ptr<PeerConnectionSignaling> signaling1_;
talk_base::scoped_ptr<PeerConnectionSignaling> signaling2_;
talk_base::scoped_ptr<cricket::ChannelManager> channel_manager_;
@ -233,7 +268,7 @@ TEST_F(PeerConnectionSignalingTest, SimpleOneWayCall) {
EXPECT_EQ(PeerConnectionSignaling::kInitializing, signaling1_->GetState());
// Initialize signaling1_ by providing the candidates.
signaling1_->Initialize(candidates_);
signaling1_->OnCandidatesReady(candidates_);
EXPECT_EQ(PeerConnectionSignaling::kWaitingForAnswer,
signaling1_->GetState());
// Process posted messages to allow signaling_1 to send the offer.
@ -244,7 +279,7 @@ TEST_F(PeerConnectionSignalingTest, SimpleOneWayCall) {
EXPECT_EQ(PeerConnectionSignaling::kInitializing, signaling2_->GetState());
// Provide the candidates to signaling_2 and let it process the offer.
signaling2_->Initialize(candidates_);
signaling2_->OnCandidatesReady(candidates_);
talk_base::Thread::Current()->ProcessMessages(1);
// Verify that the offer/answer have been exchanged and the state is good.
@ -260,14 +295,14 @@ TEST_F(PeerConnectionSignalingTest, SimpleOneWayCall) {
EXPECT_TRUE(observer2_->RemoteStream(label) != NULL);
// Verify that both peers have updated the session descriptions.
EXPECT_EQ(1u, observer1_->update_session_description_counter_);
EXPECT_EQ(1u, observer2_->update_session_description_counter_);
EXPECT_EQ(1u, provider1_->update_session_description_counter_);
EXPECT_EQ(1u, provider2_->update_session_description_counter_);
}
TEST_F(PeerConnectionSignalingTest, Glare) {
// Initialize signaling1_ and signaling_2 by providing the candidates.
signaling1_->Initialize(candidates_);
signaling2_->Initialize(candidates_);
signaling1_->OnCandidatesReady(candidates_);
signaling2_->OnCandidatesReady(candidates_);
// Create a local stream.
std::string label(kStreamLabel1);
scoped_refptr<LocalMediaStream> stream(CreateLocalMediaStream(label));
@ -323,14 +358,14 @@ TEST_F(PeerConnectionSignalingTest, Glare) {
EXPECT_TRUE(observer2_->RemoteStream(label) != NULL);
// Verify that both peers have updated the session descriptions.
EXPECT_EQ(1u, observer1_->update_session_description_counter_);
EXPECT_EQ(1u, observer2_->update_session_description_counter_);
EXPECT_EQ(1u, provider1_->update_session_description_counter_);
EXPECT_EQ(1u, provider2_->update_session_description_counter_);
}
TEST_F(PeerConnectionSignalingTest, AddRemoveStream) {
// Initialize signaling1_ and signaling_2 by providing the candidates.
signaling1_->Initialize(candidates_);
signaling2_->Initialize(candidates_);
signaling1_->OnCandidatesReady(candidates_);
signaling2_->OnCandidatesReady(candidates_);
// Create a local stream.
std::string label(kStreamLabel1);
scoped_refptr<LocalMediaStream> stream(CreateLocalMediaStream(label));
@ -367,8 +402,8 @@ TEST_F(PeerConnectionSignalingTest, AddRemoveStream) {
talk_base::Thread::Current()->ProcessMessages(1);
// Verify that both peers have updated the session descriptions.
EXPECT_EQ(1u, observer1_->update_session_description_counter_);
EXPECT_EQ(1u, observer2_->update_session_description_counter_);
EXPECT_EQ(1u, provider1_->update_session_description_counter_);
EXPECT_EQ(1u, provider2_->update_session_description_counter_);
// Peer2 add a stream.
local_collection2->AddStream(stream);
@ -384,8 +419,8 @@ TEST_F(PeerConnectionSignalingTest, AddRemoveStream) {
EXPECT_TRUE(observer1_->RemoteStream(label) != NULL);
// Verify that both peers have updated the session descriptions.
EXPECT_EQ(2u, observer1_->update_session_description_counter_);
EXPECT_EQ(2u, observer2_->update_session_description_counter_);
EXPECT_EQ(2u, provider1_->update_session_description_counter_);
EXPECT_EQ(2u, provider2_->update_session_description_counter_);
// Remove the stream
local_collection2->RemoveStream(stream);
@ -401,8 +436,8 @@ TEST_F(PeerConnectionSignalingTest, AddRemoveStream) {
EXPECT_EQ(MediaStreamTrack::kEnded, track_observer1.track_state);
// Verify that both peers have updated the session descriptions.
EXPECT_EQ(3u, observer1_->update_session_description_counter_);
EXPECT_EQ(3u, observer2_->update_session_description_counter_);
EXPECT_EQ(3u, provider1_->update_session_description_counter_);
EXPECT_EQ(3u, provider2_->update_session_description_counter_);
}
} // namespace webrtc

View File

@ -0,0 +1,58 @@
/*
* libjingle
* Copyright 2011, Google Inc.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* 3. The name of the author may not be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
* EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
* OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
* OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef TALK_APP_WEBRTC_SESSIONDESCRIPTIONPROVIDER_H_
#define TALK_APP_WEBRTC_SESSIONDESCRIPTIONPROVIDER_H_
#include "talk/session/phone/mediasession.h"
#include "talk/p2p/base/candidate.h"
#include "talk/p2p/base/sessiondescription.h"
namespace webrtc {
class SessionDescriptionProvider {
public:
virtual const cricket::SessionDescription* ProvideOffer(
const cricket::MediaSessionOptions& options) = 0;
// Transfer ownership of remote_offer.
virtual const cricket::SessionDescription* SetRemoteSessionDescription(
const cricket::SessionDescription* remote_offer,
const std::vector<cricket::Candidate>& remote_candidates) = 0;
virtual const cricket::SessionDescription* ProvideAnswer(
const cricket::MediaSessionOptions& options) = 0;
virtual void NegotiationDone() = 0;
protected:
virtual ~SessionDescriptionProvider() {}
};
} // namespace webrtc
#endif // TALK_APP_WEBRTC_SESSIONDESCRIPTIONPROVIDER_H_

View File

@ -58,15 +58,14 @@ static const std::string kRtcpVideoChannelStr = "video_rtcp";
WebRtcSession::WebRtcSession(cricket::ChannelManager* channel_manager,
talk_base::Thread* signaling_thread,
talk_base::Thread* worker_thread,
cricket::PortAllocator* port_allocator,
PeerConnectionSignaling* pc_signaling)
: cricket::BaseSession(signaling_thread, worker_thread, port_allocator,
cricket::PortAllocator* port_allocator)
: observer_(NULL),
cricket::BaseSession(signaling_thread, worker_thread, port_allocator,
talk_base::ToString(talk_base::CreateRandomId()),
cricket::NS_JINGLE_RTP, true) {
// TODO(mallinath) - Remove initiator flag from BaseSession. As it's being
// used only by cricket::Session.
channel_manager_ = channel_manager;
pc_signaling_ = pc_signaling;
}
WebRtcSession::~WebRtcSession() {
@ -74,9 +73,6 @@ WebRtcSession::~WebRtcSession() {
}
bool WebRtcSession::Initialize() {
ASSERT(pc_signaling_ != NULL);
pc_signaling_->SignalUpdateSessionDescription.connect(
this, &WebRtcSession::OnSignalUpdateSessionDescription);
return CreateChannels();
}
@ -217,7 +213,8 @@ void WebRtcSession::OnTransportCandidatesReady(
return;
InsertTransportCandidates(candidates);
if (local_candidates_.size() == kAllowedCandidates) {
pc_signaling_->Initialize(local_candidates_);
if(observer_)
observer_->OnCandidatesReady(local_candidates_);
// TODO(mallinath) - Remove signal when a new interface added for
// PC signaling.
SignalCandidatesReady(this, local_candidates_);

View File

@ -36,6 +36,8 @@
#include "talk/p2p/base/session.h"
#include "talk/session/phone/mediasession.h"
#include "talk/app/webrtc_dev/mediastreamprovider.h"
#include "talk/app/webrtc_dev/sessiondescriptionprovider.h"
#include "talk/app/webrtc_dev/webrtcsessionobserver.h"
namespace cricket {
class ChannelManager;
@ -51,13 +53,13 @@ class PeerConnectionSignaling;
class StreamCollection;
class WebRtcSession : public cricket::BaseSession,
public MediaProviderInterface {
public MediaProviderInterface,
public SessionDescriptionProvider {
public:
WebRtcSession(cricket::ChannelManager* channel_manager,
talk_base::Thread* signaling_thread,
talk_base::Thread* worker_thread,
cricket::PortAllocator* port_allocator,
PeerConnectionSignaling* pc_signaling);
cricket::PortAllocator* port_allocator);
~WebRtcSession();
bool Initialize();
@ -88,12 +90,23 @@ class WebRtcSession : public cricket::BaseSession,
virtual void SetRemoteRenderer(uint32 ssrc,
cricket::VideoRenderer* renderer);
// Callback handling from PeerConnectionSignaling
//TODO mallinath: remove.
void OnSignalUpdateSessionDescription(
const cricket::SessionDescription* local_desc,
const cricket::SessionDescription* remote_desc,
const cricket::Candidates& remote_candidates);
// Implements SessionDescriptionProvider
virtual const cricket::SessionDescription* ProvideOffer(
const cricket::MediaSessionOptions& options) {}
virtual const cricket::SessionDescription* SetRemoteSessionDescription(
const cricket::SessionDescription* remote_offer,
const cricket::Candidates& remote_candidates) {}
virtual const cricket::SessionDescription* ProvideAnswer(
const cricket::MediaSessionOptions& options) {}
virtual void NegotiationDone() {}
// Transport related callbacks, override from cricket::BaseSession.
virtual void OnTransportRequestSignaling(cricket::Transport* transport);
virtual void OnTransportConnecting(cricket::Transport* transport);
@ -122,7 +135,7 @@ class WebRtcSession : public cricket::BaseSession,
void ProcessRemoteMediaChanges(const cricket::SessionDescription* sdesc);
private:
PeerConnectionSignaling* pc_signaling_;
WebRtcSessionObserver* observer_;
talk_base::scoped_ptr<cricket::VoiceChannel> voice_channel_;
talk_base::scoped_ptr<cricket::VideoChannel> video_channel_;
cricket::ChannelManager* channel_manager_;

View File

@ -48,9 +48,6 @@ class WebRtcSessionTest : public testing::Test,
channel_manager_.reset(new cricket::ChannelManager(worker_thread_));
port_allocator_.reset(
new cricket::FakePortAllocator(worker_thread_, NULL));
pc_signaling_.reset(
new webrtc::PeerConnectionSignaling(channel_manager_.get(),
signaling_thread_));
media_factory_ =
new cricket::MediaSessionDescriptionFactory(channel_manager_.get());
}
@ -77,7 +74,7 @@ class WebRtcSessionTest : public testing::Test,
EXPECT_TRUE(channel_manager_.get()->Init());
session_.reset(new webrtc::WebRtcSession(
channel_manager_.get(), worker_thread_, signaling_thread_,
port_allocator_.get(), pc_signaling_.get()));
port_allocator_.get()));
session_->SignalCandidatesReady.connect(
this, &WebRtcSessionTest::OnCandidatesReady);
EXPECT_TRUE(InitializeSession());
@ -113,7 +110,6 @@ class WebRtcSessionTest : public testing::Test,
talk_base::Thread* signaling_thread_;
talk_base::Thread* worker_thread_;
talk_base::scoped_ptr<cricket::PortAllocator> port_allocator_;
talk_base::scoped_ptr<webrtc::PeerConnectionSignaling> pc_signaling_;
talk_base::scoped_ptr<webrtc::WebRtcSession> session_;
talk_base::scoped_ptr<cricket::ChannelManager> channel_manager_;
};

View File

@ -0,0 +1,47 @@
/*
* libjingle
* Copyright 2011, Google Inc.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* 3. The name of the author may not be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
* EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
* OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
* OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef TALK_APP_WEBRTC_WEBRTCSESSIONOBSERVER_H_
#define TALK_APP_WEBRTC_WEBRTCSESSIONOBSERVER_H_
#include <vector>
#include "talk/p2p/base/candidate.h"
namespace webrtc {
class WebRtcSessionObserver {
public:
virtual void OnCandidatesReady(
const std::vector<cricket::Candidate>& candiddates) = 0;
protected:
virtual ~WebRtcSessionObserver() {}
};
} // namespace webrtc
#endif // TALK_APP_WEBRTC_WEBRTCSESSIONOBSERVER_H_