Refactor how the TransportChannels are set in the BaseChannel to rely lesson Session, so that in the future we can rely on Transport instead, and also be able to change Transports on the fly for BUNDLE.

Also, remove channel_name.  It's no longer needed.

This is a part of the big BUNDLE implementation at https://webrtc-codereview.appspot.com/45519004/

R=decurtis@webrtc.org

Review URL: https://webrtc-codereview.appspot.com/43719004

Cr-Commit-Position: refs/heads/master@{#8741}
git-svn-id: http://webrtc.googlecode.com/svn/trunk@8741 4adac7df-926f-26a2-2b94-8c16560cd09d
This commit is contained in:
pthatcher@webrtc.org 2015-03-16 20:19:12 +00:00
parent 4eeef584a7
commit 6ad507ac35
8 changed files with 114 additions and 87 deletions

View File

@ -187,39 +187,28 @@ BaseChannel::~BaseChannel() {
// the media channel may try to send on the dead transport channel. NULLing
// is not an effective strategy since the sends will come on another thread.
delete media_channel_;
set_rtcp_transport_channel(NULL);
if (transport_channel_ != NULL)
session_->DestroyChannel(content_name_, transport_channel_->component());
set_transport_channel(nullptr);
set_rtcp_transport_channel(nullptr);
LOG(LS_INFO) << "Destroyed channel";
}
bool BaseChannel::Init(TransportChannel* transport_channel,
TransportChannel* rtcp_transport_channel) {
if (transport_channel == NULL) {
return false;
}
if (rtcp() && rtcp_transport_channel == NULL) {
return false;
}
transport_channel_ = transport_channel;
if (!SetDtlsSrtpCiphers(transport_channel_, false)) {
bool BaseChannel::Init() {
if (!SetTransportChannels(session(), rtcp())) {
return false;
}
transport_channel_->SignalWritableState.connect(
this, &BaseChannel::OnWritableState);
transport_channel_->SignalReadPacket.connect(
this, &BaseChannel::OnChannelRead);
transport_channel_->SignalReadyToSend.connect(
this, &BaseChannel::OnReadyToSend);
if (!SetDtlsSrtpCiphers(transport_channel(), false)) {
return false;
}
if (rtcp() && !SetDtlsSrtpCiphers(rtcp_transport_channel(), true)) {
return false;
}
session_->SignalNewLocalDescription.connect(
this, &BaseChannel::OnNewLocalDescription);
session_->SignalNewRemoteDescription.connect(
this, &BaseChannel::OnNewRemoteDescription);
set_rtcp_transport_channel(rtcp_transport_channel);
// Both RTP and RTCP channels are set, we can call SetInterface on
// media channel and it can set network options.
media_channel_->SetInterface(this);
@ -230,6 +219,90 @@ void BaseChannel::Deinit() {
media_channel_->SetInterface(NULL);
}
bool BaseChannel::SetTransportChannels(BaseSession* session, bool rtcp) {
return worker_thread_->Invoke<bool>(Bind(
&BaseChannel::SetTransportChannels_w, this, session, rtcp));
}
bool BaseChannel::SetTransportChannels_w(BaseSession* session, bool rtcp) {
ASSERT(worker_thread_ == rtc::Thread::Current());
set_transport_channel(session->CreateChannel(
content_name(), cricket::ICE_CANDIDATE_COMPONENT_RTP));
if (!transport_channel()) {
return false;
}
if (rtcp) {
set_rtcp_transport_channel(session->CreateChannel(
content_name(), cricket::ICE_CANDIDATE_COMPONENT_RTCP));
if (!rtcp_transport_channel()) {
return false;
}
} else {
set_rtcp_transport_channel(nullptr);
}
return true;
}
void BaseChannel::set_transport_channel(TransportChannel* new_tc) {
ASSERT(worker_thread_ == rtc::Thread::Current());
TransportChannel* old_tc = transport_channel_;
if (old_tc == new_tc) {
return;
}
if (old_tc) {
DisconnectFromTransportChannel(old_tc);
session()->DestroyChannel(
content_name(), cricket::ICE_CANDIDATE_COMPONENT_RTP);
}
transport_channel_ = new_tc;
if (new_tc) {
ConnectToTransportChannel(new_tc);
}
}
void BaseChannel::set_rtcp_transport_channel(TransportChannel* new_tc) {
ASSERT(worker_thread_ == rtc::Thread::Current());
TransportChannel* old_tc = rtcp_transport_channel_;
if (old_tc == new_tc) {
return;
}
if (old_tc) {
DisconnectFromTransportChannel(old_tc);
session()->DestroyChannel(
content_name(), cricket::ICE_CANDIDATE_COMPONENT_RTCP);
}
rtcp_transport_channel_ = new_tc;
if (new_tc) {
ConnectToTransportChannel(new_tc);
}
}
void BaseChannel::ConnectToTransportChannel(TransportChannel* tc) {
ASSERT(worker_thread_ == rtc::Thread::Current());
tc->SignalWritableState.connect(this, &BaseChannel::OnWritableState);
tc->SignalReadPacket.connect(this, &BaseChannel::OnChannelRead);
tc->SignalReadyToSend.connect(this, &BaseChannel::OnReadyToSend);
}
void BaseChannel::DisconnectFromTransportChannel(TransportChannel* tc) {
ASSERT(worker_thread_ == rtc::Thread::Current());
tc->SignalWritableState.disconnect(this);
tc->SignalReadPacket.disconnect(this);
tc->SignalReadyToSend.disconnect(this);
}
bool BaseChannel::Enable(bool enable) {
worker_thread_->Invoke<void>(Bind(
enable ? &BaseChannel::EnableMedia_w : &BaseChannel::DisableMedia_w,
@ -300,26 +373,6 @@ bool BaseChannel::GetConnectionStats(ConnectionInfos* infos) {
return transport_channel_->GetStats(infos);
}
void BaseChannel::set_rtcp_transport_channel(TransportChannel* channel) {
if (rtcp_transport_channel_ != channel) {
if (rtcp_transport_channel_) {
session_->DestroyChannel(
content_name_, rtcp_transport_channel_->component());
}
rtcp_transport_channel_ = channel;
if (rtcp_transport_channel_) {
// TODO(juberti): Propagate this error code
VERIFY(SetDtlsSrtpCiphers(rtcp_transport_channel_, true));
rtcp_transport_channel_->SignalWritableState.connect(
this, &BaseChannel::OnWritableState);
rtcp_transport_channel_->SignalReadPacket.connect(
this, &BaseChannel::OnChannelRead);
rtcp_transport_channel_->SignalReadyToSend.connect(
this, &BaseChannel::OnReadyToSend);
}
}
}
bool BaseChannel::IsReadyToReceive() const {
// Receive data if we are enabled and have local content,
return enabled() && IsReceiveContentDirection(local_content_direction_);
@ -1271,11 +1324,7 @@ VoiceChannel::~VoiceChannel() {
}
bool VoiceChannel::Init() {
TransportChannel* rtcp_channel = rtcp() ? session()->CreateChannel(
content_name(), "rtcp", ICE_CANDIDATE_COMPONENT_RTCP) : NULL;
if (!BaseChannel::Init(session()->CreateChannel(
content_name(), "rtp", ICE_CANDIDATE_COMPONENT_RTP),
rtcp_channel)) {
if (!BaseChannel::Init()) {
return false;
}
media_channel()->SignalMediaError.connect(
@ -1665,11 +1714,7 @@ VideoChannel::VideoChannel(rtc::Thread* thread,
}
bool VideoChannel::Init() {
TransportChannel* rtcp_channel = rtcp() ? session()->CreateChannel(
content_name(), "video_rtcp", ICE_CANDIDATE_COMPONENT_RTCP) : NULL;
if (!BaseChannel::Init(session()->CreateChannel(
content_name(), "video_rtp", ICE_CANDIDATE_COMPONENT_RTP),
rtcp_channel)) {
if (!BaseChannel::Init()) {
return false;
}
media_channel()->SignalMediaError.connect(
@ -2118,11 +2163,7 @@ DataChannel::~DataChannel() {
}
bool DataChannel::Init() {
TransportChannel* rtcp_channel = rtcp() ? session()->CreateChannel(
content_name(), "data_rtcp", ICE_CANDIDATE_COMPONENT_RTCP) : NULL;
if (!BaseChannel::Init(session()->CreateChannel(
content_name(), "data_rtp", ICE_CANDIDATE_COMPONENT_RTP),
rtcp_channel)) {
if (!BaseChannel::Init()) {
return false;
}
media_channel()->SignalDataReceived.connect(

View File

@ -80,8 +80,7 @@ class BaseChannel
MediaChannel* channel, BaseSession* session,
const std::string& content_name, bool rtcp);
virtual ~BaseChannel();
bool Init(TransportChannel* transport_channel,
TransportChannel* rtcp_transport_channel);
bool Init();
// Deinit may be called multiple times and is simply ignored if it's alreay
// done.
void Deinit();
@ -232,6 +231,13 @@ class BaseChannel
protected:
MediaEngineInterface* media_engine() const { return media_engine_; }
virtual MediaChannel* media_channel() const { return media_channel_; }
// Sets the transport_channel_ and rtcp_transport_channel_. If
// |rtcp| is false, set rtcp_transport_channel_ is set to NULL. Get
// the transport channels from |session|.
// TODO(pthatcher): Pass in a Transport instead of a BaseSession.
bool SetTransportChannels(BaseSession* session, bool rtcp);
bool SetTransportChannels_w(BaseSession* session, bool rtcp);
void set_transport_channel(TransportChannel* transport);
void set_rtcp_transport_channel(TransportChannel* transport);
bool was_ever_writable() const { return was_ever_writable_; }
void set_local_content_direction(MediaContentDirection direction) {
@ -246,6 +252,9 @@ class BaseChannel
SrtpFilter* srtp_filter() { return &srtp_filter_; }
bool rtcp() const { return rtcp_; }
void ConnectToTransportChannel(TransportChannel* tc);
void DisconnectFromTransportChannel(TransportChannel* tc);
void FlushRtcpMessages();
// NetworkInterface implementation, called by MediaEngine

View File

@ -172,7 +172,7 @@ TEST_F(ChannelManagerTest, NoTransportChannelTest) {
// The test is useless unless the session does not fail creating
// cricket::TransportChannel.
ASSERT_TRUE(session_->CreateChannel(
"audio", "rtp", cricket::ICE_CANDIDATE_COMPONENT_RTP) == NULL);
"audio", cricket::ICE_CANDIDATE_COMPONENT_RTP) == NULL);
cricket::VoiceChannel* voice_channel = cm_->CreateVoiceChannel(
session_, cricket::CN_AUDIO, false);

View File

@ -465,12 +465,11 @@ class FakeSession : public BaseSession {
virtual TransportChannel* CreateChannel(
const std::string& content_name,
const std::string& channel_name,
int component) {
if (fail_create_channel_) {
return NULL;
}
return BaseSession::CreateChannel(content_name, channel_name, component);
return BaseSession::CreateChannel(content_name, component);
}
void set_fail_channel_creation(bool fail_channel_creation) {

View File

@ -46,15 +46,14 @@ TransportChannel* TransportProxy::GetChannel(int component) {
return GetChannelProxy(component);
}
TransportChannel* TransportProxy::CreateChannel(const std::string& name,
int component) {
TransportChannel* TransportProxy::CreateChannel(int component) {
ASSERT(rtc::Thread::Current() == worker_thread_);
ASSERT(GetChannel(component) == NULL);
ASSERT(!transport_->get()->HasChannel(component));
// We always create a proxy in case we need to change out the transport later.
TransportChannelProxy* channel_proxy =
new TransportChannelProxy(content_name(), name, component);
new TransportChannelProxy(content_name(), component);
channels_[component] = channel_proxy;
// If we're already negotiated, create an impl and hook it up to the proxy
@ -142,18 +141,6 @@ TransportChannelProxy* TransportProxy::GetChannelProxy(int component) const {
return (iter != channels_.end()) ? iter->second : NULL;
}
TransportChannelProxy* TransportProxy::GetChannelProxyByName(
const std::string& name) const {
for (ChannelMap::const_iterator iter = channels_.begin();
iter != channels_.end();
++iter) {
if (iter->second->name() == name) {
return iter->second;
}
}
return NULL;
}
void TransportProxy::CreateChannelImpl(int component) {
worker_thread_->Invoke<void>(Bind(
&TransportProxy::CreateChannelImpl_w, this, component));
@ -474,13 +461,12 @@ bool BaseSession::PushdownRemoteTransportDescription(
}
TransportChannel* BaseSession::CreateChannel(const std::string& content_name,
const std::string& channel_name,
int component) {
// We create the proxy "on demand" here because we need to support
// creating channels at any time, even before we send or receive
// initiate messages, which is before we create the transports.
TransportProxy* transproxy = GetOrCreateTransportProxy(content_name);
return transproxy->CreateChannel(channel_name, component);
return transproxy->CreateChannel(component);
}
TransportChannel* BaseSession::GetChannel(const std::string& content_name,

View File

@ -81,8 +81,7 @@ class TransportProxy : public sigslot::has_slots<> {
}
TransportChannel* GetChannel(int component);
TransportChannel* CreateChannel(const std::string& channel_name,
int component);
TransportChannel* CreateChannel(int component);
bool HasChannel(int component);
void DestroyChannel(int component);
@ -130,7 +129,6 @@ class TransportProxy : public sigslot::has_slots<> {
private:
TransportChannelProxy* GetChannelProxy(int component) const;
TransportChannelProxy* GetChannelProxyByName(const std::string& name) const;
// Creates a new channel on the Transport which causes the reference
// count to increment.
@ -306,7 +304,6 @@ class BaseSession : public sigslot::has_slots<>,
// shouldn't be an issue since the main thread will be blocked in
// Send when doing so.
virtual TransportChannel* CreateChannel(const std::string& content_name,
const std::string& channel_name,
int component);
// Returns the channel with the given names.

View File

@ -22,10 +22,8 @@ enum {
};
TransportChannelProxy::TransportChannelProxy(const std::string& content_name,
const std::string& name,
int component)
: TransportChannel(content_name, component),
name_(name),
impl_(NULL) {
worker_thread_ = rtc::Thread::Current();
}

View File

@ -34,11 +34,9 @@ class TransportChannelProxy : public TransportChannel,
public rtc::MessageHandler {
public:
TransportChannelProxy(const std::string& content_name,
const std::string& name,
int component);
virtual ~TransportChannelProxy();
const std::string& name() const { return name_; }
TransportChannelImpl* impl() { return impl_; }
virtual TransportChannelState GetState() const;
@ -85,7 +83,6 @@ class TransportChannelProxy : public TransportChannel,
typedef std::pair<rtc::Socket::Option, int> OptionPair;
typedef std::vector<OptionPair> OptionList;
std::string name_;
rtc::Thread* worker_thread_;
TransportChannelImpl* impl_;
OptionList options_;