diff --git a/webrtc/base/BUILD.gn b/webrtc/base/BUILD.gn index 116e4e9a2..f4c663f9c 100644 --- a/webrtc/base/BUILD.gn +++ b/webrtc/base/BUILD.gn @@ -110,6 +110,8 @@ static_library("rtc_base_approved") { "bitbuffer.h", "buffer.cc", "buffer.h", + "bufferqueue.cc", + "bufferqueue.h", "bytebuffer.cc", "bytebuffer.h", "byteorder.h", diff --git a/webrtc/base/base.gyp b/webrtc/base/base.gyp index 9992d6dfb..b81e48d5d 100644 --- a/webrtc/base/base.gyp +++ b/webrtc/base/base.gyp @@ -37,6 +37,8 @@ 'bitbuffer.h', 'buffer.cc', 'buffer.h', + 'bufferqueue.cc', + 'bufferqueue.h', 'bytebuffer.cc', 'bytebuffer.h', 'byteorder.h', diff --git a/webrtc/base/base_tests.gyp b/webrtc/base/base_tests.gyp index 3c500e03c..bff20dbaa 100644 --- a/webrtc/base/base_tests.gyp +++ b/webrtc/base/base_tests.gyp @@ -55,6 +55,7 @@ 'bind_unittest.cc', 'bitbuffer_unittest.cc', 'buffer_unittest.cc', + 'bufferqueue_unittest.cc', 'bytebuffer_unittest.cc', 'byteorder_unittest.cc', 'callback_unittest.cc', diff --git a/webrtc/base/bufferqueue.cc b/webrtc/base/bufferqueue.cc new file mode 100644 index 000000000..955af51f6 --- /dev/null +++ b/webrtc/base/bufferqueue.cc @@ -0,0 +1,80 @@ +/* + * Copyright 2015 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "webrtc/base/bufferqueue.h" + +namespace rtc { + +BufferQueue::BufferQueue(size_t capacity, size_t default_size) + : capacity_(capacity), default_size_(default_size) { +} + +BufferQueue::~BufferQueue() { + CritScope cs(&crit_); + + for (Buffer* buffer : queue_) { + delete buffer; + } + for (Buffer* buffer : free_list_) { + delete buffer; + } +} + +size_t BufferQueue::size() const { + CritScope cs(&crit_); + return queue_.size(); +} + +bool BufferQueue::ReadFront(void* buffer, size_t bytes, size_t* bytes_read) { + CritScope cs(&crit_); + if (queue_.empty()) { + return false; + } + + Buffer* packet = queue_.front(); + queue_.pop_front(); + + size_t next_packet_size = packet->size(); + if (bytes > next_packet_size) { + bytes = next_packet_size; + } + + memcpy(buffer, packet->data(), bytes); + if (bytes_read) { + *bytes_read = bytes; + } + free_list_.push_back(packet); + return true; +} + +bool BufferQueue::WriteBack(const void* buffer, size_t bytes, + size_t* bytes_written) { + CritScope cs(&crit_); + if (queue_.size() == capacity_) { + return false; + } + + Buffer* packet; + if (!free_list_.empty()) { + packet = free_list_.back(); + free_list_.pop_back(); + } else { + packet = new Buffer(bytes, default_size_); + } + + packet->SetData(static_cast(buffer), bytes); + if (bytes_written) { + *bytes_written = bytes; + } + queue_.push_back(packet); + return true; +} + +} // namespace rtc diff --git a/webrtc/base/bufferqueue.h b/webrtc/base/bufferqueue.h new file mode 100644 index 000000000..4adae416a --- /dev/null +++ b/webrtc/base/bufferqueue.h @@ -0,0 +1,50 @@ +/* + * Copyright 2015 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef WEBRTC_BASE_BUFFERQUEUE_H_ +#define WEBRTC_BASE_BUFFERQUEUE_H_ + +#include +#include + +#include "webrtc/base/buffer.h" +#include "webrtc/base/criticalsection.h" + +namespace rtc { + +class BufferQueue { + public: + // Creates a buffer queue queue with a given capacity and default buffer size. + BufferQueue(size_t capacity, size_t default_size); + ~BufferQueue(); + + // Return number of queued buffers. + size_t size() const; + + // ReadFront will only read one buffer at a time and will truncate buffers + // that don't fit in the passed memory. + bool ReadFront(void* data, size_t bytes, size_t* bytes_read); + + // WriteBack always writes either the complete memory or nothing. + bool WriteBack(const void* data, size_t bytes, size_t* bytes_written); + + private: + size_t capacity_; + size_t default_size_; + std::deque queue_; + std::vector free_list_; + mutable CriticalSection crit_; // object lock + + DISALLOW_COPY_AND_ASSIGN(BufferQueue); +}; + +} // namespace rtc + +#endif // WEBRTC_BASE_BUFFERQUEUE_H_ diff --git a/webrtc/base/bufferqueue_unittest.cc b/webrtc/base/bufferqueue_unittest.cc new file mode 100644 index 000000000..07084c4a6 --- /dev/null +++ b/webrtc/base/bufferqueue_unittest.cc @@ -0,0 +1,86 @@ +/* + * Copyright 2015 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "webrtc/base/bufferqueue.h" +#include "webrtc/base/gunit.h" + +namespace rtc { + +TEST(BufferQueueTest, TestAll) { + const size_t kSize = 16; + const char in[kSize * 2 + 1] = "0123456789ABCDEFGHIJKLMNOPQRSTUV"; + char out[kSize * 2]; + size_t bytes; + BufferQueue queue1(1, kSize); + BufferQueue queue2(2, kSize); + + // The queue is initially empty. + EXPECT_EQ(0u, queue1.size()); + EXPECT_FALSE(queue1.ReadFront(out, kSize, &bytes)); + + // A write should succeed. + EXPECT_TRUE(queue1.WriteBack(in, kSize, &bytes)); + EXPECT_EQ(kSize, bytes); + EXPECT_EQ(1u, queue1.size()); + + // The queue is full now (only one buffer allowed). + EXPECT_FALSE(queue1.WriteBack(in, kSize, &bytes)); + EXPECT_EQ(1u, queue1.size()); + + // Reading previously written buffer. + EXPECT_TRUE(queue1.ReadFront(out, kSize, &bytes)); + EXPECT_EQ(kSize, bytes); + EXPECT_EQ(0, memcmp(in, out, kSize)); + + // The queue is empty again now. + EXPECT_FALSE(queue1.ReadFront(out, kSize, &bytes)); + EXPECT_EQ(0u, queue1.size()); + + // Reading only returns available data. + EXPECT_TRUE(queue1.WriteBack(in, kSize, &bytes)); + EXPECT_EQ(kSize, bytes); + EXPECT_EQ(1u, queue1.size()); + EXPECT_TRUE(queue1.ReadFront(out, kSize * 2, &bytes)); + EXPECT_EQ(kSize, bytes); + EXPECT_EQ(0, memcmp(in, out, kSize)); + EXPECT_EQ(0u, queue1.size()); + + // Reading maintains buffer boundaries. + EXPECT_TRUE(queue2.WriteBack(in, kSize / 2, &bytes)); + EXPECT_EQ(1u, queue2.size()); + EXPECT_TRUE(queue2.WriteBack(in + kSize / 2, kSize / 2, &bytes)); + EXPECT_EQ(2u, queue2.size()); + EXPECT_TRUE(queue2.ReadFront(out, kSize, &bytes)); + EXPECT_EQ(kSize / 2, bytes); + EXPECT_EQ(0, memcmp(in, out, kSize / 2)); + EXPECT_EQ(1u, queue2.size()); + EXPECT_TRUE(queue2.ReadFront(out, kSize, &bytes)); + EXPECT_EQ(kSize / 2, bytes); + EXPECT_EQ(0, memcmp(in + kSize / 2, out, kSize / 2)); + EXPECT_EQ(0u, queue2.size()); + + // Reading truncates buffers. + EXPECT_TRUE(queue2.WriteBack(in, kSize / 2, &bytes)); + EXPECT_EQ(1u, queue2.size()); + EXPECT_TRUE(queue2.WriteBack(in + kSize / 2, kSize / 2, &bytes)); + EXPECT_EQ(2u, queue2.size()); + // Read first packet partially in too-small buffer. + EXPECT_TRUE(queue2.ReadFront(out, kSize / 4, &bytes)); + EXPECT_EQ(kSize / 4, bytes); + EXPECT_EQ(0, memcmp(in, out, kSize / 4)); + EXPECT_EQ(1u, queue2.size()); + // Remainder of first packet is truncated, reading starts with next packet. + EXPECT_TRUE(queue2.ReadFront(out, kSize, &bytes)); + EXPECT_EQ(kSize / 2, bytes); + EXPECT_EQ(0, memcmp(in + kSize / 2, out, kSize / 2)); + EXPECT_EQ(0u, queue2.size()); +} + +} // namespace rtc diff --git a/webrtc/p2p/base/dtlstransportchannel.cc b/webrtc/p2p/base/dtlstransportchannel.cc index cb1575dfe..6a206d420 100644 --- a/webrtc/p2p/base/dtlstransportchannel.cc +++ b/webrtc/p2p/base/dtlstransportchannel.cc @@ -12,6 +12,7 @@ #include "webrtc/p2p/base/common.h" #include "webrtc/base/buffer.h" +#include "webrtc/base/checks.h" #include "webrtc/base/dscp.h" #include "webrtc/base/messagequeue.h" #include "webrtc/base/sslstreamadapter.h" @@ -25,6 +26,10 @@ static const size_t kDtlsRecordHeaderLen = 13; static const size_t kMaxDtlsPacketLen = 2048; static const size_t kMinRtpPacketLen = 12; +// Maximum number of pending packets in the queue. Packets are read immediately +// after they have been written, so a capacity of "1" is sufficient. +static const size_t kMaxPendingPackets = 1; + static bool IsDtlsPacket(const char* data, size_t len) { const uint8* u = reinterpret_cast(data); return (len >= kDtlsRecordHeaderLen && (u[0] > 19 && u[0] < 64)); @@ -34,6 +39,12 @@ static bool IsRtpPacket(const char* data, size_t len) { return (len >= kMinRtpPacketLen && (u[0] & 0xC0) == 0x80); } +StreamInterfaceChannel::StreamInterfaceChannel(TransportChannel* channel) + : channel_(channel), + state_(rtc::SS_OPEN), + packets_(kMaxPendingPackets, kMaxDtlsPacketLen) { +} + rtc::StreamResult StreamInterfaceChannel::Read(void* buffer, size_t buffer_len, size_t* read, @@ -43,7 +54,11 @@ rtc::StreamResult StreamInterfaceChannel::Read(void* buffer, if (state_ == rtc::SS_OPENING) return rtc::SR_BLOCK; - return fifo_.Read(buffer, buffer_len, read, error); + if (!packets_.ReadFront(buffer, buffer_len, read)) { + return rtc::SR_BLOCK; + } + + return rtc::SR_SUCCESS; } rtc::StreamResult StreamInterfaceChannel::Write(const void* data, @@ -62,21 +77,15 @@ rtc::StreamResult StreamInterfaceChannel::Write(const void* data, } bool StreamInterfaceChannel::OnPacketReceived(const char* data, size_t size) { - // We force a read event here to ensure that we don't overflow our FIFO. - // Under high packet rate this can occur if we wait for the FIFO to post its - // own SE_READ. - bool ret = (fifo_.WriteAll(data, size, NULL, NULL) == rtc::SR_SUCCESS); + // We force a read event here to ensure that we don't overflow our queue. + bool ret = packets_.WriteBack(data, size, NULL); + CHECK(ret) << "Failed to write packet to queue."; if (ret) { SignalEvent(this, rtc::SE_READ, 0); } return ret; } -void StreamInterfaceChannel::OnEvent(rtc::StreamInterface* stream, - int sig, int err) { - SignalEvent(this, sig, err); -} - DtlsTransportChannelWrapper::DtlsTransportChannelWrapper( Transport* transport, TransportChannelImpl* channel) @@ -242,8 +251,7 @@ bool DtlsTransportChannelWrapper::GetRemoteCertificate( } bool DtlsTransportChannelWrapper::SetupDtls() { - StreamInterfaceChannel* downward = - new StreamInterfaceChannel(worker_thread_, channel_); + StreamInterfaceChannel* downward = new StreamInterfaceChannel(channel_); dtls_.reset(rtc::SSLStreamAdapter::Create(downward)); if (!dtls_) { diff --git a/webrtc/p2p/base/dtlstransportchannel.h b/webrtc/p2p/base/dtlstransportchannel.h index 5469b88ec..a48d09fec 100644 --- a/webrtc/p2p/base/dtlstransportchannel.h +++ b/webrtc/p2p/base/dtlstransportchannel.h @@ -16,6 +16,7 @@ #include "webrtc/p2p/base/transportchannelimpl.h" #include "webrtc/base/buffer.h" +#include "webrtc/base/bufferqueue.h" #include "webrtc/base/scoped_ptr.h" #include "webrtc/base/sslstreamadapter.h" #include "webrtc/base/stream.h" @@ -24,15 +25,9 @@ namespace cricket { // A bridge between a packet-oriented/channel-type interface on // the bottom and a StreamInterface on the top. -class StreamInterfaceChannel : public rtc::StreamInterface, - public sigslot::has_slots<> { +class StreamInterfaceChannel : public rtc::StreamInterface { public: - StreamInterfaceChannel(rtc::Thread* owner, TransportChannel* channel) - : channel_(channel), - state_(rtc::SS_OPEN), - fifo_(kFifoSize, owner) { - fifo_.SignalEvent.connect(this, &StreamInterfaceChannel::OnEvent); - } + StreamInterfaceChannel(TransportChannel* channel); // Push in a packet; this gets pulled out from Read(). bool OnPacketReceived(const char* data, size_t size); @@ -46,14 +41,9 @@ class StreamInterfaceChannel : public rtc::StreamInterface, size_t* written, int* error); private: - static const size_t kFifoSize = 8192; - - // Forward events - virtual void OnEvent(rtc::StreamInterface* stream, int sig, int err); - TransportChannel* channel_; // owned by DtlsTransportChannelWrapper rtc::StreamState state_; - rtc::FifoBuffer fifo_; + rtc::BufferQueue packets_; DISALLOW_COPY_AND_ASSIGN(StreamInterfaceChannel); };