Add SignalUpdateSessionDescription to PeerConnectionSignaling.

This is to allow webrtcsession to setup the mediachannels based on tracks.

BUG=
TEST=

Review URL: http://webrtc-codereview.appspot.com/184001

git-svn-id: http://webrtc.googlecode.com/svn/trunk@665 4adac7df-926f-26a2-2b94-8c16560cd09d
This commit is contained in:
perkj@webrtc.org 2011-09-28 23:06:46 +00:00
parent 6b6d08164f
commit 5045f671d0
3 changed files with 187 additions and 40 deletions

View File

@ -39,7 +39,8 @@
namespace webrtc {
enum {
MSG_SEND_QUEUED_OFFER = 301,
MSG_SEND_QUEUED_OFFER = 1,
MSG_GENERATE_ANSWER = 2,
};
static const int kGlareMinWaitTime = 2 * 1000; // 2 sec
static const int kGlareWaitIntervall = 1 * 1000; // 1 sec
@ -75,8 +76,9 @@ static bool VerifyAnswer(const cricket::SessionDescription* answer_desc) {
scoped_refptr<PeerConnectionMessage> PeerConnectionMessage::Create(
PeerConnectionMessageType type,
const cricket::SessionDescription* desc) {
return new RefCountImpl<PeerConnectionMessage> (type, desc);
const cricket::SessionDescription* desc,
const cricket::Candidates& candidates) {
return new RefCountImpl<PeerConnectionMessage> (type, desc, candidates);
}
scoped_refptr<PeerConnectionMessage> PeerConnectionMessage::CreateErrorMessage(
@ -86,22 +88,25 @@ scoped_refptr<PeerConnectionMessage> PeerConnectionMessage::CreateErrorMessage(
PeerConnectionMessage::PeerConnectionMessage(
PeerConnectionMessageType type,
const cricket::SessionDescription* desc)
const cricket::SessionDescription* desc,
const cricket::Candidates& candidates)
: type_(type),
error_code_(kNoError),
desc_(desc),
error_code_(kNoError) {
candidates_(candidates) {
}
PeerConnectionMessage::PeerConnectionMessage(ErrorCode error)
: type_(kError),
desc_(NULL),
error_code_(error) {
error_code_(error),
desc_(NULL) {
}
PeerConnectionSignaling::PeerConnectionSignaling(
cricket::ChannelManager* channel_manager)
: signaling_thread_(talk_base::Thread::Current()),
state_(kIdle),
cricket::ChannelManager* channel_manager,
talk_base::Thread* signaling_thread)
: signaling_thread_(signaling_thread),
state_(kInitializing),
ssrc_counter_(0),
session_description_factory_(channel_manager) {
}
@ -109,11 +114,39 @@ PeerConnectionSignaling::PeerConnectionSignaling(
PeerConnectionSignaling::~PeerConnectionSignaling() {
}
void PeerConnectionSignaling::Initialize(
const cricket::Candidates& candidates) {
ASSERT(state_ == kInitializing);
if (state_ != kInitializing)
return;
// Store the candidates.
candidates_ = candidates;
// If we have a queued remote offer we need to handle this first.
if (queued_received_offer_.first != NULL) {
state_ = kIdle;
signaling_thread_->Post(this, MSG_GENERATE_ANSWER);
} else if (queued_offers_.size() >0) {
// Else if we have local queued offers.
state_ = PeerConnectionSignaling::kWaitingForAnswer;
signaling_thread_->Post(this, MSG_SEND_QUEUED_OFFER);
} else {
state_ = kIdle;
}
}
void PeerConnectionSignaling::ProcessSignalingMessage(
PeerConnectionMessage* message,
StreamCollection* local_streams) {
ASSERT(talk_base::Thread::Current() == signaling_thread_);
switch (message->type()) {
case PeerConnectionMessage::kOffer: {
queued_received_offer_ = RemoteOfferPair(message, local_streams);
// If we are still Initializing we need to wait before we can handle
// the offer. Queue it and handle it when the state change.
if (state_ == kInitializing) {
break;
}
// Don't handle offers when we are waiting for an answer.
if (state_ == kWaitingForAnswer) {
state_ = kGlare;
@ -131,11 +164,7 @@ void PeerConnectionSignaling::ProcessSignalingMessage(
if (state_ == kGlare) {
state_ = kIdle;
}
// Reset all pending offers. Instead, send the new streams in the answer.
signaling_thread_->Clear(this, MSG_SEND_QUEUED_OFFER, NULL);
queued_offers_.clear();
GenerateAnswer(message, local_streams);
UpdateRemoteStreams(message->desc());
signaling_thread_->Post(this, MSG_GENERATE_ANSWER);
break;
}
case PeerConnectionMessage::kAnswer: {
@ -147,11 +176,16 @@ void PeerConnectionSignaling::ProcessSignalingMessage(
queued_offers_.pop_front();
UpdateSendingLocalStreams(message->desc(), streams);
// Check if we have more offers waiting in the queue.
if (queued_offers_.size() > 0)
if (queued_offers_.size() > 0) {
// Send the next offer.
signaling_thread_->Post(this, MSG_SEND_QUEUED_OFFER);
else
} else {
state_ = PeerConnectionSignaling::kIdle;
}
// Signal the resulting local and remote session description.
SignalUpdateSessionDescription(last_send_offer_->desc(),
message->desc(),
streams.get());
break;
}
case PeerConnectionMessage::kError: {
@ -168,6 +202,7 @@ void PeerConnectionSignaling::ProcessSignalingMessage(
}
void PeerConnectionSignaling::CreateOffer(StreamCollection* local_streams) {
ASSERT(talk_base::Thread::Current() == signaling_thread_);
queued_offers_.push_back(local_streams);
if (state_ == kIdle) {
// Check if we can sent a new offer.
@ -177,6 +212,18 @@ void PeerConnectionSignaling::CreateOffer(StreamCollection* local_streams) {
}
}
// Implement talk_base::MessageHandler.
void PeerConnectionSignaling::OnMessage(talk_base::Message* msg) {
switch (msg->message_id) {
case MSG_SEND_QUEUED_OFFER:
CreateOffer_s();
break;
case MSG_GENERATE_ANSWER:
CreateAnswer_s();
break;
}
}
void PeerConnectionSignaling::CreateOffer_s() {
ASSERT(queued_offers_.size() > 0);
scoped_refptr<StreamCollection> local_streams(queued_offers_.front());
@ -189,11 +236,13 @@ void PeerConnectionSignaling::CreateOffer_s() {
scoped_refptr<PeerConnectionMessage> offer_message =
PeerConnectionMessage::Create(PeerConnectionMessage::kOffer,
offer.release());
offer.release(),
candidates_);
// If we are updating with new streams we need to get an answer
// before we can handle a remote offer.
// We also need the response before we are allowed to start send media.
last_send_offer_ = offer_message;
SignalNewPeerConnectionMessage(offer_message);
}
@ -201,17 +250,16 @@ PeerConnectionSignaling::State PeerConnectionSignaling::GetState() {
return state_;
}
// Implement talk_base::MessageHandler.
void PeerConnectionSignaling::OnMessage(talk_base::Message* msg) {
switch (msg->message_id) {
case MSG_SEND_QUEUED_OFFER:
CreateOffer_s();
break;
}
}
void PeerConnectionSignaling::CreateAnswer_s() {
scoped_refptr<PeerConnectionMessage> message(
queued_received_offer_.first.release());
scoped_refptr<StreamCollection> local_streams(
queued_received_offer_.second.release());
// Reset all pending offers. Instead, send the new streams in the answer.
signaling_thread_->Clear(this, MSG_SEND_QUEUED_OFFER, NULL);
queued_offers_.clear();
void PeerConnectionSignaling::GenerateAnswer(PeerConnectionMessage* message,
StreamCollection* local_streams) {
// Create a MediaSessionOptions object with the sources we want to send.
cricket::MediaSessionOptions options;
options.is_video = true;
@ -224,13 +272,15 @@ void PeerConnectionSignaling::GenerateAnswer(PeerConnectionMessage* message,
scoped_refptr<PeerConnectionMessage> answer_message;
if (VerifyAnswer(answer.get())) {
answer_message = PeerConnectionMessage::Create(
PeerConnectionMessage::kAnswer, answer.release());
PeerConnectionMessage::kAnswer, answer.release(), candidates_);
} else {
answer_message = PeerConnectionMessage::CreateErrorMessage(
PeerConnectionMessage::kOfferNotAcceptable);
}
UpdateRemoteStreams(message->desc());
// Signal that the new answer is ready to be sent.
SignalNewPeerConnectionMessage(answer_message);
@ -239,6 +289,11 @@ void PeerConnectionSignaling::GenerateAnswer(PeerConnectionMessage* message,
// have time to receive the signaling message before media arrives?
// This is under debate.
UpdateSendingLocalStreams(answer_message->desc(), local_streams);
// Signal the resulting local and remote session description.
SignalUpdateSessionDescription(answer.get(),
message->desc(),
local_streams);
}
// Fills a MediaSessionOptions struct with the MediaTracks we want to sent given

View File

@ -34,6 +34,8 @@
#include <list>
#include <map>
#include <string>
#include <utility>
#include <vector>
#include "talk/app/webrtc_dev/mediastreamimpl.h"
#include "talk/app/webrtc_dev/peerconnection.h"
@ -48,6 +50,8 @@
namespace cricket {
class ChannelManager;
class Candidate;
typedef std::vector<Candidate> Candidates;
}
namespace webrtc {
@ -74,7 +78,8 @@ class PeerConnectionMessage : public RefCount {
static scoped_refptr<PeerConnectionMessage> Create(
PeerConnectionMessageType type,
const cricket::SessionDescription* desc);
const cricket::SessionDescription* desc,
const cricket::Candidates& candidates);
static scoped_refptr<PeerConnectionMessage> CreateErrorMessage(
ErrorCode error);
@ -82,18 +87,21 @@ class PeerConnectionMessage : public RefCount {
PeerConnectionMessageType type() {return type_;}
ErrorCode error() {return error_code_;}
const cricket::SessionDescription* desc() {return desc_.get();}
const cricket::Candidates& candidates() {return candidates_;}
// TODO(perkj): Add functions for serializing and deserializing this class.
protected:
PeerConnectionMessage(PeerConnectionMessageType type,
const cricket::SessionDescription* desc);
const cricket::SessionDescription* desc,
const cricket::Candidates& candidates);
explicit PeerConnectionMessage(ErrorCode error);
private:
PeerConnectionMessageType type_;
ErrorCode error_code_;
talk_base::scoped_ptr<const cricket::SessionDescription> desc_;
cricket::Candidates candidates_;
};
// PeerConnectionSignaling is a class responsible for handling signaling
@ -102,12 +110,18 @@ class PeerConnectionMessage : public RefCount {
// to send a new MediaStream.
// It changes the state of local MediaStreams and tracks
// when a remote peer is ready to receive media.
// Call Initialize when local Candidates are ready.
// Call CreateOffer to negotiate new local streams to send.
// Call ProcessSignalingMessage when a new PeerConnectionMessage have been
// received from the remote peer.
// Before PeerConnectionSignaling can process an answer or create an offer,
// Initialize have to be called. The last request to create an offer or process
// an answer will be processed after Initialize have been called.
class PeerConnectionSignaling : public talk_base::MessageHandler {
public:
enum State {
// Awaiting the local candidates.
kInitializing,
// Ready to sent new offer or receive a new offer.
kIdle,
// We have sent an offer and expect an answer, or we want to update
@ -118,9 +132,12 @@ class PeerConnectionSignaling : public talk_base::MessageHandler {
kGlare
};
explicit PeerConnectionSignaling(cricket::ChannelManager* channel_manager);
PeerConnectionSignaling(cricket::ChannelManager* channel_manager,
talk_base::Thread* signaling_thread);
~PeerConnectionSignaling();
void Initialize(const cricket::Candidates& candidates);
// Process a received offer/answer from the remote peer.
void ProcessSignalingMessage(PeerConnectionMessage* message,
StreamCollection* local_streams);
@ -148,15 +165,22 @@ class PeerConnectionSignaling : public talk_base::MessageHandler {
// Remote PeerConnection sent an error message.
sigslot::signal1<PeerConnectionMessage::ErrorCode> SignalErrorMessageReceived;
// Informs that a new Offer/Answer have been exchanged.
// The parameters are local session description,
// remote session_description,
// local StreamCollection.
sigslot::signal3<const cricket::SessionDescription*,
const cricket::SessionDescription*,
StreamCollection*> SignalUpdateSessionDescription;
private:
// Implement talk_base::MessageHandler.
virtual void OnMessage(talk_base::Message* msg);
void CreateOffer_s();
void GenerateAnswer(PeerConnectionMessage* message,
StreamCollection* local_streams);
void CreateAnswer_s();
void InitMediaSessionOptions(cricket::MediaSessionOptions* options,
StreamCollection* local_streams);
StreamCollection* local_streams);
void UpdateRemoteStreams(const cricket::SessionDescription* remote_desc);
void UpdateSendingLocalStreams(
@ -166,6 +190,10 @@ class PeerConnectionSignaling : public talk_base::MessageHandler {
typedef std::list<scoped_refptr<StreamCollection> > StreamCollectionList;
StreamCollectionList queued_offers_;
typedef std::pair<scoped_refptr<PeerConnectionMessage>,
scoped_refptr<StreamCollection> > RemoteOfferPair;
RemoteOfferPair queued_received_offer_;
talk_base::Thread* signaling_thread_;
State state_;
uint32 ssrc_counter_;
@ -177,6 +205,9 @@ class PeerConnectionSignaling : public talk_base::MessageHandler {
LocalStreamMap;
LocalStreamMap local_streams_;
cricket::MediaSessionDescriptionFactory session_description_factory_;
scoped_refptr<PeerConnectionMessage> last_send_offer_;
cricket::Candidates candidates_;
};
} // namespace webrtc

View File

@ -82,7 +82,8 @@ class MockMediaStreamObserver : public webrtc::Observer {
class MockSignalingObserver : public sigslot::has_slots<> {
public:
MockSignalingObserver()
: remote_peer_(NULL) {
: update_session_description_counter_(0),
remote_peer_(NULL) {
}
// New remote stream have been discovered.
@ -103,12 +104,21 @@ class MockSignalingObserver : public sigslot::has_slots<> {
void OnSignalingMessage(PeerConnectionMessage* message) {
if (remote_peer_) {
remote_peer_->ProcessSignalingMessage(message, remote_local_collection_);
// Process posted messages to allow the remote peer to process
// the message.
talk_base::Thread::Current()->ProcessMessages(1);
}
if (message->type() != PeerConnectionMessage::kError) {
last_message = message;
}
}
void OnUpdateSessionDescription(const cricket::SessionDescription* local,
const cricket::SessionDescription* remote,
StreamCollection* local_streams) {
update_session_description_counter_++;
}
// Tell this object to answer the remote_peer.
// remote_local_collection is the local collection the remote peer want to
// send in an answer.
@ -133,6 +143,7 @@ class MockSignalingObserver : public sigslot::has_slots<> {
virtual ~MockSignalingObserver() {}
scoped_refptr<PeerConnectionMessage> last_message;
int update_session_description_counter_;
private:
MediaStreamMap remote_media_streams_;
@ -147,25 +158,32 @@ class PeerConnectionSignalingTest: public testing::Test {
talk_base::Thread::Current()));
EXPECT_TRUE(channel_manager_->Init());
signaling1_.reset(new PeerConnectionSignaling(channel_manager_.get()));
signaling1_.reset(new PeerConnectionSignaling(
channel_manager_.get(), talk_base::Thread::Current()));
observer1_.reset(new MockSignalingObserver());
signaling1_->SignalNewPeerConnectionMessage.connect(
observer1_.get(), &MockSignalingObserver::OnSignalingMessage);
signaling1_->SignalUpdateSessionDescription.connect(
observer1_.get(), &MockSignalingObserver::OnUpdateSessionDescription);
signaling1_->SignalRemoteStreamAdded.connect(
observer1_.get(), &MockSignalingObserver::OnRemoteStreamAdded);
signaling1_->SignalRemoteStreamRemoved.connect(
observer1_.get(), &MockSignalingObserver::OnRemoteStreamRemoved);
signaling2_.reset(new PeerConnectionSignaling(channel_manager_.get()));
signaling2_.reset(new PeerConnectionSignaling(
channel_manager_.get(), talk_base::Thread::Current()));
observer2_.reset(new MockSignalingObserver());
signaling2_->SignalNewPeerConnectionMessage.connect(
observer2_.get(), &MockSignalingObserver::OnSignalingMessage);
signaling2_->SignalUpdateSessionDescription.connect(
observer2_.get(), &MockSignalingObserver::OnUpdateSessionDescription);
signaling2_->SignalRemoteStreamAdded.connect(
observer2_.get(), &MockSignalingObserver::OnRemoteStreamAdded);
signaling2_->SignalRemoteStreamRemoved.connect(
observer2_.get(), &MockSignalingObserver::OnRemoteStreamRemoved);
}
cricket::Candidates candidates_;
talk_base::scoped_ptr<MockSignalingObserver> observer1_;
talk_base::scoped_ptr<MockSignalingObserver> observer2_;
talk_base::scoped_ptr<PeerConnectionSignaling> signaling1_;
@ -203,12 +221,30 @@ TEST_F(PeerConnectionSignalingTest, SimpleOneWayCall) {
// Connect all messages sent from Peer2 to be received on Peer1
observer2_->AnswerPeer(signaling1_.get(), local_collection1);
// Peer 1 generates the offer and and send it to Peer2.
// Peer 1 generates the offer. It is not sent since there is no
// local candidates ready.
signaling1_->CreateOffer(local_collection1);
// Process posted messages.
talk_base::Thread::Current()->ProcessMessages(1);
EXPECT_EQ(PeerConnectionSignaling::kInitializing, signaling1_->GetState());
// Initialize signaling1_ by providing the candidates.
signaling1_->Initialize(candidates_);
EXPECT_EQ(PeerConnectionSignaling::kWaitingForAnswer,
signaling1_->GetState());
// Process posted messages to allow signaling_1 to send the offer.
talk_base::Thread::Current()->ProcessMessages(1);
// Verify that signaling_2 is still not initialized.
// Even though it have received an offer.
EXPECT_EQ(PeerConnectionSignaling::kInitializing, signaling2_->GetState());
// Provide the candidates to signaling_2 and let it process the offer.
signaling2_->Initialize(candidates_);
talk_base::Thread::Current()->ProcessMessages(1);
// Verify that the offer/answer have been exchanged and the state is good.
EXPECT_EQ(PeerConnectionSignaling::kIdle, signaling1_->GetState());
EXPECT_EQ(PeerConnectionSignaling::kIdle, signaling2_->GetState());
@ -219,9 +255,16 @@ TEST_F(PeerConnectionSignalingTest, SimpleOneWayCall) {
// Verify that PeerConnection2 is aware of the sending stream.
EXPECT_TRUE(observer2_->RemoteStream(label) != NULL);
// Verify that both peers have updated the session descriptions.
EXPECT_EQ(1u, observer1_->update_session_description_counter_);
EXPECT_EQ(1u, observer2_->update_session_description_counter_);
}
TEST_F(PeerConnectionSignalingTest, Glare) {
// Initialize signaling1_ and signaling_2 by providing the candidates.
signaling1_->Initialize(candidates_);
signaling2_->Initialize(candidates_);
// Create a local stream.
std::string label(kStreamLabel1);
scoped_refptr<LocalMediaStream> stream(CreateLocalMediaStream(label));
@ -275,9 +318,16 @@ TEST_F(PeerConnectionSignalingTest, Glare) {
// Verify that PeerConnection2 is aware of the sending stream.
EXPECT_TRUE(observer2_->RemoteStream(label) != NULL);
// Verify that both peers have updated the session descriptions.
EXPECT_EQ(1u, observer1_->update_session_description_counter_);
EXPECT_EQ(1u, observer2_->update_session_description_counter_);
}
TEST_F(PeerConnectionSignalingTest, AddRemoveStream) {
// Initialize signaling1_ and signaling_2 by providing the candidates.
signaling1_->Initialize(candidates_);
signaling2_->Initialize(candidates_);
// Create a local stream.
std::string label(kStreamLabel1);
scoped_refptr<LocalMediaStream> stream(CreateLocalMediaStream(label));
@ -310,10 +360,13 @@ TEST_F(PeerConnectionSignalingTest, AddRemoveStream) {
// Peer 1 creates an empty offer and send it to Peer2.
signaling1_->CreateOffer(local_collection1);
// Process posted messages.
talk_base::Thread::Current()->ProcessMessages(1);
// Verify that both peers have updated the session descriptions.
EXPECT_EQ(1u, observer1_->update_session_description_counter_);
EXPECT_EQ(1u, observer2_->update_session_description_counter_);
// Peer2 add a stream.
local_collection2->AddStream(stream);
@ -327,6 +380,10 @@ TEST_F(PeerConnectionSignalingTest, AddRemoveStream) {
// Verify that PeerConnection1 is aware of the sending stream.
EXPECT_TRUE(observer1_->RemoteStream(label) != NULL);
// Verify that both peers have updated the session descriptions.
EXPECT_EQ(2u, observer1_->update_session_description_counter_);
EXPECT_EQ(2u, observer2_->update_session_description_counter_);
// Remove the stream
local_collection2->RemoveStream(stream);
@ -339,6 +396,10 @@ TEST_F(PeerConnectionSignalingTest, AddRemoveStream) {
// Verify that the PeerConnection 2 local stream is now ended.
EXPECT_EQ(MediaStream::kEnded, stream_observer1.ready_state);
EXPECT_EQ(MediaStreamTrack::kEnded, track_observer1.track_state);
// Verify that both peers have updated the session descriptions.
EXPECT_EQ(3u, observer1_->update_session_description_counter_);
EXPECT_EQ(3u, observer2_->update_session_description_counter_);
}
} // namespace webrtc