more webrtc session changes. Transport and TransportChannel handling is complete. Need work on session state.

Review URL: http://webrtc-codereview.appspot.com/183005

git-svn-id: http://webrtc.googlecode.com/svn/trunk@679 4adac7df-926f-26a2-2b94-8c16560cd09d
This commit is contained in:
mallinath@webrtc.org 2011-10-03 20:33:06 +00:00
parent f1f3fb33b5
commit ee2c391c15
8 changed files with 317 additions and 28 deletions

View File

@ -79,6 +79,7 @@ class PeerConnectionMessage : public RefCount {
const cricket::SessionDescription* desc() {return desc_.get();}
bool Serialize(std::string* message);
std::vector<cricket::Candidate>& candidates() { return candidates_; }
protected:
PeerConnectionMessage(PeerConnectionMessageType type,

View File

@ -142,6 +142,11 @@ void PeerConnectionSignaling::ProcessSignalingMessage(
ASSERT(state_ != PeerConnectionSignaling::kIdle);
if (state_ == PeerConnectionSignaling::kIdle)
return;
// Signal the resulting local and remote session description.
SignalUpdateSessionDescription(last_send_offer_->desc(),
message->desc(),
message->candidates());
UpdateRemoteStreams(message->desc());
scoped_refptr<StreamCollection> streams(queued_offers_.front());
queued_offers_.pop_front();
@ -153,10 +158,6 @@ void PeerConnectionSignaling::ProcessSignalingMessage(
} else {
state_ = PeerConnectionSignaling::kIdle;
}
// Signal the resulting local and remote session description.
SignalUpdateSessionDescription(last_send_offer_->desc(),
message->desc(),
streams.get());
break;
}
case PeerConnectionMessage::kError: {
@ -249,6 +250,10 @@ void PeerConnectionSignaling::CreateAnswer_s() {
answer_message = PeerConnectionMessage::CreateErrorMessage(
PeerConnectionMessage::kOfferNotAcceptable);
}
// Signal the resulting local and remote session description.
SignalUpdateSessionDescription(answer.get(),
message->desc(),
message->candidates()); // remote candidates
UpdateRemoteStreams(message->desc());
@ -260,11 +265,6 @@ void PeerConnectionSignaling::CreateAnswer_s() {
// have time to receive the signaling message before media arrives?
// This is under debate.
UpdateSendingLocalStreams(answer_message->desc(), local_streams);
// Signal the resulting local and remote session description.
SignalUpdateSessionDescription(answer.get(),
message->desc(),
local_streams);
}
// Fills a MediaSessionOptions struct with the MediaTracks we want to sent given

View File

@ -124,7 +124,7 @@ class PeerConnectionSignaling : public talk_base::MessageHandler {
// local StreamCollection.
sigslot::signal3<const cricket::SessionDescription*,
const cricket::SessionDescription*,
StreamCollection*> SignalUpdateSessionDescription;
const cricket::Candidates&> SignalUpdateSessionDescription;
private:
// Implement talk_base::MessageHandler.

View File

@ -115,7 +115,7 @@ class MockSignalingObserver : public sigslot::has_slots<> {
void OnUpdateSessionDescription(const cricket::SessionDescription* local,
const cricket::SessionDescription* remote,
StreamCollection* local_streams) {
const cricket::Candidates& candidates) {
update_session_description_counter_++;
}

View File

@ -27,12 +27,15 @@
#include "talk/app/webrtc_dev/webrtcsession.h"
#include "talk/app/webrtc_dev/mediastream.h"
#include "talk/app/webrtc_dev/peerconnection.h"
#include "talk/app/webrtc_dev/peerconnectionsignaling.h"
#include "talk/base/helpers.h"
#include "talk/base/logging.h"
#include "talk/session/phone/channel.h"
#include "talk/session/phone/channelmanager.h"
#include "talk/app/webrtc_dev/mediastream.h"
#include "talk/app/webrtc_dev/peerconnectionsignaling.h"
using cricket::MediaContentDescription;
namespace webrtc {
@ -42,14 +45,22 @@ enum {
// We allow 30 seconds to establish a connection, otherwise it's an error.
static const int kCallSetupTimeout = 30 * 1000;
// Session will accept one candidate per transport channel and dropping other
// candidates generated for that channel. During the session initialization
// one cricket::VoiceChannel and one cricket::VideoChannel will be created with
// rtcp enabled.
static const int kAllowedCandidates = 4;
// TODO(mallinath) - These are magic string used by cricket::VideoChannel.
// These should be moved to a common place.
static const std::string kRtpVideoChannelStr = "video_rtp";
static const std::string kRtcpVideoChannelStr = "video_rtcp";
WebRtcSession::WebRtcSession(cricket::ChannelManager* channel_manager,
talk_base::Thread* signaling_thread,
talk_base::Thread* worker_thread,
cricket::PortAllocator* port_allocator,
PeerConnectionSignaling* pc_signaling)
// TODO(ronghuawu): get the signaling thread from PeerConnectionSignaling
: cricket::BaseSession(worker_thread, worker_thread,
port_allocator,
: cricket::BaseSession(signaling_thread, worker_thread, port_allocator,
talk_base::ToString(talk_base::CreateRandomId()),
cricket::NS_JINGLE_RTP, true) {
// TODO(mallinath) - Remove initiator flag from BaseSession. As it's being
@ -81,11 +92,73 @@ void WebRtcSession::Terminate() {
void WebRtcSession::OnSignalUpdateSessionDescription(
const cricket::SessionDescription* local_desc,
const cricket::SessionDescription* remote_desc,
StreamCollection* streams) {
const cricket::Candidates& remote_candidates) {
// Session updates are not supported yet. If session is in progress state
// ignore this callback.
if (state() == STATE_INPROGRESS) {
// TODO(mallinath) - Handling of session updates is not ready yet.
ProcessSessionUpdate(local_desc, remote_desc);
if (remote_candidates.size() > 0) {
SetRemoteCandidates(remote_candidates);
}
return;
}
// Local session and remote session descriptions are available before
// any state change. So for session it's doesn't matter it's initiator
// or receiver of the call. Session will be treated as initiator.
// Apply first local description
set_local_description(local_desc);
SetState(STATE_SENTINITIATE);
// Applying remote description on the session
set_remote_description(const_cast<cricket::SessionDescription*>(remote_desc));
SetState(STATE_RECEIVEDACCEPT);
// Set remote candidates
SetRemoteCandidates(remote_candidates);
}
void WebRtcSession::SetRemoteCandidates(
const cricket::Candidates& candidates) {
// First partition the candidates for the proxies. During creation of channels
// we created CN_AUDIO (audio) and CN_VIDEO (video) proxies.
cricket::Candidates audio_candidates;
cricket::Candidates video_candidates;
for (cricket::Candidates::const_iterator citer = candidates.begin();
citer != candidates.end(); ++citer) {
if (((*citer).name().compare(kRtpVideoChannelStr) == 0) ||
((*citer).name().compare(kRtcpVideoChannelStr)) == 0) {
// Candidate names for video rtp and rtcp channel
video_candidates.push_back(*citer);
} else {
// Candidates for audio rtp and rtcp channel
// Channel name will be "rtp" and "rtcp"
audio_candidates.push_back(*citer);
}
}
if (!audio_candidates.empty()) {
cricket::TransportProxy* audio_proxy = GetTransportProxy(cricket::CN_AUDIO);
if (audio_proxy) {
// CompleteNegotiation will set actual impl's in Proxy.
audio_proxy->CompleteNegotiation();
// TODO(mallinath) - Add a interface to TransportProxy to accept
// remote candidate list.
audio_proxy->impl()->OnRemoteCandidates(audio_candidates);
} else {
LOG(LS_INFO) << "No audio TransportProxy exists";
}
}
if (!video_candidates.empty()) {
cricket::TransportProxy* video_proxy = GetTransportProxy(cricket::CN_VIDEO);
if (video_proxy) {
// CompleteNegotiation will set actual impl's in Proxy.
video_proxy->CompleteNegotiation();
// TODO(mallinath) - Add a interface to TransportProxy to accept
// remote candidate list.
video_proxy->impl()->OnRemoteCandidates(audio_candidates);
} else {
LOG(LS_INFO) << "No video TransportProxy exists";
}
}
}
bool WebRtcSession::CreateChannels() {
@ -102,6 +175,12 @@ bool WebRtcSession::CreateChannels() {
LOG(LS_ERROR) << "Failed to create video channel";
return false;
}
// TransportProxies and TransportChannels will be created when
// CreateVoiceChannel and CreateVideoChannel are called.
// Try connecting all transport channels. This is necessary to generate
// ICE candidates.
SpeculativelyConnectAllTransportChannels();
return true;
}
@ -132,7 +211,13 @@ void WebRtcSession::OnTransportWritable(cricket::Transport* transport) {
void WebRtcSession::OnTransportCandidatesReady(
cricket::Transport* transport, const cricket::Candidates& candidates) {
ASSERT(signaling_thread()->IsCurrent());
pc_signaling_->Initialize(candidates);
// Drop additional candidates for the same channel;
// local_candidates_ will have one candidate per channel.
if (local_candidates_.size() == kAllowedCandidates)
return;
InsertTransportCandidates(candidates);
if (local_candidates_.size() == kAllowedCandidates)
pc_signaling_->Initialize(candidates);
}
void WebRtcSession::OnTransportChannelGone(cricket::Transport* transport) {
@ -142,10 +227,88 @@ void WebRtcSession::OnTransportChannelGone(cricket::Transport* transport) {
void WebRtcSession::OnMessage(talk_base::Message* msg) {
switch (msg->message_id) {
case MSG_CANDIDATE_TIMEOUT:
LOG(LS_ERROR) << "Transport is not in writable state.";
SignalError();
break;
default:
break;
}
}
void WebRtcSession::InsertTransportCandidates(
const cricket::Candidates& candidates) {
for (cricket::Candidates::const_iterator citer = candidates.begin();
citer != candidates.end(); ++citer) {
// Find candidates by name, if this channel name not exists in local
// candidate list, store it.
if (!CheckCandidate((*citer).name())) {
local_candidates_.push_back(*citer);
}
}
}
// Check transport candidate already available for transport channel as only
// one cricket::Candidate allower per channel.
bool WebRtcSession::CheckCandidate(const std::string& name) {
bool ret = false;
for (cricket::Candidates::iterator iter = local_candidates_.begin();
iter != local_candidates_.end(); ++iter) {
if ((*iter).name().compare(name) == 0) {
ret = true;
break;
}
}
return ret;
}
void WebRtcSession::ProcessSessionUpdate(
const cricket::SessionDescription* local_desc,
const cricket::SessionDescription* remote_desc) {
if (local_desc) {
ProcessLocalMediaChanges(local_desc);
}
if (remote_desc) {
ProcessRemoteMediaChanges(remote_desc);
}
}
bool WebRtcSession::GetAudioSourceParamInfo(
const cricket::SessionDescription* sdesc,
cricket::Sources* sources) {
bool ret = false;
const cricket::ContentInfo* content = GetFirstAudioContent(sdesc);
if (content) {
const MediaContentDescription* audio_desc =
static_cast<const MediaContentDescription*> (content->description);
*sources = audio_desc->sources();
ret = true;
}
return ret;
}
bool WebRtcSession::GetVideoSourceParamInfo(
const cricket::SessionDescription* sdesc,
cricket::Sources* sources) {
bool ret = false;
const cricket::ContentInfo* content = GetFirstVideoContent(sdesc);
if (content) {
const MediaContentDescription* video_desc =
static_cast<const MediaContentDescription*> (content->description);
*sources = video_desc->sources();
ret = true;
}
return ret;
}
void WebRtcSession::ProcessLocalMediaChanges(
const cricket::SessionDescription* sdesc) {
//TODO(mallinath) - Handling of local media stream changes in active session
}
void WebRtcSession::ProcessRemoteMediaChanges(
const cricket::SessionDescription* sdesc) {
//TODO(mallinath) - Handling of remote media stream changes in active session
}
} // namespace webrtc

View File

@ -28,9 +28,13 @@
#ifndef TALK_APP_WEBRTC_WEBRTCSESSION_H_
#define TALK_APP_WEBRTC_WEBRTCSESSION_H_
#include <string>
#include <vector>
#include "talk/base/sigslot.h"
#include "talk/base/thread.h"
#include "talk/p2p/base/session.h"
#include "talk/session/phone/mediasession.h"
namespace cricket {
class ChannelManager;
@ -48,6 +52,7 @@ class StreamCollection;
class WebRtcSession : public cricket::BaseSession {
public:
WebRtcSession(cricket::ChannelManager* channel_manager,
talk_base::Thread* signaling_thread,
talk_base::Thread* worker_thread,
cricket::PortAllocator* port_allocator,
PeerConnectionSignaling* pc_signaling);
@ -62,12 +67,19 @@ class WebRtcSession : public cricket::BaseSession {
return video_channel_.get();
}
// Generic error message callback from WebRtcSession.
// TODO(mallinath) - It may be necessary to supply error code as well.
sigslot::signal0<> SignalError;
void ProcessSessionUpdate(const cricket::SessionDescription* local_desc,
const cricket::SessionDescription* remote_desc);
private:
// Callback handling from PeerConnectionSignaling
void OnSignalUpdateSessionDescription(
const cricket::SessionDescription* local_desc,
const cricket::SessionDescription* remote_desc,
StreamCollection* streams);
const cricket::Candidates& remote_candidates);
// Transport related callbacks, override from cricket::BaseSession.
virtual void OnTransportRequestSignaling(cricket::Transport* transport);
@ -80,14 +92,27 @@ class WebRtcSession : public cricket::BaseSession {
// Creates channels for voice and video.
bool CreateChannels();
virtual void OnMessage(talk_base::Message* msg);
void InsertTransportCandidates(const cricket::Candidates& candidates);
void Terminate();
// Get candidate from the local candidates list by the name.
bool CheckCandidate(const std::string& name);
void SetRemoteCandidates(const cricket::Candidates& candidates);
// Helper methods to get handle to the MediaContentDescription sources param.
bool GetAudioSourceParamInfo(const cricket::SessionDescription* sdesc,
cricket::Sources* sources);
bool GetVideoSourceParamInfo(const cricket::SessionDescription* sdesc,
cricket::Sources* sources);
void ProcessLocalMediaChanges(const cricket::SessionDescription* sdesc);
void ProcessRemoteMediaChanges(const cricket::SessionDescription* sdesc);
private:
PeerConnectionSignaling* pc_signaling_;
talk_base::scoped_ptr<cricket::VoiceChannel> voice_channel_;
talk_base::scoped_ptr<cricket::VideoChannel> video_channel_;
cricket::ChannelManager* channel_manager_;
cricket::Candidates local_candidates_;
};
} // namespace webrtc

View File

@ -32,21 +32,27 @@
#include "talk/session/phone/channelmanager.h"
#include "talk/p2p/client/fakeportallocator.h"
class MockPeerConnectionSignaling {
};
class WebRtcSessionTest : public testing::Test {
public:
WebRtcSessionTest() {
}
~WebRtcSessionTest() {
}
virtual void SetUp() {
signaling_thread_ = talk_base::Thread::Current();
worker_thread_ = talk_base::Thread::Current();
channel_manager_.reset(new cricket::ChannelManager(worker_thread_));
port_allocator_.reset(
new cricket::FakePortAllocator(worker_thread_, NULL));
pc_signaling_.reset(
new webrtc::PeerConnectionSignaling(channel_manager_.get()));
ASSERT_TRUE(channel_manager_.get() != NULL);
ASSERT_TRUE(session_.get() == NULL);
}
~WebRtcSessionTest() {
new webrtc::PeerConnectionSignaling(channel_manager_.get(),
signaling_thread_));
}
bool InitializeSession() {
@ -58,9 +64,11 @@ class WebRtcSessionTest : public testing::Test {
}
void Init() {
ASSERT_TRUE(channel_manager_.get() != NULL);
ASSERT_TRUE(session_.get() == NULL);
EXPECT_TRUE(channel_manager_.get()->Init());
session_.reset(new webrtc::WebRtcSession(
channel_manager_.get(), worker_thread_,
channel_manager_.get(), worker_thread_, signaling_thread_,
port_allocator_.get(), pc_signaling_.get()));
EXPECT_TRUE(InitializeSession());
EXPECT_TRUE(CheckChannels());

View File

@ -0,0 +1,92 @@
// Copyright 2010 Google Inc. All Rights Reserved,
//
// Author: Justin Uberti (juberti@google.com)
#ifndef TALK_P2P_CLIENT_FAKEPORTALLOCATOR_H_
#define TALK_P2P_CLIENT_FAKEPORTALLOCATOR_H_
#include <string>
#include "talk/base/basicpacketsocketfactory.h"
#include "talk/base/scoped_ptr.h"
#include "talk/p2p/base/portallocator.h"
#include "talk/p2p/base/udpport.h"
namespace talk_base {
class SocketFactory;
class Thread;
}
namespace cricket {
class FakePortAllocatorSession : public PortAllocatorSession {
public:
FakePortAllocatorSession(talk_base::Thread* worker_thread,
talk_base::PacketSocketFactory* factory,
const std::string& name,
const std::string& session_type)
: PortAllocatorSession(0), worker_thread_(worker_thread),
factory_(factory), name_(name),
network_("network", "unittest", 0x7F000001, 0),
port_(NULL), running_(false) {
}
virtual void GetInitialPorts() {
if (!port_.get()) {
port_.reset(cricket::UDPPort::Create(worker_thread_, factory_,
&network_, network_.ip(), 0, 0));
AddPort(port_.get());
}
}
virtual void StartGetAllPorts() { running_ = true; }
virtual void StopGetAllPorts() { running_ = false; }
virtual bool IsGettingAllPorts() { return running_; }
void AddPort(cricket::Port* port) {
port->set_name(name_);
port->set_preference(1.0);
port->set_generation(0);
port->SignalAddressReady.connect(
this, &FakePortAllocatorSession::OnAddressReady);
port->PrepareAddress();
SignalPortReady(this, port);
}
void OnAddressReady(cricket::Port* port) {
SignalCandidatesReady(this, port->candidates());
}
private:
talk_base::Thread* worker_thread_;
talk_base::PacketSocketFactory* factory_;
std::string name_;
talk_base::Network network_;
talk_base::scoped_ptr<cricket::Port> port_;
bool running_;
};
class FakePortAllocator : public cricket::PortAllocator {
public:
FakePortAllocator(talk_base::Thread* worker_thread,
talk_base::PacketSocketFactory* factory)
: worker_thread_(worker_thread), factory_(factory) {
if (factory_ == NULL) {
owned_factory_.reset(new talk_base::BasicPacketSocketFactory(
worker_thread_));
factory_ = owned_factory_.get();
}
}
virtual cricket::PortAllocatorSession* CreateSession(
const std::string &name, const std::string &session_type) {
return new FakePortAllocatorSession(worker_thread_, factory_, name,
session_type);
}
private:
talk_base::Thread* worker_thread_;
talk_base::PacketSocketFactory* factory_;
talk_base::scoped_ptr<talk_base::BasicPacketSocketFactory> owned_factory_;
};
} // namespace cricket
#endif // TALK_P2P_CLIENT_FAKEPORTALLOCATOR_H_