Support for onbufferedamountlow

Original review at https://webrtc-codereview.appspot.com/54679004/

BUG=https://code.google.com/p/chromium/issues/detail?id=496700

Review URL: https://codereview.webrtc.org/1207613006

Cr-Commit-Position: refs/heads/master@{#9527}
This commit is contained in:
bemasc
2015-07-01 13:34:33 -07:00
committed by Commit bot
parent 545727ecce
commit 0edd50ccb3
10 changed files with 87 additions and 9 deletions

View File

@@ -476,6 +476,7 @@ void DataChannel::SendQueuedDataMessages() {
ASSERT(state_ == kOpen || state_ == kClosing); ASSERT(state_ == kOpen || state_ == kClosing);
uint64 start_buffered_amount = buffered_amount();
while (!queued_send_data_.Empty()) { while (!queued_send_data_.Empty()) {
DataBuffer* buffer = queued_send_data_.Front(); DataBuffer* buffer = queued_send_data_.Front();
if (!SendDataMessage(*buffer, false)) { if (!SendDataMessage(*buffer, false)) {
@@ -485,6 +486,10 @@ void DataChannel::SendQueuedDataMessages() {
queued_send_data_.Pop(); queued_send_data_.Pop();
delete buffer; delete buffer;
} }
if (observer_ && buffered_amount() < start_buffered_amount) {
observer_->OnBufferedAmountChange(start_buffered_amount);
}
} }
bool DataChannel::SendDataMessage(const DataBuffer& buffer, bool DataChannel::SendDataMessage(const DataBuffer& buffer,
@@ -534,11 +539,17 @@ bool DataChannel::SendDataMessage(const DataBuffer& buffer,
} }
bool DataChannel::QueueSendDataMessage(const DataBuffer& buffer) { bool DataChannel::QueueSendDataMessage(const DataBuffer& buffer) {
if (queued_send_data_.byte_count() >= kMaxQueuedSendDataBytes) { size_t start_buffered_amount = buffered_amount();
if (start_buffered_amount >= kMaxQueuedSendDataBytes) {
LOG(LS_ERROR) << "Can't buffer any more data for the data channel."; LOG(LS_ERROR) << "Can't buffer any more data for the data channel.";
return false; return false;
} }
queued_send_data_.Push(new DataBuffer(buffer)); queued_send_data_.Push(new DataBuffer(buffer));
// The buffer can have length zero, in which case there is no change.
if (observer_ && buffered_amount() > start_buffered_amount) {
observer_->OnBufferedAmountChange(start_buffered_amount);
}
return true; return true;
} }

View File

@@ -35,12 +35,18 @@ using webrtc::DataChannel;
class FakeDataChannelObserver : public webrtc::DataChannelObserver { class FakeDataChannelObserver : public webrtc::DataChannelObserver {
public: public:
FakeDataChannelObserver() FakeDataChannelObserver()
: messages_received_(0), on_state_change_count_(0) {} : messages_received_(0),
on_state_change_count_(0),
on_buffered_amount_change_count_(0) {}
void OnStateChange() { void OnStateChange() {
++on_state_change_count_; ++on_state_change_count_;
} }
void OnBufferedAmountChange(uint64 previous_amount) {
++on_buffered_amount_change_count_;
}
void OnMessage(const webrtc::DataBuffer& buffer) { void OnMessage(const webrtc::DataBuffer& buffer) {
++messages_received_; ++messages_received_;
} }
@@ -53,13 +59,22 @@ class FakeDataChannelObserver : public webrtc::DataChannelObserver {
on_state_change_count_ = 0; on_state_change_count_ = 0;
} }
void ResetOnBufferedAmountChangeCount() {
on_buffered_amount_change_count_ = 0;
}
size_t on_state_change_count() const { size_t on_state_change_count() const {
return on_state_change_count_; return on_state_change_count_;
} }
size_t on_buffered_amount_change_count() const {
return on_buffered_amount_change_count_;
}
private: private:
size_t messages_received_; size_t messages_received_;
size_t on_state_change_count_; size_t on_state_change_count_;
size_t on_buffered_amount_change_count_;
}; };
class SctpDataChannelTest : public testing::Test { class SctpDataChannelTest : public testing::Test {
@@ -133,11 +148,13 @@ TEST_F(SctpDataChannelTest, StateTransition) {
// Tests that DataChannel::buffered_amount() is correct after the channel is // Tests that DataChannel::buffered_amount() is correct after the channel is
// blocked. // blocked.
TEST_F(SctpDataChannelTest, BufferedAmountWhenBlocked) { TEST_F(SctpDataChannelTest, BufferedAmountWhenBlocked) {
AddObserver();
SetChannelReady(); SetChannelReady();
webrtc::DataBuffer buffer("abcd"); webrtc::DataBuffer buffer("abcd");
EXPECT_TRUE(webrtc_data_channel_->Send(buffer)); EXPECT_TRUE(webrtc_data_channel_->Send(buffer));
EXPECT_EQ(0U, webrtc_data_channel_->buffered_amount()); EXPECT_EQ(0U, webrtc_data_channel_->buffered_amount());
EXPECT_EQ(0U, observer_->on_buffered_amount_change_count());
provider_.set_send_blocked(true); provider_.set_send_blocked(true);
@@ -147,37 +164,46 @@ TEST_F(SctpDataChannelTest, BufferedAmountWhenBlocked) {
} }
EXPECT_EQ(buffer.data.size() * number_of_packets, EXPECT_EQ(buffer.data.size() * number_of_packets,
webrtc_data_channel_->buffered_amount()); webrtc_data_channel_->buffered_amount());
EXPECT_EQ(number_of_packets, observer_->on_buffered_amount_change_count());
} }
// Tests that the queued data are sent when the channel transitions from blocked // Tests that the queued data are sent when the channel transitions from blocked
// to unblocked. // to unblocked.
TEST_F(SctpDataChannelTest, QueuedDataSentWhenUnblocked) { TEST_F(SctpDataChannelTest, QueuedDataSentWhenUnblocked) {
AddObserver();
SetChannelReady(); SetChannelReady();
webrtc::DataBuffer buffer("abcd"); webrtc::DataBuffer buffer("abcd");
provider_.set_send_blocked(true); provider_.set_send_blocked(true);
EXPECT_TRUE(webrtc_data_channel_->Send(buffer)); EXPECT_TRUE(webrtc_data_channel_->Send(buffer));
EXPECT_EQ(1U, observer_->on_buffered_amount_change_count());
provider_.set_send_blocked(false); provider_.set_send_blocked(false);
SetChannelReady(); SetChannelReady();
EXPECT_EQ(0U, webrtc_data_channel_->buffered_amount()); EXPECT_EQ(0U, webrtc_data_channel_->buffered_amount());
EXPECT_EQ(2U, observer_->on_buffered_amount_change_count());
} }
// Tests that no crash when the channel is blocked right away while trying to // Tests that no crash when the channel is blocked right away while trying to
// send queued data. // send queued data.
TEST_F(SctpDataChannelTest, BlockedWhenSendQueuedDataNoCrash) { TEST_F(SctpDataChannelTest, BlockedWhenSendQueuedDataNoCrash) {
AddObserver();
SetChannelReady(); SetChannelReady();
webrtc::DataBuffer buffer("abcd"); webrtc::DataBuffer buffer("abcd");
provider_.set_send_blocked(true); provider_.set_send_blocked(true);
EXPECT_TRUE(webrtc_data_channel_->Send(buffer)); EXPECT_TRUE(webrtc_data_channel_->Send(buffer));
EXPECT_EQ(1U, observer_->on_buffered_amount_change_count());
// Set channel ready while it is still blocked. // Set channel ready while it is still blocked.
SetChannelReady(); SetChannelReady();
EXPECT_EQ(buffer.size(), webrtc_data_channel_->buffered_amount()); EXPECT_EQ(buffer.size(), webrtc_data_channel_->buffered_amount());
EXPECT_EQ(1U, observer_->on_buffered_amount_change_count());
// Unblock the channel to send queued data again, there should be no crash. // Unblock the channel to send queued data again, there should be no crash.
provider_.set_send_blocked(false); provider_.set_send_blocked(false);
SetChannelReady(); SetChannelReady();
EXPECT_EQ(0U, webrtc_data_channel_->buffered_amount()); EXPECT_EQ(0U, webrtc_data_channel_->buffered_amount());
EXPECT_EQ(2U, observer_->on_buffered_amount_change_count());
} }
// Tests that the queued control message is sent when channel is ready. // Tests that the queued control message is sent when channel is ready.

View File

@@ -91,6 +91,8 @@ class DataChannelObserver {
virtual void OnStateChange() = 0; virtual void OnStateChange() = 0;
// A data buffer was successfully received. // A data buffer was successfully received.
virtual void OnMessage(const DataBuffer& buffer) = 0; virtual void OnMessage(const DataBuffer& buffer) = 0;
// The data channel's buffered_amount has changed.
virtual void OnBufferedAmountChange(uint64 previous_amount){};
protected: protected:
virtual ~DataChannelObserver() {} virtual ~DataChannelObserver() {}

View File

@@ -559,16 +559,24 @@ class DataChannelObserverWrapper : public DataChannelObserver {
: j_observer_global_(jni, j_observer), : j_observer_global_(jni, j_observer),
j_observer_class_(jni, GetObjectClass(jni, j_observer)), j_observer_class_(jni, GetObjectClass(jni, j_observer)),
j_buffer_class_(jni, FindClass(jni, "org/webrtc/DataChannel$Buffer")), j_buffer_class_(jni, FindClass(jni, "org/webrtc/DataChannel$Buffer")),
j_on_state_change_mid_(GetMethodID(jni, *j_observer_class_, j_on_buffered_amount_change_mid_(GetMethodID(
"onStateChange", "()V")), jni, *j_observer_class_, "onBufferedAmountChange", "(J)V")),
j_on_state_change_mid_(
GetMethodID(jni, *j_observer_class_, "onStateChange", "()V")),
j_on_message_mid_(GetMethodID(jni, *j_observer_class_, "onMessage", j_on_message_mid_(GetMethodID(jni, *j_observer_class_, "onMessage",
"(Lorg/webrtc/DataChannel$Buffer;)V")), "(Lorg/webrtc/DataChannel$Buffer;)V")),
j_buffer_ctor_(GetMethodID(jni, *j_buffer_class_, j_buffer_ctor_(GetMethodID(jni, *j_buffer_class_, "<init>",
"<init>", "(Ljava/nio/ByteBuffer;Z)V")) { "(Ljava/nio/ByteBuffer;Z)V")) {}
}
virtual ~DataChannelObserverWrapper() {} virtual ~DataChannelObserverWrapper() {}
void OnBufferedAmountChange(uint64 previous_amount) override {
ScopedLocalRefFrame local_ref_frame(jni());
jni()->CallVoidMethod(*j_observer_global_, j_on_buffered_amount_change_mid_,
previous_amount);
CHECK_EXCEPTION(jni()) << "error during CallVoidMethod";
}
void OnStateChange() override { void OnStateChange() override {
ScopedLocalRefFrame local_ref_frame(jni()); ScopedLocalRefFrame local_ref_frame(jni());
jni()->CallVoidMethod(*j_observer_global_, j_on_state_change_mid_); jni()->CallVoidMethod(*j_observer_global_, j_on_state_change_mid_);
@@ -593,6 +601,7 @@ class DataChannelObserverWrapper : public DataChannelObserver {
const ScopedGlobalRef<jobject> j_observer_global_; const ScopedGlobalRef<jobject> j_observer_global_;
const ScopedGlobalRef<jclass> j_observer_class_; const ScopedGlobalRef<jclass> j_observer_class_;
const ScopedGlobalRef<jclass> j_buffer_class_; const ScopedGlobalRef<jclass> j_buffer_class_;
const jmethodID j_on_buffered_amount_change_mid_;
const jmethodID j_on_state_change_mid_; const jmethodID j_on_state_change_mid_;
const jmethodID j_on_message_mid_; const jmethodID j_on_message_mid_;
const jmethodID j_buffer_ctor_; const jmethodID j_buffer_ctor_;

View File

@@ -77,6 +77,8 @@ public class DataChannel {
/** Java version of C++ DataChannelObserver. */ /** Java version of C++ DataChannelObserver. */
public interface Observer { public interface Observer {
/** The data channel's bufferedAmount has changed. */
public void onBufferedAmountChange(long previousAmount);
/** The data channel state has changed. */ /** The data channel state has changed. */
public void onStateChange(); public void onStateChange();
/** /**

View File

@@ -258,6 +258,11 @@ public class PeerConnectionTest {
assertTrue(expected.data.equals(buffer.data)); assertTrue(expected.data.equals(buffer.data));
} }
@Override
public synchronized void onBufferedAmountChange(long previousAmount) {
assertFalse(previousAmount == dataChannel.bufferedAmount());
}
@Override @Override
public synchronized void onStateChange() { public synchronized void onStateChange() {
assertEquals(expectedStateChanges.removeFirst(), dataChannel.state()); assertEquals(expectedStateChanges.removeFirst(), dataChannel.state());

View File

@@ -43,6 +43,15 @@ class RTCDataChannelObserver : public DataChannelObserver {
[_channel.delegate channelDidChangeState:_channel]; [_channel.delegate channelDidChangeState:_channel];
} }
void OnBufferedAmountChange(uint64 previousAmount) override {
RTCDataChannel* channel = _channel;
id<RTCDataChannelDelegate> delegate = channel.delegate;
if ([delegate
respondsToSelector:@selector(channel:didChangeBufferedAmount:)]) {
[delegate channel:channel didChangeBufferedAmount:previousAmount];
}
}
void OnMessage(const DataBuffer& buffer) override { void OnMessage(const DataBuffer& buffer) override {
if (!_channel.delegate) { if (!_channel.delegate) {
return; return;

View File

@@ -82,6 +82,12 @@ typedef enum {
- (void)channel:(RTCDataChannel*)channel - (void)channel:(RTCDataChannel*)channel
didReceiveMessageWithBuffer:(RTCDataBuffer*)buffer; didReceiveMessageWithBuffer:(RTCDataBuffer*)buffer;
@optional
// Called when the buffered amount has changed.
- (void)channel:(RTCDataChannel*)channel
didChangeBufferedAmount:(NSUInteger)amount;
@end @end
// ObjectiveC wrapper for a DataChannel object. // ObjectiveC wrapper for a DataChannel object.

View File

@@ -230,6 +230,12 @@
NSAssert(expectedState == channel.state, @"Channel state should match"); NSAssert(expectedState == channel.state, @"Channel state should match");
} }
- (void)channel:(RTCDataChannel*)channel
didChangeBufferedAmount:(NSUInteger)previousAmount {
NSAssert(channel.bufferedAmount != previousAmount,
@"Invalid bufferedAmount change");
}
- (void)channel:(RTCDataChannel*)channel - (void)channel:(RTCDataChannel*)channel
didReceiveMessageWithBuffer:(RTCDataBuffer*)buffer { didReceiveMessageWithBuffer:(RTCDataBuffer*)buffer {
NSAssert([_expectedMessages count] > 0, NSAssert([_expectedMessages count] > 0,

View File

@@ -98,8 +98,10 @@ class MockDataChannelObserver : public webrtc::DataChannelObserver {
channel_->UnregisterObserver(); channel_->UnregisterObserver();
} }
virtual void OnStateChange() { state_ = channel_->state(); } void OnBufferedAmountChange(uint64 previous_amount) override {}
virtual void OnMessage(const DataBuffer& buffer) {
void OnStateChange() override { state_ = channel_->state(); }
void OnMessage(const DataBuffer& buffer) override {
last_message_.assign(buffer.data.data<char>(), buffer.data.size()); last_message_.assign(buffer.data.data<char>(), buffer.data.size());
++received_message_count_; ++received_message_count_;
} }