diff --git a/talk/app/webrtc/datachannel.cc b/talk/app/webrtc/datachannel.cc index 3a6046e81..6f3048718 100644 --- a/talk/app/webrtc/datachannel.cc +++ b/talk/app/webrtc/datachannel.cc @@ -38,6 +38,10 @@ namespace webrtc { static size_t kMaxQueuedReceivedDataPackets = 100; static size_t kMaxQueuedSendDataPackets = 100; +enum { + MSG_CHANNELREADY, +}; + talk_base::scoped_refptr DataChannel::Create( DataChannelProviderInterface* provider, cricket::DataChannelType dct, @@ -95,6 +99,15 @@ bool DataChannel::Init(const DataChannelInit* config) { // Try to connect to the transport in case the transport channel already // exists. OnTransportChannelCreated(); + + // Checks if the transport is ready to send because the initial channel + // ready signal may have been sent before the DataChannel creation. + // This has to be done async because the upper layer objects (e.g. + // Chrome glue and WebKit) are not wired up properly until after this + // function returns. + if (provider_->ReadyToSendData()) { + talk_base::Thread::Current()->Post(this, MSG_CHANNELREADY, NULL); + } } return true; @@ -217,6 +230,14 @@ void DataChannel::SetSendSsrc(uint32 send_ssrc) { UpdateState(); } +void DataChannel::OnMessage(talk_base::Message* msg) { + switch (msg->message_id) { + case MSG_CHANNELREADY: + OnChannelReady(true); + break; + } +} + // The underlaying data engine is closing. // This function makes sure the DataChannel is disconnected and changes state to // kClosed. diff --git a/talk/app/webrtc/datachannel.h b/talk/app/webrtc/datachannel.h index 0d67293db..5635e63e6 100644 --- a/talk/app/webrtc/datachannel.h +++ b/talk/app/webrtc/datachannel.h @@ -33,6 +33,7 @@ #include "talk/app/webrtc/datachannelinterface.h" #include "talk/app/webrtc/proxy.h" +#include "talk/base/messagehandler.h" #include "talk/base/scoped_ref_ptr.h" #include "talk/base/sigslot.h" #include "talk/media/base/mediachannel.h" @@ -60,6 +61,8 @@ class DataChannelProviderInterface { virtual void RemoveRtpDataStream(uint32 send_ssrc, uint32 recv_ssrc) = 0; // Removes the data channel SID from the transport for SCTP. virtual void RemoveSctpDataStream(uint32 sid) = 0; + // Returns true if the transport channel is ready to send data. + virtual bool ReadyToSendData() const = 0; protected: virtual ~DataChannelProviderInterface() {} @@ -81,7 +84,8 @@ class DataChannelProviderInterface { // kClosed: Both UpdateReceiveSsrc and UpdateSendSsrc has been called with // SSRC==0. class DataChannel : public DataChannelInterface, - public sigslot::has_slots<> { + public sigslot::has_slots<>, + public talk_base::MessageHandler { public: static talk_base::scoped_refptr Create( DataChannelProviderInterface* provider, @@ -109,6 +113,9 @@ class DataChannel : public DataChannelInterface, virtual DataState state() const { return state_; } virtual bool Send(const DataBuffer& buffer); + // talk_base::MessageHandler override. + virtual void OnMessage(talk_base::Message* msg); + // Called if the underlying data engine is closing. void OnDataEngineClose(); diff --git a/talk/app/webrtc/datachannel_unittest.cc b/talk/app/webrtc/datachannel_unittest.cc index 5d298110c..dba24a2db 100644 --- a/talk/app/webrtc/datachannel_unittest.cc +++ b/talk/app/webrtc/datachannel_unittest.cc @@ -39,11 +39,12 @@ class SctpDataChannelTest : public testing::Test { } void SetChannelReady() { + provider_.set_transport_available(true); webrtc_data_channel_->OnTransportChannelCreated(); if (webrtc_data_channel_->id() < 0) { webrtc_data_channel_->SetSctpSid(0); } - webrtc_data_channel_->OnChannelReady(true); + provider_.set_ready_to_send(true); } webrtc::DataChannelInit init_; @@ -53,27 +54,28 @@ class SctpDataChannelTest : public testing::Test { // Verifies that the data channel is connected to the transport after creation. TEST_F(SctpDataChannelTest, ConnectedToTransportOnCreated) { - EXPECT_TRUE(provider_.IsConnected(webrtc_data_channel_.get())); - // The sid is not set yet, so it should not have added the streams. - EXPECT_FALSE(provider_.IsSendStreamAdded(webrtc_data_channel_->id())); - EXPECT_FALSE(provider_.IsRecvStreamAdded(webrtc_data_channel_->id())); + provider_.set_transport_available(true); + talk_base::scoped_refptr dc = DataChannel::Create( + &provider_, cricket::DCT_SCTP, "test1", &init_); - webrtc_data_channel_->SetSctpSid(0); - EXPECT_TRUE(provider_.IsSendStreamAdded(webrtc_data_channel_->id())); - EXPECT_TRUE(provider_.IsRecvStreamAdded(webrtc_data_channel_->id())); + EXPECT_TRUE(provider_.IsConnected(dc.get())); + // The sid is not set yet, so it should not have added the streams. + EXPECT_FALSE(provider_.IsSendStreamAdded(dc->id())); + EXPECT_FALSE(provider_.IsRecvStreamAdded(dc->id())); + + dc->SetSctpSid(0); + EXPECT_TRUE(provider_.IsSendStreamAdded(dc->id())); + EXPECT_TRUE(provider_.IsRecvStreamAdded(dc->id())); } // Verifies that the data channel is connected to the transport if the transport // is not available initially and becomes available later. TEST_F(SctpDataChannelTest, ConnectedAfterTransportBecomesAvailable) { - provider_.set_transport_available(false); - talk_base::scoped_refptr dc = DataChannel::Create( - &provider_, cricket::DCT_SCTP, "test1", &init_); - EXPECT_FALSE(provider_.IsConnected(dc.get())); + EXPECT_FALSE(provider_.IsConnected(webrtc_data_channel_.get())); provider_.set_transport_available(true); - dc->OnTransportChannelCreated(); - EXPECT_TRUE(provider_.IsConnected(dc.get())); + webrtc_data_channel_->OnTransportChannelCreated(); + EXPECT_TRUE(provider_.IsConnected(webrtc_data_channel_.get())); } // Tests the state of the data channel. @@ -81,6 +83,7 @@ TEST_F(SctpDataChannelTest, StateTransition) { EXPECT_EQ(webrtc::DataChannelInterface::kConnecting, webrtc_data_channel_->state()); SetChannelReady(); + EXPECT_EQ(webrtc::DataChannelInterface::kOpen, webrtc_data_channel_->state()); webrtc_data_channel_->Close(); EXPECT_EQ(webrtc::DataChannelInterface::kClosed, @@ -132,3 +135,16 @@ TEST_F(SctpDataChannelTest, OpenMessageSent) { EXPECT_EQ(provider_.last_send_data_params().ssrc, static_cast(webrtc_data_channel_->id())); } + +// Tests that the DataChannel created after transport gets ready can enter OPEN +// state. +TEST_F(SctpDataChannelTest, LateCreatedChannelTransitionToOpen) { + SetChannelReady(); + webrtc::DataChannelInit init; + init.id = 1; + talk_base::scoped_refptr dc = + DataChannel::Create(&provider_, cricket::DCT_SCTP, "test1", &init); + EXPECT_EQ(webrtc::DataChannelInterface::kConnecting, dc->state()); + EXPECT_TRUE_WAIT(webrtc::DataChannelInterface::kOpen == dc->state(), + 1000); +} diff --git a/talk/app/webrtc/mediastreamsignaling.cc b/talk/app/webrtc/mediastreamsignaling.cc index 91dde24e0..8d2542569 100644 --- a/talk/app/webrtc/mediastreamsignaling.cc +++ b/talk/app/webrtc/mediastreamsignaling.cc @@ -276,8 +276,6 @@ bool MediaStreamSignaling::AddDataChannelFromOpenMessage( } data_channels_[label] = channel; stream_observer_->OnAddDataChannel(channel); - // It's immediately ready to use. - channel->OnChannelReady(true); return true; } diff --git a/talk/app/webrtc/test/fakedatachannelprovider.h b/talk/app/webrtc/test/fakedatachannelprovider.h index 332641930..429f4dff1 100644 --- a/talk/app/webrtc/test/fakedatachannelprovider.h +++ b/talk/app/webrtc/test/fakedatachannelprovider.h @@ -30,14 +30,15 @@ class FakeDataChannelProvider : public webrtc::DataChannelProviderInterface { public: FakeDataChannelProvider() - : id_allocation_should_fail_(false), - send_blocked_(false), - transport_available_(true) {} + : send_blocked_(false), + transport_available_(false), + ready_to_send_(false) {} virtual ~FakeDataChannelProvider() {} virtual bool SendData(const cricket::SendDataParams& params, const talk_base::Buffer& payload, cricket::SendDataResult* result) OVERRIDE { + ASSERT(ready_to_send_ && transport_available_); if (send_blocked_) { *result = cricket::SDR_BLOCK; return false; @@ -45,6 +46,7 @@ class FakeDataChannelProvider : public webrtc::DataChannelProviderInterface { last_send_data_params_ = params; return true; } + virtual bool ConnectDataChannel(webrtc::DataChannel* data_channel) OVERRIDE { ASSERT(connected_channels_.find(data_channel) == connected_channels_.end()); if (!transport_available_) { @@ -54,55 +56,98 @@ class FakeDataChannelProvider : public webrtc::DataChannelProviderInterface { connected_channels_.insert(data_channel); return true; } + virtual void DisconnectDataChannel( webrtc::DataChannel* data_channel) OVERRIDE { ASSERT(connected_channels_.find(data_channel) != connected_channels_.end()); LOG(LS_INFO) << "DataChannel disconnected " << data_channel; connected_channels_.erase(data_channel); } + virtual void AddRtpDataStream(uint32 send_ssrc, uint32 recv_ssrc) OVERRIDE { + if (!transport_available_) { + return; + } send_ssrcs_.insert(send_ssrc); recv_ssrcs_.insert(recv_ssrc); } + virtual void AddSctpDataStream(uint32 sid) OVERRIDE { + if (!transport_available_) { + return; + } AddRtpDataStream(sid, sid); } + virtual void RemoveRtpDataStream( uint32 send_ssrc, uint32 recv_ssrc) OVERRIDE { send_ssrcs_.erase(send_ssrc); recv_ssrcs_.erase(recv_ssrc); } + virtual void RemoveSctpDataStream(uint32 sid) OVERRIDE { RemoveRtpDataStream(sid, sid); } + virtual bool ReadyToSendData() const OVERRIDE { + return ready_to_send_; + } + + // Set true to emulate the SCTP stream being blocked by congestion control. void set_send_blocked(bool blocked) { send_blocked_ = blocked; + if (!blocked) { + std::set::iterator it; + for (it = connected_channels_.begin(); + it != connected_channels_.end(); + ++it) { + (*it)->OnChannelReady(true); + } + } } - cricket::SendDataParams last_send_data_params() const { - return last_send_data_params_; - } - void set_id_allocaiton_should_fail(bool fail) { - id_allocation_should_fail_ = fail; - } + + // Set true to emulate the transport channel creation, e.g. after + // setLocalDescription/setRemoteDescription called with data content. void set_transport_available(bool available) { transport_available_ = available; } + + // Set true to emulate the transport ReadyToSendData signal when the transport + // becomes writable for the first time. + void set_ready_to_send(bool ready) { + ASSERT(transport_available_); + ready_to_send_ = ready; + if (ready) { + std::set::iterator it; + for (it = connected_channels_.begin(); + it != connected_channels_.end(); + ++it) { + (*it)->OnChannelReady(true); + } + } + } + + cricket::SendDataParams last_send_data_params() const { + return last_send_data_params_; + } + bool IsConnected(webrtc::DataChannel* data_channel) const { return connected_channels_.find(data_channel) != connected_channels_.end(); } + bool IsSendStreamAdded(uint32 stream) const { return send_ssrcs_.find(stream) != send_ssrcs_.end(); } + bool IsRecvStreamAdded(uint32 stream) const { return recv_ssrcs_.find(stream) != recv_ssrcs_.end(); } private: cricket::SendDataParams last_send_data_params_; - bool id_allocation_should_fail_; bool send_blocked_; bool transport_available_; + bool ready_to_send_; std::set connected_channels_; std::set send_ssrcs_; std::set recv_ssrcs_; diff --git a/talk/app/webrtc/webrtcsession.cc b/talk/app/webrtc/webrtcsession.cc index a96c2da8c..5935ea046 100644 --- a/talk/app/webrtc/webrtcsession.cc +++ b/talk/app/webrtc/webrtcsession.cc @@ -1024,6 +1024,10 @@ void WebRtcSession::RemoveSctpDataStream(uint32 sid) { RemoveRtpDataStream(sid, sid); } +bool WebRtcSession::ReadyToSendData() const { + return data_channel_.get() && data_channel_->ready_to_send_data(); +} + talk_base::scoped_refptr WebRtcSession::CreateDataChannel( const std::string& label, const DataChannelInit* config) { diff --git a/talk/app/webrtc/webrtcsession.h b/talk/app/webrtc/webrtcsession.h index 893ab3fb4..4a39a11d1 100644 --- a/talk/app/webrtc/webrtcsession.h +++ b/talk/app/webrtc/webrtcsession.h @@ -194,6 +194,7 @@ class WebRtcSession : public cricket::BaseSession, virtual void AddSctpDataStream(uint32 sid) OVERRIDE; virtual void RemoveRtpDataStream(uint32 send_ssrc, uint32 recv_ssrc) OVERRIDE; virtual void RemoveSctpDataStream(uint32 sid) OVERRIDE; + virtual bool ReadyToSendData() const OVERRIDE; talk_base::scoped_refptr CreateDataChannel( const std::string& label, diff --git a/talk/base/common.h b/talk/base/common.h index 6350baf1e..d557639e2 100644 --- a/talk/base/common.h +++ b/talk/base/common.h @@ -198,7 +198,7 @@ inline bool ImplicitCastToBool(bool result) { return result; } // TODO(ajm): Hack to avoid multiple definitions until the base/ of webrtc and // libjingle are merged. #if !defined(WARN_UNUSED_RESULT) -#if defined(COMPILER_GCC) +#if defined(__GNUC__) #define WARN_UNUSED_RESULT __attribute__((warn_unused_result)) #else #define WARN_UNUSED_RESULT diff --git a/talk/session/media/channel.cc b/talk/session/media/channel.cc index 1a3063d44..e0aff8999 100644 --- a/talk/session/media/channel.cc +++ b/talk/session/media/channel.cc @@ -2477,7 +2477,8 @@ DataChannel::DataChannel(talk_base::Thread* thread, bool rtcp) // MediaEngine is NULL : BaseChannel(thread, NULL, media_channel, session, content_name, rtcp), - data_channel_type_(cricket::DCT_NONE) { + data_channel_type_(cricket::DCT_NONE), + ready_to_send_data_(false) { } DataChannel::~DataChannel() { @@ -2701,7 +2702,8 @@ void DataChannel::OnMessage(talk_base::Message *pmsg) { case MSG_READYTOSENDDATA: { DataChannelReadyToSendMessageData* data = static_cast(pmsg->pdata); - SignalReadyToSendData(data->data()); + ready_to_send_data_ = data->data(); + SignalReadyToSendData(ready_to_send_data_); delete data; break; } diff --git a/talk/session/media/channel.h b/talk/session/media/channel.h index 51fb575dd..27a81a6d5 100644 --- a/talk/session/media/channel.h +++ b/talk/session/media/channel.h @@ -610,6 +610,11 @@ class DataChannel : public BaseChannel { void StartMediaMonitor(int cms); void StopMediaMonitor(); + // Should be called on the signaling thread only. + bool ready_to_send_data() const { + return ready_to_send_data_; + } + sigslot::signal2 SignalMediaMonitor; sigslot::signal2&> SignalConnectionMonitor; @@ -714,6 +719,7 @@ class DataChannel : public BaseChannel { // TODO(pthatcher): Make a separate SctpDataChannel and // RtpDataChannel instead of using this. DataChannelType data_channel_type_; + bool ready_to_send_data_; }; } // namespace cricket