diff --git a/talk/app/webrtc/datachannel.cc b/talk/app/webrtc/datachannel.cc index 14caa416b..af4fb244f 100644 --- a/talk/app/webrtc/datachannel.cc +++ b/talk/app/webrtc/datachannel.cc @@ -35,13 +35,57 @@ namespace webrtc { -static size_t kMaxQueuedReceivedDataPackets = 100; -static size_t kMaxQueuedSendDataPackets = 100; +static size_t kMaxQueuedReceivedDataBytes = 16 * 1024 * 1024; +static size_t kMaxQueuedSendDataBytes = 16 * 1024 * 1024; enum { MSG_CHANNELREADY, }; +DataChannel::PacketQueue::PacketQueue() : byte_count_(0) {} + +DataChannel::PacketQueue::~PacketQueue() { + Clear(); +} + +bool DataChannel::PacketQueue::Empty() const { + return packets_.empty(); +} + +DataBuffer* DataChannel::PacketQueue::Front() { + return packets_.front(); +} + +void DataChannel::PacketQueue::Pop() { + if (packets_.empty()) { + return; + } + + byte_count_ -= packets_.front()->size(); + packets_.pop_front(); +} + +void DataChannel::PacketQueue::Push(DataBuffer* packet) { + byte_count_ += packet->size(); + packets_.push_back(packet); +} + +void DataChannel::PacketQueue::Clear() { + while (!packets_.empty()) { + delete packets_.front(); + packets_.pop_front(); + } + byte_count_ = 0; +} + +void DataChannel::PacketQueue::Swap(PacketQueue* other) { + size_t other_byte_count = other->byte_count_; + other->byte_count_ = byte_count_; + byte_count_ = other_byte_count; + + other->packets_.swap(packets_); +} + talk_base::scoped_refptr DataChannel::Create( DataChannelProviderInterface* provider, cricket::DataChannelType dct, @@ -114,11 +158,7 @@ bool DataChannel::Init(const InternalDataChannelInit& config) { return true; } -DataChannel::~DataChannel() { - ClearQueuedReceivedData(); - ClearQueuedSendData(); - ClearQueuedControlData(); -} +DataChannel::~DataChannel() {} void DataChannel::RegisterObserver(DataChannelObserver* observer) { observer_ = observer; @@ -139,13 +179,7 @@ bool DataChannel::reliable() const { } uint64 DataChannel::buffered_amount() const { - uint64 buffered_amount = 0; - for (std::deque::const_iterator it = queued_send_data_.begin(); - it != queued_send_data_.end(); - ++it) { - buffered_amount += (*it)->size(); - } - return buffered_amount; + return queued_send_data_.byte_count(); } void DataChannel::Close() { @@ -163,89 +197,25 @@ bool DataChannel::Send(const DataBuffer& buffer) { } // If the queue is non-empty, we're waiting for SignalReadyToSend, // so just add to the end of the queue and keep waiting. - if (!queued_send_data_.empty()) { - if (!QueueSendData(buffer)) { - if (data_channel_type_ == cricket::DCT_RTP) { - return false; - } + if (!queued_send_data_.Empty()) { + // Only SCTP DataChannel queues the outgoing data when the transport is + // blocked. + ASSERT(data_channel_type_ == cricket::DCT_SCTP); + if (!QueueSendDataMessage(buffer)) { Close(); } return true; } - cricket::SendDataResult send_result; - if (!InternalSendWithoutQueueing(buffer, &send_result)) { - if (data_channel_type_ == cricket::DCT_RTP) { - return false; - } - if (send_result != cricket::SDR_BLOCK || !QueueSendData(buffer)) { - Close(); - } + bool success = SendDataMessage(buffer); + if (data_channel_type_ == cricket::DCT_RTP) { + return success; } + + // Always return true for SCTP DataChannel per the spec. return true; } -void DataChannel::QueueControl(const talk_base::Buffer* buffer) { - queued_control_data_.push(buffer); -} - -bool DataChannel::SendOpenMessage(const talk_base::Buffer* raw_buffer) { - ASSERT(data_channel_type_ == cricket::DCT_SCTP && - was_ever_writable_ && - config_.id >= 0 && - !config_.negotiated); - - talk_base::scoped_ptr buffer(raw_buffer); - - cricket::SendDataParams send_params; - send_params.ssrc = config_.id; - send_params.ordered = true; - send_params.type = cricket::DMT_CONTROL; - - cricket::SendDataResult send_result; - bool retval = provider_->SendData(send_params, *buffer, &send_result); - if (retval) { - LOG(LS_INFO) << "Sent OPEN message on channel " << config_.id; - // Send data as ordered before we receive any mesage from the remote peer - // to make sure the remote peer will not receive any data before it receives - // the OPEN message. - waiting_for_open_ack_ = true; - } else if (send_result == cricket::SDR_BLOCK) { - // Link is congested. Queue for later. - QueueControl(buffer.release()); - } else { - LOG(LS_ERROR) << "Failed to send OPEN message with result " - << send_result << " on channel " << config_.id; - } - return retval; -} - -bool DataChannel::SendOpenAckMessage(const talk_base::Buffer* raw_buffer) { - ASSERT(data_channel_type_ == cricket::DCT_SCTP && - was_ever_writable_ && - config_.id >= 0); - - talk_base::scoped_ptr buffer(raw_buffer); - - cricket::SendDataParams send_params; - send_params.ssrc = config_.id; - send_params.ordered = config_.ordered; - send_params.type = cricket::DMT_CONTROL; - - cricket::SendDataResult send_result; - bool retval = provider_->SendData(send_params, *buffer, &send_result); - if (retval) { - LOG(LS_INFO) << "Sent OPEN_ACK message on channel " << config_.id; - } else if (send_result == cricket::SDR_BLOCK) { - // Link is congested. Queue for later. - QueueControl(buffer.release()); - } else { - LOG(LS_ERROR) << "Failed to send OPEN_ACK message with result " - << send_result << " on channel " << config_.id; - } - return retval; -} - void DataChannel::SetReceiveSsrc(uint32 receive_ssrc) { ASSERT(data_channel_type_ == cricket::DCT_RTP); @@ -262,6 +232,27 @@ void DataChannel::RemotePeerRequestClose() { DoClose(); } +void DataChannel::SetSctpSid(int sid) { + ASSERT(config_.id < 0 && sid >= 0 && data_channel_type_ == cricket::DCT_SCTP); + if (config_.id == sid) + return; + + config_.id = sid; + provider_->AddSctpDataStream(sid); +} + +void DataChannel::OnTransportChannelCreated() { + ASSERT(data_channel_type_ == cricket::DCT_SCTP); + if (!connected_to_provider_) { + connected_to_provider_ = provider_->ConnectDataChannel(this); + } + // The sid may have been unassigned when provider_->ConnectDataChannel was + // done. So always add the streams even if connected_to_provider_ is true. + if (config_.id >= 0) { + provider_->AddSctpDataStream(config_.id); + } +} + void DataChannel::SetSendSsrc(uint32 send_ssrc) { ASSERT(data_channel_type_ == cricket::DCT_RTP); if (send_ssrc_set_) { @@ -330,12 +321,18 @@ void DataChannel::OnDataReceived(cricket::DataChannel* channel, if (was_ever_writable_ && observer_) { observer_->OnMessage(*buffer.get()); } else { - if (queued_received_data_.size() > kMaxQueuedReceivedDataPackets) { - LOG(LS_ERROR) - << "Queued received data exceeds the max number of packets."; - ClearQueuedReceivedData(); + if (queued_received_data_.byte_count() + payload.length() > + kMaxQueuedReceivedDataBytes) { + LOG(LS_ERROR) << "Queued received data exceeds the max buffer size."; + + queued_received_data_.Clear(); + if (data_channel_type_ != cricket::DCT_RTP) { + Close(); + } + + return; } - queued_received_data_.push(buffer.release()); + queued_received_data_.Push(buffer.release()); } } @@ -350,22 +347,27 @@ void DataChannel::OnChannelReady(bool writable) { was_ever_writable_ = true; if (data_channel_type_ == cricket::DCT_SCTP) { + talk_base::Buffer payload; + if (config_.open_handshake_role == InternalDataChannelInit::kOpener) { - talk_base::Buffer* payload = new talk_base::Buffer; - WriteDataChannelOpenMessage(label_, config_, payload); - SendOpenMessage(payload); + WriteDataChannelOpenMessage(label_, config_, &payload); + SendControlMessage(payload); } else if (config_.open_handshake_role == - InternalDataChannelInit::kAcker) { - talk_base::Buffer* payload = new talk_base::Buffer; - WriteDataChannelOpenAckMessage(payload); - SendOpenAckMessage(payload); + InternalDataChannelInit::kAcker) { + WriteDataChannelOpenAckMessage(&payload); + SendControlMessage(payload); } } UpdateState(); - ASSERT(queued_send_data_.empty()); + ASSERT(queued_send_data_.Empty()); } else if (state_ == kOpen) { - DeliverQueuedSendData(); + // TODO(jiayl): Sending OPEN message here contradicts with the pre-condition + // that the readyState is open. According to the standard, the channel + // should not become open before the OPEN message is sent. + SendQueuedControlMessages(); + + SendQueuedDataMessages(); } } @@ -389,7 +391,7 @@ void DataChannel::UpdateState() { if (was_ever_writable_) { // TODO(jiayl): Do not transition to kOpen if we failed to send the // OPEN message. - DeliverQueuedControlData(); + SendQueuedControlMessages(); SetState(kOpen); // If we have received buffers before the channel got writable. // Deliver them now. @@ -441,75 +443,27 @@ void DataChannel::DeliverQueuedReceivedData() { return; } - while (!queued_received_data_.empty()) { - DataBuffer* buffer = queued_received_data_.front(); + while (!queued_received_data_.Empty()) { + talk_base::scoped_ptr buffer(queued_received_data_.Front()); observer_->OnMessage(*buffer); - queued_received_data_.pop(); - delete buffer; + queued_received_data_.Pop(); } } -void DataChannel::ClearQueuedReceivedData() { - while (!queued_received_data_.empty()) { - DataBuffer* buffer = queued_received_data_.front(); - queued_received_data_.pop(); - delete buffer; - } -} - -void DataChannel::DeliverQueuedSendData() { +void DataChannel::SendQueuedDataMessages() { ASSERT(was_ever_writable_ && state_ == kOpen); - // TODO(jiayl): Sending OPEN message here contradicts with the pre-condition - // that the readyState is open. According to the standard, the channel should - // not become open before the OPEN message is sent. - DeliverQueuedControlData(); + PacketQueue packet_buffer; + packet_buffer.Swap(&queued_send_data_); - while (!queued_send_data_.empty()) { - DataBuffer* buffer = queued_send_data_.front(); - cricket::SendDataResult send_result; - if (!InternalSendWithoutQueueing(*buffer, &send_result)) { - LOG(LS_WARNING) << "DeliverQueuedSendData aborted due to send_result " - << send_result; - break; - } - queued_send_data_.pop_front(); - delete buffer; + while (!packet_buffer.Empty()) { + talk_base::scoped_ptr buffer(packet_buffer.Front()); + SendDataMessage(*buffer); + packet_buffer.Pop(); } } -void DataChannel::ClearQueuedControlData() { - while (!queued_control_data_.empty()) { - const talk_base::Buffer *buf = queued_control_data_.front(); - queued_control_data_.pop(); - delete buf; - } -} - -void DataChannel::DeliverQueuedControlData() { - ASSERT(was_ever_writable_); - while (!queued_control_data_.empty()) { - const talk_base::Buffer* buf = queued_control_data_.front(); - queued_control_data_.pop(); - if (config_.open_handshake_role == InternalDataChannelInit::kOpener) { - SendOpenMessage(buf); - } else { - ASSERT(config_.open_handshake_role == InternalDataChannelInit::kAcker); - SendOpenAckMessage(buf); - } - } -} - -void DataChannel::ClearQueuedSendData() { - while (!queued_send_data_.empty()) { - DataBuffer* buffer = queued_send_data_.front(); - queued_send_data_.pop_front(); - delete buffer; - } -} - -bool DataChannel::InternalSendWithoutQueueing( - const DataBuffer& buffer, cricket::SendDataResult* send_result) { +bool DataChannel::SendDataMessage(const DataBuffer& buffer) { cricket::SendDataParams send_params; if (data_channel_type_ == cricket::DCT_SCTP) { @@ -529,34 +483,78 @@ bool DataChannel::InternalSendWithoutQueueing( } send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT; - return provider_->SendData(send_params, buffer.data, send_result); + cricket::SendDataResult send_result = cricket::SDR_SUCCESS; + bool success = provider_->SendData(send_params, buffer.data, &send_result); + + if (!success && data_channel_type_ == cricket::DCT_SCTP) { + if (send_result != cricket::SDR_BLOCK || !QueueSendDataMessage(buffer)) { + LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send data, " + << "send_result = " << send_result; + Close(); + } + } + return success; } -bool DataChannel::QueueSendData(const DataBuffer& buffer) { - if (queued_send_data_.size() >= kMaxQueuedSendDataPackets) { +bool DataChannel::QueueSendDataMessage(const DataBuffer& buffer) { + if (queued_send_data_.byte_count() >= kMaxQueuedSendDataBytes) { LOG(LS_ERROR) << "Can't buffer any more data for the data channel."; return false; } - queued_send_data_.push_back(new DataBuffer(buffer)); + queued_send_data_.Push(new DataBuffer(buffer)); return true; } -void DataChannel::SetSctpSid(int sid) { - ASSERT(config_.id < 0 && sid >= 0 && data_channel_type_ == cricket::DCT_SCTP); - config_.id = sid; - provider_->AddSctpDataStream(sid); +void DataChannel::SendQueuedControlMessages() { + ASSERT(was_ever_writable_); + + PacketQueue control_packets; + control_packets.Swap(&queued_control_data_); + + while (!control_packets.Empty()) { + talk_base::scoped_ptr buf(control_packets.Front()); + SendControlMessage(buf->data); + control_packets.Pop(); + } } -void DataChannel::OnTransportChannelCreated() { - ASSERT(data_channel_type_ == cricket::DCT_SCTP); - if (!connected_to_provider_) { - connected_to_provider_ = provider_->ConnectDataChannel(this); - } - // The sid may have been unassigned when provider_->ConnectDataChannel was - // done. So always add the streams even if connected_to_provider_ is true. - if (config_.id >= 0) { - provider_->AddSctpDataStream(config_.id); +void DataChannel::QueueControlMessage(const talk_base::Buffer& buffer) { + queued_control_data_.Push(new DataBuffer(buffer, true)); +} + +bool DataChannel::SendControlMessage(const talk_base::Buffer& buffer) { + bool is_open_message = + (config_.open_handshake_role == InternalDataChannelInit::kOpener); + + ASSERT(data_channel_type_ == cricket::DCT_SCTP && + was_ever_writable_ && + config_.id >= 0 && + (!is_open_message || !config_.negotiated)); + + cricket::SendDataParams send_params; + send_params.ssrc = config_.id; + send_params.ordered = config_.ordered || is_open_message; + send_params.type = cricket::DMT_CONTROL; + + cricket::SendDataResult send_result = cricket::SDR_SUCCESS; + bool retval = provider_->SendData(send_params, buffer, &send_result); + if (retval) { + LOG(LS_INFO) << "Sent CONTROL message on channel " << config_.id; + + if (is_open_message) { + // Send data as ordered before we receive any message from the remote peer + // to make sure the remote peer will not receive any data before it + // receives the OPEN message. + waiting_for_open_ack_ = true; + } + } else if (send_result == cricket::SDR_BLOCK) { + QueueControlMessage(buffer); + } else { + LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send" + << " the CONTROL message, send_result = " << send_result; + Close(); } + return retval; } } // namespace webrtc diff --git a/talk/app/webrtc/datachannel.h b/talk/app/webrtc/datachannel.h index 9256e0e07..0510f7e88 100644 --- a/talk/app/webrtc/datachannel.h +++ b/talk/app/webrtc/datachannel.h @@ -29,7 +29,7 @@ #define TALK_APP_WEBRTC_DATACHANNEL_H_ #include -#include +#include #include "talk/app/webrtc/datachannelinterface.h" #include "talk/app/webrtc/proxy.h" @@ -149,7 +149,8 @@ class DataChannel : public DataChannelInterface, // The following methods are for SCTP only. - // Sets the SCTP sid and adds to transport layer if not set yet. + // Sets the SCTP sid and adds to transport layer if not set yet. Should only + // be called once. void SetSctpSid(int sid); // Called when the transport channel is created. void OnTransportChannelCreated(); @@ -175,23 +176,49 @@ class DataChannel : public DataChannelInterface, virtual ~DataChannel(); private: + // A packet queue which tracks the total queued bytes. Queued packets are + // owned by this class. + class PacketQueue { + public: + PacketQueue(); + ~PacketQueue(); + + size_t byte_count() const { + return byte_count_; + } + + bool Empty() const; + + DataBuffer* Front(); + + void Pop(); + + void Push(DataBuffer* packet); + + void Clear(); + + void Swap(PacketQueue* other); + + private: + std::deque packets_; + size_t byte_count_; + }; + bool Init(const InternalDataChannelInit& config); void DoClose(); void UpdateState(); void SetState(DataState state); void DisconnectFromTransport(); - void DeliverQueuedControlData(); - void QueueControl(const talk_base::Buffer* buffer); - void ClearQueuedControlData(); + void DeliverQueuedReceivedData(); - void ClearQueuedReceivedData(); - void DeliverQueuedSendData(); - void ClearQueuedSendData(); - bool InternalSendWithoutQueueing(const DataBuffer& buffer, - cricket::SendDataResult* send_result); - bool QueueSendData(const DataBuffer& buffer); - bool SendOpenMessage(const talk_base::Buffer* buffer); - bool SendOpenAckMessage(const talk_base::Buffer* buffer); + + void SendQueuedDataMessages(); + bool SendDataMessage(const DataBuffer& buffer); + bool QueueSendDataMessage(const DataBuffer& buffer); + + void SendQueuedControlMessages(); + void QueueControlMessage(const talk_base::Buffer& buffer); + bool SendControlMessage(const talk_base::Buffer& buffer); std::string label_; InternalDataChannelInit config_; @@ -208,9 +235,9 @@ class DataChannel : public DataChannelInterface, uint32 receive_ssrc_; // Control messages that always have to get sent out before any queued // data. - std::queue queued_control_data_; - std::queue queued_received_data_; - std::deque queued_send_data_; + PacketQueue queued_control_data_; + PacketQueue queued_received_data_; + PacketQueue queued_send_data_; }; class DataChannelFactory { diff --git a/talk/app/webrtc/datachannel_unittest.cc b/talk/app/webrtc/datachannel_unittest.cc index 991ae0cae..6f223fe42 100644 --- a/talk/app/webrtc/datachannel_unittest.cc +++ b/talk/app/webrtc/datachannel_unittest.cc @@ -174,6 +174,16 @@ TEST_F(SctpDataChannelTest, OpenMessageSent) { static_cast(webrtc_data_channel_->id())); } +TEST_F(SctpDataChannelTest, QueuedOpenMessageSent) { + provider_.set_send_blocked(true); + SetChannelReady(); + provider_.set_send_blocked(false); + + EXPECT_EQ(cricket::DMT_CONTROL, provider_.last_send_data_params().type); + EXPECT_EQ(provider_.last_send_data_params().ssrc, + static_cast(webrtc_data_channel_->id())); +} + // Tests that the DataChannel created after transport gets ready can enter OPEN // state. TEST_F(SctpDataChannelTest, LateCreatedChannelTransitionToOpen) { @@ -330,11 +340,17 @@ TEST_F(SctpDataChannelTest, OpenAckRoleInitialization) { // Tests that the DataChannel is closed if the sending buffer is full. TEST_F(SctpDataChannelTest, ClosedWhenSendBufferFull) { SetChannelReady(); - webrtc::DataBuffer buffer("abcd"); + + const size_t buffer_size = 1024; + talk_base::Buffer buffer; + buffer.SetLength(buffer_size); + memset(buffer.data(), 0, buffer_size); + + webrtc::DataBuffer packet(buffer, true); provider_.set_send_blocked(true); - for (size_t i = 0; i < 101; ++i) { - EXPECT_TRUE(webrtc_data_channel_->Send(buffer)); + for (size_t i = 0; i < 16 * 1024 + 1; ++i) { + EXPECT_TRUE(webrtc_data_channel_->Send(packet)); } EXPECT_EQ(webrtc::DataChannelInterface::kClosed, @@ -376,3 +392,21 @@ TEST_F(SctpDataChannelTest, RemotePeerRequestClose) { webrtc_data_channel_->state()); } +// Tests that the DataChannel is closed if the received buffer is full. +TEST_F(SctpDataChannelTest, ClosedWhenReceivedBufferFull) { + SetChannelReady(); + const size_t buffer_size = 1024; + talk_base::Buffer buffer; + buffer.SetLength(buffer_size); + memset(buffer.data(), 0, buffer_size); + + cricket::ReceiveDataParams params; + params.ssrc = 0; + + // Receiving data without having an observer will overflow the buffer. + for (size_t i = 0; i < 16 * 1024 + 1; ++i) { + webrtc_data_channel_->OnDataReceived(NULL, params, buffer); + } + EXPECT_EQ(webrtc::DataChannelInterface::kClosed, + webrtc_data_channel_->state()); +}