Limits the send and receive buffer by bytes, not by packets.
The new limit is 16MB for each buffer. Also refactors the code to handle send failure more consistently. BUG=3429 R=juberti@webrtc.org Review URL: https://webrtc-codereview.appspot.com/21559005 git-svn-id: http://webrtc.googlecode.com/svn/trunk@6511 4adac7df-926f-26a2-2b94-8c16560cd09d
This commit is contained in:
parent
db397e5c6c
commit
b43c99de29
@ -35,13 +35,57 @@
|
|||||||
|
|
||||||
namespace webrtc {
|
namespace webrtc {
|
||||||
|
|
||||||
static size_t kMaxQueuedReceivedDataPackets = 100;
|
static size_t kMaxQueuedReceivedDataBytes = 16 * 1024 * 1024;
|
||||||
static size_t kMaxQueuedSendDataPackets = 100;
|
static size_t kMaxQueuedSendDataBytes = 16 * 1024 * 1024;
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
MSG_CHANNELREADY,
|
MSG_CHANNELREADY,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
DataChannel::PacketQueue::PacketQueue() : byte_count_(0) {}
|
||||||
|
|
||||||
|
DataChannel::PacketQueue::~PacketQueue() {
|
||||||
|
Clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool DataChannel::PacketQueue::Empty() const {
|
||||||
|
return packets_.empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
DataBuffer* DataChannel::PacketQueue::Front() {
|
||||||
|
return packets_.front();
|
||||||
|
}
|
||||||
|
|
||||||
|
void DataChannel::PacketQueue::Pop() {
|
||||||
|
if (packets_.empty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
byte_count_ -= packets_.front()->size();
|
||||||
|
packets_.pop_front();
|
||||||
|
}
|
||||||
|
|
||||||
|
void DataChannel::PacketQueue::Push(DataBuffer* packet) {
|
||||||
|
byte_count_ += packet->size();
|
||||||
|
packets_.push_back(packet);
|
||||||
|
}
|
||||||
|
|
||||||
|
void DataChannel::PacketQueue::Clear() {
|
||||||
|
while (!packets_.empty()) {
|
||||||
|
delete packets_.front();
|
||||||
|
packets_.pop_front();
|
||||||
|
}
|
||||||
|
byte_count_ = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void DataChannel::PacketQueue::Swap(PacketQueue* other) {
|
||||||
|
size_t other_byte_count = other->byte_count_;
|
||||||
|
other->byte_count_ = byte_count_;
|
||||||
|
byte_count_ = other_byte_count;
|
||||||
|
|
||||||
|
other->packets_.swap(packets_);
|
||||||
|
}
|
||||||
|
|
||||||
talk_base::scoped_refptr<DataChannel> DataChannel::Create(
|
talk_base::scoped_refptr<DataChannel> DataChannel::Create(
|
||||||
DataChannelProviderInterface* provider,
|
DataChannelProviderInterface* provider,
|
||||||
cricket::DataChannelType dct,
|
cricket::DataChannelType dct,
|
||||||
@ -114,11 +158,7 @@ bool DataChannel::Init(const InternalDataChannelInit& config) {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
DataChannel::~DataChannel() {
|
DataChannel::~DataChannel() {}
|
||||||
ClearQueuedReceivedData();
|
|
||||||
ClearQueuedSendData();
|
|
||||||
ClearQueuedControlData();
|
|
||||||
}
|
|
||||||
|
|
||||||
void DataChannel::RegisterObserver(DataChannelObserver* observer) {
|
void DataChannel::RegisterObserver(DataChannelObserver* observer) {
|
||||||
observer_ = observer;
|
observer_ = observer;
|
||||||
@ -139,13 +179,7 @@ bool DataChannel::reliable() const {
|
|||||||
}
|
}
|
||||||
|
|
||||||
uint64 DataChannel::buffered_amount() const {
|
uint64 DataChannel::buffered_amount() const {
|
||||||
uint64 buffered_amount = 0;
|
return queued_send_data_.byte_count();
|
||||||
for (std::deque<DataBuffer*>::const_iterator it = queued_send_data_.begin();
|
|
||||||
it != queued_send_data_.end();
|
|
||||||
++it) {
|
|
||||||
buffered_amount += (*it)->size();
|
|
||||||
}
|
|
||||||
return buffered_amount;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void DataChannel::Close() {
|
void DataChannel::Close() {
|
||||||
@ -163,89 +197,25 @@ bool DataChannel::Send(const DataBuffer& buffer) {
|
|||||||
}
|
}
|
||||||
// If the queue is non-empty, we're waiting for SignalReadyToSend,
|
// If the queue is non-empty, we're waiting for SignalReadyToSend,
|
||||||
// so just add to the end of the queue and keep waiting.
|
// so just add to the end of the queue and keep waiting.
|
||||||
if (!queued_send_data_.empty()) {
|
if (!queued_send_data_.Empty()) {
|
||||||
if (!QueueSendData(buffer)) {
|
// Only SCTP DataChannel queues the outgoing data when the transport is
|
||||||
if (data_channel_type_ == cricket::DCT_RTP) {
|
// blocked.
|
||||||
return false;
|
ASSERT(data_channel_type_ == cricket::DCT_SCTP);
|
||||||
}
|
if (!QueueSendDataMessage(buffer)) {
|
||||||
Close();
|
Close();
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
cricket::SendDataResult send_result;
|
bool success = SendDataMessage(buffer);
|
||||||
if (!InternalSendWithoutQueueing(buffer, &send_result)) {
|
|
||||||
if (data_channel_type_ == cricket::DCT_RTP) {
|
if (data_channel_type_ == cricket::DCT_RTP) {
|
||||||
return false;
|
return success;
|
||||||
}
|
|
||||||
if (send_result != cricket::SDR_BLOCK || !QueueSendData(buffer)) {
|
|
||||||
Close();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Always return true for SCTP DataChannel per the spec.
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void DataChannel::QueueControl(const talk_base::Buffer* buffer) {
|
|
||||||
queued_control_data_.push(buffer);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool DataChannel::SendOpenMessage(const talk_base::Buffer* raw_buffer) {
|
|
||||||
ASSERT(data_channel_type_ == cricket::DCT_SCTP &&
|
|
||||||
was_ever_writable_ &&
|
|
||||||
config_.id >= 0 &&
|
|
||||||
!config_.negotiated);
|
|
||||||
|
|
||||||
talk_base::scoped_ptr<const talk_base::Buffer> buffer(raw_buffer);
|
|
||||||
|
|
||||||
cricket::SendDataParams send_params;
|
|
||||||
send_params.ssrc = config_.id;
|
|
||||||
send_params.ordered = true;
|
|
||||||
send_params.type = cricket::DMT_CONTROL;
|
|
||||||
|
|
||||||
cricket::SendDataResult send_result;
|
|
||||||
bool retval = provider_->SendData(send_params, *buffer, &send_result);
|
|
||||||
if (retval) {
|
|
||||||
LOG(LS_INFO) << "Sent OPEN message on channel " << config_.id;
|
|
||||||
// Send data as ordered before we receive any mesage from the remote peer
|
|
||||||
// to make sure the remote peer will not receive any data before it receives
|
|
||||||
// the OPEN message.
|
|
||||||
waiting_for_open_ack_ = true;
|
|
||||||
} else if (send_result == cricket::SDR_BLOCK) {
|
|
||||||
// Link is congested. Queue for later.
|
|
||||||
QueueControl(buffer.release());
|
|
||||||
} else {
|
|
||||||
LOG(LS_ERROR) << "Failed to send OPEN message with result "
|
|
||||||
<< send_result << " on channel " << config_.id;
|
|
||||||
}
|
|
||||||
return retval;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool DataChannel::SendOpenAckMessage(const talk_base::Buffer* raw_buffer) {
|
|
||||||
ASSERT(data_channel_type_ == cricket::DCT_SCTP &&
|
|
||||||
was_ever_writable_ &&
|
|
||||||
config_.id >= 0);
|
|
||||||
|
|
||||||
talk_base::scoped_ptr<const talk_base::Buffer> buffer(raw_buffer);
|
|
||||||
|
|
||||||
cricket::SendDataParams send_params;
|
|
||||||
send_params.ssrc = config_.id;
|
|
||||||
send_params.ordered = config_.ordered;
|
|
||||||
send_params.type = cricket::DMT_CONTROL;
|
|
||||||
|
|
||||||
cricket::SendDataResult send_result;
|
|
||||||
bool retval = provider_->SendData(send_params, *buffer, &send_result);
|
|
||||||
if (retval) {
|
|
||||||
LOG(LS_INFO) << "Sent OPEN_ACK message on channel " << config_.id;
|
|
||||||
} else if (send_result == cricket::SDR_BLOCK) {
|
|
||||||
// Link is congested. Queue for later.
|
|
||||||
QueueControl(buffer.release());
|
|
||||||
} else {
|
|
||||||
LOG(LS_ERROR) << "Failed to send OPEN_ACK message with result "
|
|
||||||
<< send_result << " on channel " << config_.id;
|
|
||||||
}
|
|
||||||
return retval;
|
|
||||||
}
|
|
||||||
|
|
||||||
void DataChannel::SetReceiveSsrc(uint32 receive_ssrc) {
|
void DataChannel::SetReceiveSsrc(uint32 receive_ssrc) {
|
||||||
ASSERT(data_channel_type_ == cricket::DCT_RTP);
|
ASSERT(data_channel_type_ == cricket::DCT_RTP);
|
||||||
|
|
||||||
@ -262,6 +232,27 @@ void DataChannel::RemotePeerRequestClose() {
|
|||||||
DoClose();
|
DoClose();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void DataChannel::SetSctpSid(int sid) {
|
||||||
|
ASSERT(config_.id < 0 && sid >= 0 && data_channel_type_ == cricket::DCT_SCTP);
|
||||||
|
if (config_.id == sid)
|
||||||
|
return;
|
||||||
|
|
||||||
|
config_.id = sid;
|
||||||
|
provider_->AddSctpDataStream(sid);
|
||||||
|
}
|
||||||
|
|
||||||
|
void DataChannel::OnTransportChannelCreated() {
|
||||||
|
ASSERT(data_channel_type_ == cricket::DCT_SCTP);
|
||||||
|
if (!connected_to_provider_) {
|
||||||
|
connected_to_provider_ = provider_->ConnectDataChannel(this);
|
||||||
|
}
|
||||||
|
// The sid may have been unassigned when provider_->ConnectDataChannel was
|
||||||
|
// done. So always add the streams even if connected_to_provider_ is true.
|
||||||
|
if (config_.id >= 0) {
|
||||||
|
provider_->AddSctpDataStream(config_.id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void DataChannel::SetSendSsrc(uint32 send_ssrc) {
|
void DataChannel::SetSendSsrc(uint32 send_ssrc) {
|
||||||
ASSERT(data_channel_type_ == cricket::DCT_RTP);
|
ASSERT(data_channel_type_ == cricket::DCT_RTP);
|
||||||
if (send_ssrc_set_) {
|
if (send_ssrc_set_) {
|
||||||
@ -330,12 +321,18 @@ void DataChannel::OnDataReceived(cricket::DataChannel* channel,
|
|||||||
if (was_ever_writable_ && observer_) {
|
if (was_ever_writable_ && observer_) {
|
||||||
observer_->OnMessage(*buffer.get());
|
observer_->OnMessage(*buffer.get());
|
||||||
} else {
|
} else {
|
||||||
if (queued_received_data_.size() > kMaxQueuedReceivedDataPackets) {
|
if (queued_received_data_.byte_count() + payload.length() >
|
||||||
LOG(LS_ERROR)
|
kMaxQueuedReceivedDataBytes) {
|
||||||
<< "Queued received data exceeds the max number of packets.";
|
LOG(LS_ERROR) << "Queued received data exceeds the max buffer size.";
|
||||||
ClearQueuedReceivedData();
|
|
||||||
|
queued_received_data_.Clear();
|
||||||
|
if (data_channel_type_ != cricket::DCT_RTP) {
|
||||||
|
Close();
|
||||||
}
|
}
|
||||||
queued_received_data_.push(buffer.release());
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
queued_received_data_.Push(buffer.release());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -350,22 +347,27 @@ void DataChannel::OnChannelReady(bool writable) {
|
|||||||
was_ever_writable_ = true;
|
was_ever_writable_ = true;
|
||||||
|
|
||||||
if (data_channel_type_ == cricket::DCT_SCTP) {
|
if (data_channel_type_ == cricket::DCT_SCTP) {
|
||||||
|
talk_base::Buffer payload;
|
||||||
|
|
||||||
if (config_.open_handshake_role == InternalDataChannelInit::kOpener) {
|
if (config_.open_handshake_role == InternalDataChannelInit::kOpener) {
|
||||||
talk_base::Buffer* payload = new talk_base::Buffer;
|
WriteDataChannelOpenMessage(label_, config_, &payload);
|
||||||
WriteDataChannelOpenMessage(label_, config_, payload);
|
SendControlMessage(payload);
|
||||||
SendOpenMessage(payload);
|
|
||||||
} else if (config_.open_handshake_role ==
|
} else if (config_.open_handshake_role ==
|
||||||
InternalDataChannelInit::kAcker) {
|
InternalDataChannelInit::kAcker) {
|
||||||
talk_base::Buffer* payload = new talk_base::Buffer;
|
WriteDataChannelOpenAckMessage(&payload);
|
||||||
WriteDataChannelOpenAckMessage(payload);
|
SendControlMessage(payload);
|
||||||
SendOpenAckMessage(payload);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
UpdateState();
|
UpdateState();
|
||||||
ASSERT(queued_send_data_.empty());
|
ASSERT(queued_send_data_.Empty());
|
||||||
} else if (state_ == kOpen) {
|
} else if (state_ == kOpen) {
|
||||||
DeliverQueuedSendData();
|
// TODO(jiayl): Sending OPEN message here contradicts with the pre-condition
|
||||||
|
// that the readyState is open. According to the standard, the channel
|
||||||
|
// should not become open before the OPEN message is sent.
|
||||||
|
SendQueuedControlMessages();
|
||||||
|
|
||||||
|
SendQueuedDataMessages();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -389,7 +391,7 @@ void DataChannel::UpdateState() {
|
|||||||
if (was_ever_writable_) {
|
if (was_ever_writable_) {
|
||||||
// TODO(jiayl): Do not transition to kOpen if we failed to send the
|
// TODO(jiayl): Do not transition to kOpen if we failed to send the
|
||||||
// OPEN message.
|
// OPEN message.
|
||||||
DeliverQueuedControlData();
|
SendQueuedControlMessages();
|
||||||
SetState(kOpen);
|
SetState(kOpen);
|
||||||
// If we have received buffers before the channel got writable.
|
// If we have received buffers before the channel got writable.
|
||||||
// Deliver them now.
|
// Deliver them now.
|
||||||
@ -441,75 +443,27 @@ void DataChannel::DeliverQueuedReceivedData() {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
while (!queued_received_data_.empty()) {
|
while (!queued_received_data_.Empty()) {
|
||||||
DataBuffer* buffer = queued_received_data_.front();
|
talk_base::scoped_ptr<DataBuffer> buffer(queued_received_data_.Front());
|
||||||
observer_->OnMessage(*buffer);
|
observer_->OnMessage(*buffer);
|
||||||
queued_received_data_.pop();
|
queued_received_data_.Pop();
|
||||||
delete buffer;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void DataChannel::ClearQueuedReceivedData() {
|
void DataChannel::SendQueuedDataMessages() {
|
||||||
while (!queued_received_data_.empty()) {
|
|
||||||
DataBuffer* buffer = queued_received_data_.front();
|
|
||||||
queued_received_data_.pop();
|
|
||||||
delete buffer;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void DataChannel::DeliverQueuedSendData() {
|
|
||||||
ASSERT(was_ever_writable_ && state_ == kOpen);
|
ASSERT(was_ever_writable_ && state_ == kOpen);
|
||||||
|
|
||||||
// TODO(jiayl): Sending OPEN message here contradicts with the pre-condition
|
PacketQueue packet_buffer;
|
||||||
// that the readyState is open. According to the standard, the channel should
|
packet_buffer.Swap(&queued_send_data_);
|
||||||
// not become open before the OPEN message is sent.
|
|
||||||
DeliverQueuedControlData();
|
|
||||||
|
|
||||||
while (!queued_send_data_.empty()) {
|
while (!packet_buffer.Empty()) {
|
||||||
DataBuffer* buffer = queued_send_data_.front();
|
talk_base::scoped_ptr<DataBuffer> buffer(packet_buffer.Front());
|
||||||
cricket::SendDataResult send_result;
|
SendDataMessage(*buffer);
|
||||||
if (!InternalSendWithoutQueueing(*buffer, &send_result)) {
|
packet_buffer.Pop();
|
||||||
LOG(LS_WARNING) << "DeliverQueuedSendData aborted due to send_result "
|
|
||||||
<< send_result;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
queued_send_data_.pop_front();
|
|
||||||
delete buffer;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void DataChannel::ClearQueuedControlData() {
|
bool DataChannel::SendDataMessage(const DataBuffer& buffer) {
|
||||||
while (!queued_control_data_.empty()) {
|
|
||||||
const talk_base::Buffer *buf = queued_control_data_.front();
|
|
||||||
queued_control_data_.pop();
|
|
||||||
delete buf;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void DataChannel::DeliverQueuedControlData() {
|
|
||||||
ASSERT(was_ever_writable_);
|
|
||||||
while (!queued_control_data_.empty()) {
|
|
||||||
const talk_base::Buffer* buf = queued_control_data_.front();
|
|
||||||
queued_control_data_.pop();
|
|
||||||
if (config_.open_handshake_role == InternalDataChannelInit::kOpener) {
|
|
||||||
SendOpenMessage(buf);
|
|
||||||
} else {
|
|
||||||
ASSERT(config_.open_handshake_role == InternalDataChannelInit::kAcker);
|
|
||||||
SendOpenAckMessage(buf);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void DataChannel::ClearQueuedSendData() {
|
|
||||||
while (!queued_send_data_.empty()) {
|
|
||||||
DataBuffer* buffer = queued_send_data_.front();
|
|
||||||
queued_send_data_.pop_front();
|
|
||||||
delete buffer;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
bool DataChannel::InternalSendWithoutQueueing(
|
|
||||||
const DataBuffer& buffer, cricket::SendDataResult* send_result) {
|
|
||||||
cricket::SendDataParams send_params;
|
cricket::SendDataParams send_params;
|
||||||
|
|
||||||
if (data_channel_type_ == cricket::DCT_SCTP) {
|
if (data_channel_type_ == cricket::DCT_SCTP) {
|
||||||
@ -529,34 +483,78 @@ bool DataChannel::InternalSendWithoutQueueing(
|
|||||||
}
|
}
|
||||||
send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT;
|
send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT;
|
||||||
|
|
||||||
return provider_->SendData(send_params, buffer.data, send_result);
|
cricket::SendDataResult send_result = cricket::SDR_SUCCESS;
|
||||||
|
bool success = provider_->SendData(send_params, buffer.data, &send_result);
|
||||||
|
|
||||||
|
if (!success && data_channel_type_ == cricket::DCT_SCTP) {
|
||||||
|
if (send_result != cricket::SDR_BLOCK || !QueueSendDataMessage(buffer)) {
|
||||||
|
LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send data, "
|
||||||
|
<< "send_result = " << send_result;
|
||||||
|
Close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return success;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool DataChannel::QueueSendData(const DataBuffer& buffer) {
|
bool DataChannel::QueueSendDataMessage(const DataBuffer& buffer) {
|
||||||
if (queued_send_data_.size() >= kMaxQueuedSendDataPackets) {
|
if (queued_send_data_.byte_count() >= kMaxQueuedSendDataBytes) {
|
||||||
LOG(LS_ERROR) << "Can't buffer any more data for the data channel.";
|
LOG(LS_ERROR) << "Can't buffer any more data for the data channel.";
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
queued_send_data_.push_back(new DataBuffer(buffer));
|
queued_send_data_.Push(new DataBuffer(buffer));
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void DataChannel::SetSctpSid(int sid) {
|
void DataChannel::SendQueuedControlMessages() {
|
||||||
ASSERT(config_.id < 0 && sid >= 0 && data_channel_type_ == cricket::DCT_SCTP);
|
ASSERT(was_ever_writable_);
|
||||||
config_.id = sid;
|
|
||||||
provider_->AddSctpDataStream(sid);
|
PacketQueue control_packets;
|
||||||
|
control_packets.Swap(&queued_control_data_);
|
||||||
|
|
||||||
|
while (!control_packets.Empty()) {
|
||||||
|
talk_base::scoped_ptr<DataBuffer> buf(control_packets.Front());
|
||||||
|
SendControlMessage(buf->data);
|
||||||
|
control_packets.Pop();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void DataChannel::OnTransportChannelCreated() {
|
void DataChannel::QueueControlMessage(const talk_base::Buffer& buffer) {
|
||||||
ASSERT(data_channel_type_ == cricket::DCT_SCTP);
|
queued_control_data_.Push(new DataBuffer(buffer, true));
|
||||||
if (!connected_to_provider_) {
|
}
|
||||||
connected_to_provider_ = provider_->ConnectDataChannel(this);
|
|
||||||
|
bool DataChannel::SendControlMessage(const talk_base::Buffer& buffer) {
|
||||||
|
bool is_open_message =
|
||||||
|
(config_.open_handshake_role == InternalDataChannelInit::kOpener);
|
||||||
|
|
||||||
|
ASSERT(data_channel_type_ == cricket::DCT_SCTP &&
|
||||||
|
was_ever_writable_ &&
|
||||||
|
config_.id >= 0 &&
|
||||||
|
(!is_open_message || !config_.negotiated));
|
||||||
|
|
||||||
|
cricket::SendDataParams send_params;
|
||||||
|
send_params.ssrc = config_.id;
|
||||||
|
send_params.ordered = config_.ordered || is_open_message;
|
||||||
|
send_params.type = cricket::DMT_CONTROL;
|
||||||
|
|
||||||
|
cricket::SendDataResult send_result = cricket::SDR_SUCCESS;
|
||||||
|
bool retval = provider_->SendData(send_params, buffer, &send_result);
|
||||||
|
if (retval) {
|
||||||
|
LOG(LS_INFO) << "Sent CONTROL message on channel " << config_.id;
|
||||||
|
|
||||||
|
if (is_open_message) {
|
||||||
|
// Send data as ordered before we receive any message from the remote peer
|
||||||
|
// to make sure the remote peer will not receive any data before it
|
||||||
|
// receives the OPEN message.
|
||||||
|
waiting_for_open_ack_ = true;
|
||||||
}
|
}
|
||||||
// The sid may have been unassigned when provider_->ConnectDataChannel was
|
} else if (send_result == cricket::SDR_BLOCK) {
|
||||||
// done. So always add the streams even if connected_to_provider_ is true.
|
QueueControlMessage(buffer);
|
||||||
if (config_.id >= 0) {
|
} else {
|
||||||
provider_->AddSctpDataStream(config_.id);
|
LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send"
|
||||||
|
<< " the CONTROL message, send_result = " << send_result;
|
||||||
|
Close();
|
||||||
}
|
}
|
||||||
|
return retval;
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace webrtc
|
} // namespace webrtc
|
||||||
|
@ -29,7 +29,7 @@
|
|||||||
#define TALK_APP_WEBRTC_DATACHANNEL_H_
|
#define TALK_APP_WEBRTC_DATACHANNEL_H_
|
||||||
|
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <queue>
|
#include <deque>
|
||||||
|
|
||||||
#include "talk/app/webrtc/datachannelinterface.h"
|
#include "talk/app/webrtc/datachannelinterface.h"
|
||||||
#include "talk/app/webrtc/proxy.h"
|
#include "talk/app/webrtc/proxy.h"
|
||||||
@ -149,7 +149,8 @@ class DataChannel : public DataChannelInterface,
|
|||||||
|
|
||||||
// The following methods are for SCTP only.
|
// The following methods are for SCTP only.
|
||||||
|
|
||||||
// Sets the SCTP sid and adds to transport layer if not set yet.
|
// Sets the SCTP sid and adds to transport layer if not set yet. Should only
|
||||||
|
// be called once.
|
||||||
void SetSctpSid(int sid);
|
void SetSctpSid(int sid);
|
||||||
// Called when the transport channel is created.
|
// Called when the transport channel is created.
|
||||||
void OnTransportChannelCreated();
|
void OnTransportChannelCreated();
|
||||||
@ -175,23 +176,49 @@ class DataChannel : public DataChannelInterface,
|
|||||||
virtual ~DataChannel();
|
virtual ~DataChannel();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
// A packet queue which tracks the total queued bytes. Queued packets are
|
||||||
|
// owned by this class.
|
||||||
|
class PacketQueue {
|
||||||
|
public:
|
||||||
|
PacketQueue();
|
||||||
|
~PacketQueue();
|
||||||
|
|
||||||
|
size_t byte_count() const {
|
||||||
|
return byte_count_;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool Empty() const;
|
||||||
|
|
||||||
|
DataBuffer* Front();
|
||||||
|
|
||||||
|
void Pop();
|
||||||
|
|
||||||
|
void Push(DataBuffer* packet);
|
||||||
|
|
||||||
|
void Clear();
|
||||||
|
|
||||||
|
void Swap(PacketQueue* other);
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::deque<DataBuffer*> packets_;
|
||||||
|
size_t byte_count_;
|
||||||
|
};
|
||||||
|
|
||||||
bool Init(const InternalDataChannelInit& config);
|
bool Init(const InternalDataChannelInit& config);
|
||||||
void DoClose();
|
void DoClose();
|
||||||
void UpdateState();
|
void UpdateState();
|
||||||
void SetState(DataState state);
|
void SetState(DataState state);
|
||||||
void DisconnectFromTransport();
|
void DisconnectFromTransport();
|
||||||
void DeliverQueuedControlData();
|
|
||||||
void QueueControl(const talk_base::Buffer* buffer);
|
|
||||||
void ClearQueuedControlData();
|
|
||||||
void DeliverQueuedReceivedData();
|
void DeliverQueuedReceivedData();
|
||||||
void ClearQueuedReceivedData();
|
|
||||||
void DeliverQueuedSendData();
|
void SendQueuedDataMessages();
|
||||||
void ClearQueuedSendData();
|
bool SendDataMessage(const DataBuffer& buffer);
|
||||||
bool InternalSendWithoutQueueing(const DataBuffer& buffer,
|
bool QueueSendDataMessage(const DataBuffer& buffer);
|
||||||
cricket::SendDataResult* send_result);
|
|
||||||
bool QueueSendData(const DataBuffer& buffer);
|
void SendQueuedControlMessages();
|
||||||
bool SendOpenMessage(const talk_base::Buffer* buffer);
|
void QueueControlMessage(const talk_base::Buffer& buffer);
|
||||||
bool SendOpenAckMessage(const talk_base::Buffer* buffer);
|
bool SendControlMessage(const talk_base::Buffer& buffer);
|
||||||
|
|
||||||
std::string label_;
|
std::string label_;
|
||||||
InternalDataChannelInit config_;
|
InternalDataChannelInit config_;
|
||||||
@ -208,9 +235,9 @@ class DataChannel : public DataChannelInterface,
|
|||||||
uint32 receive_ssrc_;
|
uint32 receive_ssrc_;
|
||||||
// Control messages that always have to get sent out before any queued
|
// Control messages that always have to get sent out before any queued
|
||||||
// data.
|
// data.
|
||||||
std::queue<const talk_base::Buffer*> queued_control_data_;
|
PacketQueue queued_control_data_;
|
||||||
std::queue<DataBuffer*> queued_received_data_;
|
PacketQueue queued_received_data_;
|
||||||
std::deque<DataBuffer*> queued_send_data_;
|
PacketQueue queued_send_data_;
|
||||||
};
|
};
|
||||||
|
|
||||||
class DataChannelFactory {
|
class DataChannelFactory {
|
||||||
|
@ -174,6 +174,16 @@ TEST_F(SctpDataChannelTest, OpenMessageSent) {
|
|||||||
static_cast<uint32>(webrtc_data_channel_->id()));
|
static_cast<uint32>(webrtc_data_channel_->id()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(SctpDataChannelTest, QueuedOpenMessageSent) {
|
||||||
|
provider_.set_send_blocked(true);
|
||||||
|
SetChannelReady();
|
||||||
|
provider_.set_send_blocked(false);
|
||||||
|
|
||||||
|
EXPECT_EQ(cricket::DMT_CONTROL, provider_.last_send_data_params().type);
|
||||||
|
EXPECT_EQ(provider_.last_send_data_params().ssrc,
|
||||||
|
static_cast<uint32>(webrtc_data_channel_->id()));
|
||||||
|
}
|
||||||
|
|
||||||
// Tests that the DataChannel created after transport gets ready can enter OPEN
|
// Tests that the DataChannel created after transport gets ready can enter OPEN
|
||||||
// state.
|
// state.
|
||||||
TEST_F(SctpDataChannelTest, LateCreatedChannelTransitionToOpen) {
|
TEST_F(SctpDataChannelTest, LateCreatedChannelTransitionToOpen) {
|
||||||
@ -330,11 +340,17 @@ TEST_F(SctpDataChannelTest, OpenAckRoleInitialization) {
|
|||||||
// Tests that the DataChannel is closed if the sending buffer is full.
|
// Tests that the DataChannel is closed if the sending buffer is full.
|
||||||
TEST_F(SctpDataChannelTest, ClosedWhenSendBufferFull) {
|
TEST_F(SctpDataChannelTest, ClosedWhenSendBufferFull) {
|
||||||
SetChannelReady();
|
SetChannelReady();
|
||||||
webrtc::DataBuffer buffer("abcd");
|
|
||||||
|
const size_t buffer_size = 1024;
|
||||||
|
talk_base::Buffer buffer;
|
||||||
|
buffer.SetLength(buffer_size);
|
||||||
|
memset(buffer.data(), 0, buffer_size);
|
||||||
|
|
||||||
|
webrtc::DataBuffer packet(buffer, true);
|
||||||
provider_.set_send_blocked(true);
|
provider_.set_send_blocked(true);
|
||||||
|
|
||||||
for (size_t i = 0; i < 101; ++i) {
|
for (size_t i = 0; i < 16 * 1024 + 1; ++i) {
|
||||||
EXPECT_TRUE(webrtc_data_channel_->Send(buffer));
|
EXPECT_TRUE(webrtc_data_channel_->Send(packet));
|
||||||
}
|
}
|
||||||
|
|
||||||
EXPECT_EQ(webrtc::DataChannelInterface::kClosed,
|
EXPECT_EQ(webrtc::DataChannelInterface::kClosed,
|
||||||
@ -376,3 +392,21 @@ TEST_F(SctpDataChannelTest, RemotePeerRequestClose) {
|
|||||||
webrtc_data_channel_->state());
|
webrtc_data_channel_->state());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Tests that the DataChannel is closed if the received buffer is full.
|
||||||
|
TEST_F(SctpDataChannelTest, ClosedWhenReceivedBufferFull) {
|
||||||
|
SetChannelReady();
|
||||||
|
const size_t buffer_size = 1024;
|
||||||
|
talk_base::Buffer buffer;
|
||||||
|
buffer.SetLength(buffer_size);
|
||||||
|
memset(buffer.data(), 0, buffer_size);
|
||||||
|
|
||||||
|
cricket::ReceiveDataParams params;
|
||||||
|
params.ssrc = 0;
|
||||||
|
|
||||||
|
// Receiving data without having an observer will overflow the buffer.
|
||||||
|
for (size_t i = 0; i < 16 * 1024 + 1; ++i) {
|
||||||
|
webrtc_data_channel_->OnDataReceived(NULL, params, buffer);
|
||||||
|
}
|
||||||
|
EXPECT_EQ(webrtc::DataChannelInterface::kClosed,
|
||||||
|
webrtc_data_channel_->state());
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user