Implements TODOs for webrtc::datachannel state management when the SCTP association is congested. Adds missing state variables for each step in the transitions between DataChannelInterface::DataStates (kConnecting, kOpen, etc.), and uses them.

BUG=https://code.google.com/p/chromium/issues/detail?id=474650
R=jiayl@webrtc.org, pthatcher@webrtc.org

Review URL: https://webrtc-codereview.appspot.com/44299004

Cr-Commit-Position: refs/heads/master@{#9331}
This commit is contained in:
Lally Singh 2015-05-29 11:52:39 -04:00
parent c28a896a7b
commit 5c6c6e026b
4 changed files with 140 additions and 78 deletions

View File

@ -109,24 +109,26 @@ DataChannel::DataChannel(
state_(kConnecting),
data_channel_type_(dct),
provider_(provider),
waiting_for_open_ack_(false),
was_ever_writable_(false),
handshake_state_(kHandshakeInit),
connected_to_provider_(false),
send_ssrc_set_(false),
receive_ssrc_set_(false),
writable_(false),
send_ssrc_(0),
receive_ssrc_(0) {
}
bool DataChannel::Init(const InternalDataChannelInit& config) {
if (data_channel_type_ == cricket::DCT_RTP &&
(config.reliable ||
if (data_channel_type_ == cricket::DCT_RTP) {
if (config.reliable ||
config.id != -1 ||
config.maxRetransmits != -1 ||
config.maxRetransmitTime != -1)) {
config.maxRetransmitTime != -1) {
LOG(LS_ERROR) << "Failed to initialize the RTP data channel due to "
<< "invalid DataChannelInit.";
return false;
}
handshake_state_ = kHandshakeReady;
} else if (data_channel_type_ == cricket::DCT_SCTP) {
if (config.id < -1 ||
config.maxRetransmits < -1 ||
@ -142,6 +144,18 @@ bool DataChannel::Init(const InternalDataChannelInit& config) {
}
config_ = config;
switch (config_.open_handshake_role) {
case webrtc::InternalDataChannelInit::kNone: // pre-negotiated
handshake_state_ = kHandshakeReady;
break;
case webrtc::InternalDataChannelInit::kOpener:
handshake_state_ = kHandshakeShouldSendOpen;
break;
case webrtc::InternalDataChannelInit::kAcker:
handshake_state_ = kHandshakeShouldSendAck;
break;
};
// Try to connect to the transport in case the transport channel already
// exists.
OnTransportChannelCreated();
@ -298,7 +312,7 @@ void DataChannel::OnDataReceived(cricket::DataChannel* channel,
if (params.type == cricket::DMT_CONTROL) {
ASSERT(data_channel_type_ == cricket::DCT_SCTP);
if (!waiting_for_open_ack_) {
if (handshake_state_ != kHandshakeWaitingForAck) {
// Ignore it if we are not expecting an ACK message.
LOG(LS_WARNING) << "DataChannel received unexpected CONTROL message, "
<< "sid = " << params.ssrc;
@ -306,7 +320,7 @@ void DataChannel::OnDataReceived(cricket::DataChannel* channel,
}
if (ParseDataChannelOpenAckMessage(payload)) {
// We can send unordered as soon as we receive the ACK message.
waiting_for_open_ack_ = false;
handshake_state_ = kHandshakeReady;
LOG(LS_INFO) << "DataChannel received OPEN_ACK message, sid = "
<< params.ssrc;
} else {
@ -323,11 +337,13 @@ void DataChannel::OnDataReceived(cricket::DataChannel* channel,
// We can send unordered as soon as we receive any DATA message since the
// remote side must have received the OPEN (and old clients do not send
// OPEN_ACK).
waiting_for_open_ack_ = false;
if (handshake_state_ == kHandshakeWaitingForAck) {
handshake_state_ = kHandshakeReady;
}
bool binary = (params.type == cricket::DMT_BINARY);
rtc::scoped_ptr<DataBuffer> buffer(new DataBuffer(payload, binary));
if (was_ever_writable_ && observer_) {
if (state_ == kOpen && observer_) {
observer_->OnMessage(*buffer.get());
} else {
if (queued_received_data_.byte_count() + payload.size() >
@ -346,38 +362,14 @@ void DataChannel::OnDataReceived(cricket::DataChannel* channel,
}
void DataChannel::OnChannelReady(bool writable) {
writable_ = writable;
if (!writable) {
return;
}
// Update the readyState and send the queued control message if the channel
// is writable for the first time; otherwise it means the channel was blocked
// for sending and now unblocked, so send the queued data now.
if (!was_ever_writable_) {
was_ever_writable_ = true;
if (data_channel_type_ == cricket::DCT_SCTP) {
rtc::Buffer payload;
if (config_.open_handshake_role == InternalDataChannelInit::kOpener) {
WriteDataChannelOpenMessage(label_, config_, &payload);
SendControlMessage(payload);
} else if (config_.open_handshake_role ==
InternalDataChannelInit::kAcker) {
WriteDataChannelOpenAckMessage(&payload);
SendControlMessage(payload);
}
}
UpdateState();
ASSERT(queued_send_data_.Empty());
} else if (state_ == kOpen) {
// TODO(jiayl): Sending OPEN message here contradicts with the pre-condition
// that the readyState is open. According to the standard, the channel
// should not become open before the OPEN message is sent.
SendQueuedControlMessages();
SendQueuedDataMessages();
}
UpdateState();
}
void DataChannel::DoClose() {
@ -391,33 +383,51 @@ void DataChannel::DoClose() {
}
void DataChannel::UpdateState() {
// UpdateState determines what to do from a few state variables. Include
// all conditions required for each state transition here for
// clarity. OnChannelReady(true) will send any queued data and then invoke
// UpdateState().
switch (state_) {
case kConnecting: {
if (send_ssrc_set_ == receive_ssrc_set_) {
if (data_channel_type_ == cricket::DCT_RTP && !connected_to_provider_) {
connected_to_provider_ = provider_->ConnectDataChannel(this);
}
if (was_ever_writable_) {
// TODO(jiayl): Do not transition to kOpen if we failed to send the
// OPEN message.
SendQueuedControlMessages();
if (connected_to_provider_) {
if (handshake_state_ == kHandshakeShouldSendOpen) {
rtc::Buffer payload;
WriteDataChannelOpenMessage(label_, config_, &payload);
SendControlMessage(payload);
} else if (handshake_state_ == kHandshakeShouldSendAck) {
rtc::Buffer payload;
WriteDataChannelOpenAckMessage(&payload);
SendControlMessage(payload);
}
if (writable_ &&
(handshake_state_ == kHandshakeReady ||
handshake_state_ == kHandshakeWaitingForAck)) {
SetState(kOpen);
// If we have received buffers before the channel got writable.
// Deliver them now.
DeliverQueuedReceivedData();
}
}
}
break;
}
case kOpen: {
break;
}
case kClosing: {
DisconnectFromTransport();
if (queued_send_data_.Empty() && queued_control_data_.Empty()) {
if (connected_to_provider_) {
DisconnectFromProvider();
}
if (!send_ssrc_set_ && !receive_ssrc_set_) {
if (!connected_to_provider_ && !send_ssrc_set_ && !receive_ssrc_set_) {
SetState(kClosed);
}
}
break;
}
case kClosed:
@ -435,7 +445,7 @@ void DataChannel::SetState(DataState state) {
}
}
void DataChannel::DisconnectFromTransport() {
void DataChannel::DisconnectFromProvider() {
if (!connected_to_provider_)
return;
@ -448,7 +458,7 @@ void DataChannel::DisconnectFromTransport() {
}
void DataChannel::DeliverQueuedReceivedData() {
if (!was_ever_writable_ || !observer_) {
if (!observer_) {
return;
}
@ -460,7 +470,11 @@ void DataChannel::DeliverQueuedReceivedData() {
}
void DataChannel::SendQueuedDataMessages() {
ASSERT(was_ever_writable_ && state_ == kOpen);
if (queued_send_data_.Empty()) {
return;
}
ASSERT(state_ == kOpen || state_ == kClosing);
while (!queued_send_data_.Empty()) {
DataBuffer* buffer = queued_send_data_.Front();
@ -479,8 +493,8 @@ bool DataChannel::SendDataMessage(const DataBuffer& buffer,
if (data_channel_type_ == cricket::DCT_SCTP) {
send_params.ordered = config_.ordered;
// Send as ordered if it is waiting for the OPEN_ACK message.
if (waiting_for_open_ack_ && !config_.ordered) {
// Send as ordered if it is still going through OPEN/ACK signaling.
if (handshake_state_ != kHandshakeReady && !config_.ordered) {
send_params.ordered = true;
LOG(LS_VERBOSE) << "Sending data as ordered for unordered DataChannel "
<< "because the OPEN_ACK message has not been received.";
@ -529,8 +543,6 @@ bool DataChannel::QueueSendDataMessage(const DataBuffer& buffer) {
}
void DataChannel::SendQueuedControlMessages() {
ASSERT(was_ever_writable_);
PacketQueue control_packets;
control_packets.Swap(&queued_control_data_);
@ -546,16 +558,18 @@ void DataChannel::QueueControlMessage(const rtc::Buffer& buffer) {
}
bool DataChannel::SendControlMessage(const rtc::Buffer& buffer) {
bool is_open_message =
(config_.open_handshake_role == InternalDataChannelInit::kOpener);
bool is_open_message = handshake_state_ == kHandshakeShouldSendOpen;
ASSERT(data_channel_type_ == cricket::DCT_SCTP &&
was_ever_writable_ &&
writable_ &&
config_.id >= 0 &&
(!is_open_message || !config_.negotiated));
cricket::SendDataParams send_params;
send_params.ssrc = config_.id;
// Send data as ordered before we receive any message from the remote peer to
// make sure the remote peer will not receive any data before it receives the
// OPEN message.
send_params.ordered = config_.ordered || is_open_message;
send_params.type = cricket::DMT_CONTROL;
@ -564,11 +578,10 @@ bool DataChannel::SendControlMessage(const rtc::Buffer& buffer) {
if (retval) {
LOG(LS_INFO) << "Sent CONTROL message on channel " << config_.id;
if (is_open_message) {
// Send data as ordered before we receive any message from the remote peer
// to make sure the remote peer will not receive any data before it
// receives the OPEN message.
waiting_for_open_ack_ = true;
if (handshake_state_ == kHandshakeShouldSendAck) {
handshake_state_ = kHandshakeReady;
} else if (handshake_state_ == kHandshakeShouldSendOpen) {
handshake_state_ = kHandshakeWaitingForAck;
}
} else if (send_result == cricket::SDR_BLOCK) {
QueueControlMessage(buffer);

View File

@ -204,11 +204,20 @@ class DataChannel : public DataChannelInterface,
size_t byte_count_;
};
// The OPEN(_ACK) signaling state.
enum HandshakeState {
kHandshakeInit,
kHandshakeShouldSendOpen,
kHandshakeShouldSendAck,
kHandshakeWaitingForAck,
kHandshakeReady
};
bool Init(const InternalDataChannelInit& config);
void DoClose();
void UpdateState();
void SetState(DataState state);
void DisconnectFromTransport();
void DisconnectFromProvider();
void DeliverQueuedReceivedData();
@ -226,11 +235,11 @@ class DataChannel : public DataChannelInterface,
DataState state_;
cricket::DataChannelType data_channel_type_;
DataChannelProviderInterface* provider_;
bool waiting_for_open_ack_;
bool was_ever_writable_;
HandshakeState handshake_state_;
bool connected_to_provider_;
bool send_ssrc_set_;
bool receive_ssrc_set_;
bool writable_;
uint32 send_ssrc_;
uint32 receive_ssrc_;
// Control messages that always have to get sent out before any queued

View File

@ -269,6 +269,41 @@ TEST_F(SctpDataChannelTest, SendUnorderedAfterReceiveData) {
EXPECT_FALSE(provider_.last_send_data_params().ordered);
}
// Tests that the channel can't open until it's successfully sent the OPEN
// message.
TEST_F(SctpDataChannelTest, OpenWaitsForOpenMesssage) {
webrtc::DataBuffer buffer("foo");
provider_.set_send_blocked(true);
SetChannelReady();
EXPECT_EQ(webrtc::DataChannelInterface::kConnecting,
webrtc_data_channel_->state());
provider_.set_send_blocked(false);
EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kOpen,
webrtc_data_channel_->state(), 1000);
EXPECT_EQ(cricket::DMT_CONTROL, provider_.last_send_data_params().type);
}
// Tests that close first makes sure all queued data gets sent.
TEST_F(SctpDataChannelTest, QueuedCloseFlushes) {
webrtc::DataBuffer buffer("foo");
provider_.set_send_blocked(true);
SetChannelReady();
EXPECT_EQ(webrtc::DataChannelInterface::kConnecting,
webrtc_data_channel_->state());
provider_.set_send_blocked(false);
EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kOpen,
webrtc_data_channel_->state(), 1000);
provider_.set_send_blocked(true);
webrtc_data_channel_->Send(buffer);
webrtc_data_channel_->Close();
provider_.set_send_blocked(false);
EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kClosed,
webrtc_data_channel_->state(), 1000);
EXPECT_EQ(cricket::DMT_TEXT, provider_.last_send_data_params().type);
}
// Tests that messages are sent with the right ssrc.
TEST_F(SctpDataChannelTest, SendDataSsrc) {
webrtc_data_channel_->SetSctpSid(1);
@ -369,8 +404,9 @@ TEST_F(SctpDataChannelTest, ClosedWhenSendBufferFull) {
EXPECT_TRUE(webrtc_data_channel_->Send(packet));
}
EXPECT_EQ(webrtc::DataChannelInterface::kClosed,
webrtc_data_channel_->state());
EXPECT_TRUE(
webrtc::DataChannelInterface::kClosed == webrtc_data_channel_->state() ||
webrtc::DataChannelInterface::kClosing == webrtc_data_channel_->state());
}
// Tests that the DataChannel is closed on transport errors.

View File

@ -91,11 +91,15 @@ class FakeDataChannelProvider : public webrtc::DataChannelProviderInterface {
void set_send_blocked(bool blocked) {
send_blocked_ = blocked;
if (!blocked) {
std::set<webrtc::DataChannel*>::iterator it;
for (it = connected_channels_.begin();
it != connected_channels_.end();
++it) {
(*it)->OnChannelReady(true);
// Take a snapshot of the connected channels and check to see whether
// each value is still in connected_channels_ before calling
// OnChannelReady(). This avoids problems where the set gets modified
// in response to OnChannelReady().
for (webrtc::DataChannel *ch : std::set<webrtc::DataChannel*>(
connected_channels_.begin(), connected_channels_.end())) {
if (connected_channels_.count(ch)) {
ch->OnChannelReady(true);
}
}
}
}