diff --git a/talk/app/webrtc/mediastreamsignaling.cc b/talk/app/webrtc/mediastreamsignaling.cc index 371c28aaa..a23799f73 100644 --- a/talk/app/webrtc/mediastreamsignaling.cc +++ b/talk/app/webrtc/mediastreamsignaling.cc @@ -37,6 +37,7 @@ #include "talk/app/webrtc/videosource.h" #include "talk/app/webrtc/videotrack.h" #include "talk/base/bytebuffer.h" +#include "talk/media/sctp/sctpdataengine.h" static const char kDefaultStreamLabel[] = "default"; static const char kDefaultAudioTrackLabel[] = "defaulta0"; diff --git a/talk/app/webrtc/peerconnectioninterface_unittest.cc b/talk/app/webrtc/peerconnectioninterface_unittest.cc index 9fbfefc61..7aa06ef42 100644 --- a/talk/app/webrtc/peerconnectioninterface_unittest.cc +++ b/talk/app/webrtc/peerconnectioninterface_unittest.cc @@ -41,6 +41,7 @@ #include "talk/base/stringutils.h" #include "talk/base/thread.h" #include "talk/media/base/fakevideocapturer.h" +#include "talk/media/sctp/sctpdataengine.h" #include "talk/session/media/mediasession.h" static const char kStreamLabel1[] = "local_stream_1"; diff --git a/talk/app/webrtc/videosource_unittest.cc b/talk/app/webrtc/videosource_unittest.cc index dddbdeeb4..69e9b3f0f 100644 --- a/talk/app/webrtc/videosource_unittest.cc +++ b/talk/app/webrtc/videosource_unittest.cc @@ -127,14 +127,15 @@ class StateObserver : public ObserverInterface { class VideoSourceTest : public testing::Test { protected: VideoSourceTest() - : channel_manager_(new cricket::ChannelManager( + : capturer_cleanup_(new TestVideoCapturer()), + capturer_(capturer_cleanup_.get()), + channel_manager_(new cricket::ChannelManager( new cricket::FakeMediaEngine(), new cricket::FakeDeviceManager(), talk_base::Thread::Current())) { } void SetUp() { ASSERT_TRUE(channel_manager_->Init()); - capturer_ = new TestVideoCapturer(); } void CreateVideoSource() { @@ -145,7 +146,7 @@ class VideoSourceTest : public testing::Test { const webrtc::MediaConstraintsInterface* constraints) { // VideoSource take ownership of |capturer_| source_ = VideoSource::Create(channel_manager_.get(), - capturer_, + capturer_cleanup_.release(), constraints); ASSERT_TRUE(source_.get() != NULL); @@ -156,7 +157,8 @@ class VideoSourceTest : public testing::Test { source_->AddSink(&renderer_); } - TestVideoCapturer* capturer_; // Raw pointer. Owned by source_. + talk_base::scoped_ptr capturer_cleanup_; + TestVideoCapturer* capturer_; cricket::FakeVideoRenderer renderer_; talk_base::scoped_ptr channel_manager_; talk_base::scoped_ptr state_observer_; @@ -184,9 +186,6 @@ TEST_F(VideoSourceTest, StartStop) { // Test start stop with a remote VideoSource - the video source that has a // RemoteVideoCapturer and takes video frames from FrameInput. TEST_F(VideoSourceTest, StartStopRemote) { - // Will use RemoteVideoCapturer. - delete capturer_; - source_ = VideoSource::Create(channel_manager_.get(), new webrtc::RemoteVideoCapturer(), NULL); diff --git a/talk/app/webrtc/webrtcsdp.cc b/talk/app/webrtc/webrtcsdp.cc index af068af6c..60c427d16 100644 --- a/talk/app/webrtc/webrtcsdp.cc +++ b/talk/app/webrtc/webrtcsdp.cc @@ -42,6 +42,7 @@ #include "talk/media/base/codec.h" #include "talk/media/base/constants.h" #include "talk/media/base/cryptoparams.h" +#include "talk/media/sctp/sctpdataengine.h" #include "talk/p2p/base/candidate.h" #include "talk/p2p/base/constants.h" #include "talk/p2p/base/port.h" diff --git a/talk/app/webrtc/webrtcsdp_unittest.cc b/talk/app/webrtc/webrtcsdp_unittest.cc index 0eedb7fb1..b1505aae7 100644 --- a/talk/app/webrtc/webrtcsdp_unittest.cc +++ b/talk/app/webrtc/webrtcsdp_unittest.cc @@ -281,7 +281,7 @@ static const char kSdpSctpDataChannelString[] = "a=ice-ufrag:ufrag_data\r\n" "a=ice-pwd:pwd_data\r\n" "a=mid:data_content_name\r\n" - "a=fmtp:5000 protocol=webrtc-datachannel; streams=10\r\n"; + "a=fmtp:5000 protocol=webrtc-datachannel; streams=65536\r\n"; static const char kSdpSctpDataChannelWithCandidatesString[] = "m=application 2345 DTLS/SCTP 5000\r\n" @@ -296,7 +296,7 @@ static const char kSdpSctpDataChannelWithCandidatesString[] = "a=ice-ufrag:ufrag_data\r\n" "a=ice-pwd:pwd_data\r\n" "a=mid:data_content_name\r\n" - "a=fmtp:5000 protocol=webrtc-datachannel; streams=10\r\n"; + "a=fmtp:5000 protocol=webrtc-datachannel; streams=65536\r\n"; // One candidate reference string as per W3c spec. diff --git a/talk/app/webrtc/webrtcsession_unittest.cc b/talk/app/webrtc/webrtcsession_unittest.cc index 6f1443299..1531e0efc 100644 --- a/talk/app/webrtc/webrtcsession_unittest.cc +++ b/talk/app/webrtc/webrtcsession_unittest.cc @@ -2496,6 +2496,29 @@ TEST_F(WebRtcSessionTest, TestCreateOfferWithSctpEnabledWithoutStreams) { talk_base::scoped_ptr offer(CreateOffer(NULL)); EXPECT_TRUE(offer->description()->GetContentByName("data") == NULL); + EXPECT_TRUE(offer->description()->GetTransportInfoByName("data") == NULL); +} + +TEST_F(WebRtcSessionTest, TestCreateAnswerWithSctpInOfferAndNoStreams) { + MAYBE_SKIP_TEST(talk_base::SSLStreamAdapter::HaveDtlsSrtp); + SetFactoryDtlsSrtp(); + constraints_.reset(new FakeConstraints()); + constraints_->AddOptional( + webrtc::MediaConstraintsInterface::kEnableSctpDataChannels, true); + InitWithDtls(false); + + // Create remote offer with SCTP. + cricket::MediaSessionOptions options; + options.data_channel_type = cricket::DCT_SCTP; + JsepSessionDescription* offer = + CreateRemoteOffer(options, cricket::SEC_ENABLED); + SetRemoteDescriptionWithoutError(offer); + + // Verifies the answer contains SCTP. + talk_base::scoped_ptr answer(CreateAnswer(NULL)); + EXPECT_TRUE(answer != NULL); + EXPECT_TRUE(answer->description()->GetContentByName("data") != NULL); + EXPECT_TRUE(answer->description()->GetTransportInfoByName("data") != NULL); } TEST_F(WebRtcSessionTest, TestSctpDataChannelWithoutDtls) { diff --git a/talk/app/webrtc/webrtcsessiondescriptionfactory.cc b/talk/app/webrtc/webrtcsessiondescriptionfactory.cc index d0a10147d..30c49a718 100644 --- a/talk/app/webrtc/webrtcsessiondescriptionfactory.cc +++ b/talk/app/webrtc/webrtcsessiondescriptionfactory.cc @@ -243,8 +243,10 @@ void WebRtcSessionDescriptionFactory::CreateAnswer( PostCreateSessionDescriptionFailed(observer, error); return; } - if (data_channel_type_ == cricket::DCT_SCTP && - mediastream_signaling_->HasDataChannels()) { + // RTP data channel is handled in MediaSessionOptions::AddStream. SCTP streams + // are not signaled in the SDP so does not go through that path and must be + // handled here. + if (data_channel_type_ == cricket::DCT_SCTP) { options.data_channel_type = cricket::DCT_SCTP; } diff --git a/talk/base/asyncpacketsocket.h b/talk/base/asyncpacketsocket.h index a88f770ca..3b4748f51 100644 --- a/talk/base/asyncpacketsocket.h +++ b/talk/base/asyncpacketsocket.h @@ -28,6 +28,7 @@ #ifndef TALK_BASE_ASYNCPACKETSOCKET_H_ #define TALK_BASE_ASYNCPACKETSOCKET_H_ +#include "talk/base/dscp.h" #include "talk/base/sigslot.h" #include "talk/base/socket.h" @@ -56,8 +57,9 @@ class AsyncPacketSocket : public sigslot::has_slots<> { virtual SocketAddress GetRemoteAddress() const = 0; // Send a packet. - virtual int Send(const void *pv, size_t cb) = 0; - virtual int SendTo(const void *pv, size_t cb, const SocketAddress& addr) = 0; + virtual int Send(const void *pv, size_t cb, DiffServCodePoint dscp) = 0; + virtual int SendTo(const void *pv, size_t cb, const SocketAddress& addr, + DiffServCodePoint) = 0; // Close the socket. virtual int Close() = 0; diff --git a/talk/base/asynctcpsocket.cc b/talk/base/asynctcpsocket.cc index 095413d3e..517e799c4 100644 --- a/talk/base/asynctcpsocket.cc +++ b/talk/base/asynctcpsocket.cc @@ -141,10 +141,12 @@ void AsyncTCPSocketBase::SetError(int error) { return socket_->SetError(error); } +// TODO(mallinath) - Add support of setting DSCP code on AsyncSocket. int AsyncTCPSocketBase::SendTo(const void *pv, size_t cb, - const SocketAddress& addr) { + const SocketAddress& addr, + DiffServCodePoint dscp) { if (addr == GetRemoteAddress()) - return Send(pv, cb); + return Send(pv, cb, dscp); ASSERT(false); socket_->SetError(ENOTCONN); @@ -261,7 +263,8 @@ AsyncTCPSocket::AsyncTCPSocket(AsyncSocket* socket, bool listen) : AsyncTCPSocketBase(socket, listen, kBufSize) { } -int AsyncTCPSocket::Send(const void *pv, size_t cb) { +// TODO(mallinath) - Add support of setting DSCP code on AsyncSocket. +int AsyncTCPSocket::Send(const void *pv, size_t cb, DiffServCodePoint dscp) { if (cb > kBufSize) { SetError(EMSGSIZE); return -1; diff --git a/talk/base/asynctcpsocket.h b/talk/base/asynctcpsocket.h index b34ce188e..a0e7a7e2f 100644 --- a/talk/base/asynctcpsocket.h +++ b/talk/base/asynctcpsocket.h @@ -43,14 +43,15 @@ class AsyncTCPSocketBase : public AsyncPacketSocket { virtual ~AsyncTCPSocketBase(); // Pure virtual methods to send and recv data. - virtual int Send(const void *pv, size_t cb) = 0; + virtual int Send(const void *pv, size_t cb, DiffServCodePoint dscp) = 0; virtual void ProcessInput(char* data, size_t* len) = 0; // Signals incoming connection. virtual void HandleIncomingConnection(AsyncSocket* socket) = 0; virtual SocketAddress GetLocalAddress() const; virtual SocketAddress GetRemoteAddress() const; - virtual int SendTo(const void *pv, size_t cb, const SocketAddress& addr); + virtual int SendTo(const void *pv, size_t cb, const SocketAddress& addr, + DiffServCodePoint dscp); virtual int Close(); virtual State GetState() const; @@ -101,7 +102,7 @@ class AsyncTCPSocket : public AsyncTCPSocketBase { AsyncTCPSocket(AsyncSocket* socket, bool listen); virtual ~AsyncTCPSocket() {} - virtual int Send(const void* pv, size_t cb); + virtual int Send(const void* pv, size_t cb, DiffServCodePoint dscp); virtual void ProcessInput(char* data, size_t* len); virtual void HandleIncomingConnection(AsyncSocket* socket); diff --git a/talk/base/asyncudpsocket.cc b/talk/base/asyncudpsocket.cc index 6388ce7ce..97e5dff98 100644 --- a/talk/base/asyncudpsocket.cc +++ b/talk/base/asyncudpsocket.cc @@ -75,12 +75,14 @@ SocketAddress AsyncUDPSocket::GetRemoteAddress() const { return socket_->GetRemoteAddress(); } -int AsyncUDPSocket::Send(const void *pv, size_t cb) { +// TODO(mallinath) - Add support of setting DSCP code on AsyncSocket. +int AsyncUDPSocket::Send(const void *pv, size_t cb, DiffServCodePoint dscp) { return socket_->Send(pv, cb); } -int AsyncUDPSocket::SendTo( - const void *pv, size_t cb, const SocketAddress& addr) { +// TODO(mallinath) - Add support of setting DSCP code on AsyncSocket. +int AsyncUDPSocket::SendTo(const void *pv, size_t cb, + const SocketAddress& addr, DiffServCodePoint dscp) { return socket_->SendTo(pv, cb, addr); } diff --git a/talk/base/asyncudpsocket.h b/talk/base/asyncudpsocket.h index 1bf2ad229..17e12a26c 100644 --- a/talk/base/asyncudpsocket.h +++ b/talk/base/asyncudpsocket.h @@ -52,8 +52,9 @@ class AsyncUDPSocket : public AsyncPacketSocket { virtual SocketAddress GetLocalAddress() const; virtual SocketAddress GetRemoteAddress() const; - virtual int Send(const void *pv, size_t cb); - virtual int SendTo(const void *pv, size_t cb, const SocketAddress& addr); + virtual int Send(const void *pv, size_t cb, DiffServCodePoint dscp); + virtual int SendTo(const void *pv, size_t cb, const SocketAddress& addr, + DiffServCodePoint dscp); virtual int Close(); virtual State GetState() const; diff --git a/talk/base/dscp.h b/talk/base/dscp.h new file mode 100644 index 000000000..c71ee3e93 --- /dev/null +++ b/talk/base/dscp.h @@ -0,0 +1,62 @@ +/* + * libjingle + * Copyright 2013, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef TALK_BASE_DSCP_H_ +#define TALK_BASE_DSCP_H_ + +namespace talk_base { +// Differentiated Services Code Point. +// See http://tools.ietf.org/html/rfc2474 for details. +enum DiffServCodePoint { + DSCP_NO_CHANGE = -1, + DSCP_DEFAULT = 0, // Same as DSCP_CS0 + DSCP_CS0 = 0, // The default + DSCP_CS1 = 8, // Bulk/background traffic + DSCP_AF11 = 10, + DSCP_AF12 = 12, + DSCP_AF13 = 14, + DSCP_CS2 = 16, + DSCP_AF21 = 18, + DSCP_AF22 = 20, + DSCP_AF23 = 22, + DSCP_CS3 = 24, + DSCP_AF31 = 26, + DSCP_AF32 = 28, + DSCP_AF33 = 30, + DSCP_CS4 = 32, + DSCP_AF41 = 34, // Video + DSCP_AF42 = 36, // Video + DSCP_AF43 = 38, // Video + DSCP_CS5 = 40, // Video + DSCP_EF = 46, // Voice + DSCP_CS6 = 48, // Voice + DSCP_CS7 = 56, // Control messages +}; + +} // namespace talk_base + + #endif // TALK_BASE_DSCP_H_ diff --git a/talk/base/natserver.cc b/talk/base/natserver.cc index 7a3a04509..483542591 100644 --- a/talk/base/natserver.cc +++ b/talk/base/natserver.cc @@ -126,7 +126,8 @@ void NATServer::OnInternalPacket( iter->second->whitelist->insert(dest_addr); // Send the packet to its intended destination. - iter->second->socket->SendTo(buf + length, size - length, dest_addr); + iter->second->socket->SendTo(buf + length, size - length, dest_addr, + DSCP_NO_CHANGE); } void NATServer::OnExternalPacket( @@ -155,7 +156,7 @@ void NATServer::OnExternalPacket( // Copy the data part after the address. std::memcpy(real_buf.get() + addrlength, buf, size); server_socket_->SendTo(real_buf.get(), size + addrlength, - iter->second->route.source()); + iter->second->route.source(), DSCP_NO_CHANGE); } void NATServer::Translate(const SocketAddressPair& route) { diff --git a/talk/base/socket.h b/talk/base/socket.h index 9932cdada..e738060f8 100644 --- a/talk/base/socket.h +++ b/talk/base/socket.h @@ -184,7 +184,8 @@ class Socket { OPT_RCVBUF, // receive buffer size OPT_SNDBUF, // send buffer size OPT_NODELAY, // whether Nagle algorithm is enabled - OPT_IPV6_V6ONLY // Whether the socket is IPv6 only. + OPT_IPV6_V6ONLY, // Whether the socket is IPv6 only. + OPT_DSCP // DSCP code }; virtual int GetOption(Option opt, int* value) = 0; virtual int SetOption(Option opt, int value) = 0; diff --git a/talk/base/testclient.cc b/talk/base/testclient.cc index 0e7625f73..0ef851831 100644 --- a/talk/base/testclient.cc +++ b/talk/base/testclient.cc @@ -25,6 +25,7 @@ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ +#include "talk/base/dscp.h" #include "talk/base/testclient.h" #include "talk/base/thread.h" #include "talk/base/timeutils.h" @@ -58,12 +59,12 @@ bool TestClient::CheckConnState(AsyncPacketSocket::State state) { } int TestClient::Send(const char* buf, size_t size) { - return socket_->Send(buf, size); + return socket_->Send(buf, size, DSCP_NO_CHANGE); } int TestClient::SendTo(const char* buf, size_t size, const SocketAddress& dest) { - return socket_->SendTo(buf, size, dest); + return socket_->SendTo(buf, size, dest, DSCP_NO_CHANGE); } TestClient::Packet* TestClient::NextPacket() { diff --git a/talk/base/testechoserver.h b/talk/base/testechoserver.h index 9bb5178c0..10466fa43 100644 --- a/talk/base/testechoserver.h +++ b/talk/base/testechoserver.h @@ -68,7 +68,7 @@ class TestEchoServer : public sigslot::has_slots<> { } void OnPacket(AsyncPacketSocket* socket, const char* buf, size_t size, const SocketAddress& remote_addr) { - socket->Send(buf, size); + socket->Send(buf, size, DSCP_NO_CHANGE); } void OnClose(AsyncPacketSocket* socket, int err) { ClientList::iterator it = diff --git a/talk/base/virtualsocket_unittest.cc b/talk/base/virtualsocket_unittest.cc index 617a57f29..f3b13fc0f 100644 --- a/talk/base/virtualsocket_unittest.cc +++ b/talk/base/virtualsocket_unittest.cc @@ -69,7 +69,7 @@ struct Sender : public MessageHandler { count += size; memcpy(dummy, &cur_time, sizeof(cur_time)); - socket->Send(dummy, size); + socket->Send(dummy, size, DSCP_NO_CHANGE); last_send = cur_time; thread->PostDelayed(NextDelay(), this, 1); diff --git a/talk/media/base/fakenetworkinterface.h b/talk/media/base/fakenetworkinterface.h index 2fdd1d4a3..d0f277e8f 100644 --- a/talk/media/base/fakenetworkinterface.h +++ b/talk/media/base/fakenetworkinterface.h @@ -130,7 +130,8 @@ class FakeNetworkInterface : public MediaChannel::NetworkInterface, int recvbuf_size() const { return recvbuf_size_; } protected: - virtual bool SendPacket(talk_base::Buffer* packet) { + virtual bool SendPacket(talk_base::Buffer* packet, + talk_base::DiffServCodePoint dscp) { talk_base::CritScope cs(&crit_); uint32 cur_ssrc = 0; @@ -164,7 +165,8 @@ class FakeNetworkInterface : public MediaChannel::NetworkInterface, return true; } - virtual bool SendRtcp(talk_base::Buffer* packet) { + virtual bool SendRtcp(talk_base::Buffer* packet, + talk_base::DiffServCodePoint dscp) { talk_base::CritScope cs(&crit_); rtcp_packets_.push_back(*packet); if (!conf_) { diff --git a/talk/media/base/filemediaengine_unittest.cc b/talk/media/base/filemediaengine_unittest.cc index 703fc118d..e4d72bbe6 100644 --- a/talk/media/base/filemediaengine_unittest.cc +++ b/talk/media/base/filemediaengine_unittest.cc @@ -58,7 +58,8 @@ class FileNetworkInterface : public MediaChannel::NetworkInterface { } // Implement pure virtual methods of NetworkInterface. - virtual bool SendPacket(talk_base::Buffer* packet) { + virtual bool SendPacket(talk_base::Buffer* packet, + talk_base::DiffServCodePoint dscp) { if (!packet) return false; if (media_channel_) { @@ -74,11 +75,13 @@ class FileNetworkInterface : public MediaChannel::NetworkInterface { return true; } - virtual bool SendRtcp(talk_base::Buffer* packet) { return false; } + virtual bool SendRtcp(talk_base::Buffer* packet, + talk_base::DiffServCodePoint dscp) { return false; } virtual int SetOption(MediaChannel::NetworkInterface::SocketType type, talk_base::Socket::Option opt, int option) { return 0; } + virtual void SetDefaultDSCPCode(talk_base::DiffServCodePoint dscp) {} size_t num_sent_packets() const { return num_sent_packets_; } diff --git a/talk/media/base/mediachannel.h b/talk/media/base/mediachannel.h index c73d20ae8..7431bc100 100644 --- a/talk/media/base/mediachannel.h +++ b/talk/media/base/mediachannel.h @@ -33,6 +33,7 @@ #include "talk/base/basictypes.h" #include "talk/base/buffer.h" +#include "talk/base/dscp.h" #include "talk/base/logging.h" #include "talk/base/sigslot.h" #include "talk/base/socket.h" @@ -414,8 +415,12 @@ class MediaChannel : public sigslot::has_slots<> { class NetworkInterface { public: enum SocketType { ST_RTP, ST_RTCP }; - virtual bool SendPacket(talk_base::Buffer* packet) = 0; - virtual bool SendRtcp(talk_base::Buffer* packet) = 0; + virtual bool SendPacket( + talk_base::Buffer* packet, + talk_base::DiffServCodePoint dscp = talk_base::DSCP_NO_CHANGE) = 0; + virtual bool SendRtcp( + talk_base::Buffer* packet, + talk_base::DiffServCodePoint dscp = talk_base::DSCP_NO_CHANGE) = 0; virtual int SetOption(SocketType type, talk_base::Socket::Option opt, int option) = 0; virtual ~NetworkInterface() {} @@ -862,10 +867,12 @@ class VideoMediaChannel : public MediaChannel { }; enum DataMessageType { - // TODO(pthatcher): Make this enum match the SCTP PPIDs that WebRTC uses? - DMT_CONTROL = 0, - DMT_BINARY = 1, - DMT_TEXT = 2, + // Chrome-Internal use only. See SctpDataMediaChannel for the actual PPID + // values. + DMT_NONE = 0, + DMT_CONTROL = 1, + DMT_BINARY = 2, + DMT_TEXT = 3, }; // Info about data received in DataMediaChannel. For use in diff --git a/talk/media/sctp/sctpdataengine.cc b/talk/media/sctp/sctpdataengine.cc index 0d6bc59b8..3d450c686 100644 --- a/talk/media/sctp/sctpdataengine.cc +++ b/talk/media/sctp/sctpdataengine.cc @@ -64,28 +64,58 @@ struct SctpInboundPacket { int flags; }; -// Helper for logging SCTP data. Given a buffer, returns a readable string. +// Helper for logging SCTP messages. static void debug_sctp_printf(const char *format, ...) { char s[255]; va_list ap; va_start(ap, format); vsnprintf(s, sizeof(s), format, ap); - LOG(LS_INFO) << s; - // vprintf(format, ap); + LOG(LS_INFO) << "SCTP: " << s; va_end(ap); } -// Helper for make a string dump of some SCTP data. Used for LOG -// debugging messages. -static std::string SctpDataToDebugString(void* buffer, size_t length, - int dump_type) { - char *dump_buf = usrsctp_dumppacket(buffer, length, dump_type); - if (!dump_buf) { - return ""; +// Get the PPID to use for the terminating fragment of this type. +static SctpDataMediaChannel::PayloadProtocolIdentifier GetPpid( + cricket::DataMessageType type) { + switch (type) { + default: + case cricket::DMT_NONE: + return SctpDataMediaChannel::PPID_NONE; + case cricket::DMT_CONTROL: + return SctpDataMediaChannel::PPID_CONTROL; + case cricket::DMT_BINARY: + return SctpDataMediaChannel::PPID_BINARY_LAST; + case cricket::DMT_TEXT: + return SctpDataMediaChannel::PPID_TEXT_LAST; + }; +} + +static bool GetDataMediaType( + SctpDataMediaChannel::PayloadProtocolIdentifier ppid, + cricket::DataMessageType *dest) { + ASSERT(dest != NULL); + switch (ppid) { + case SctpDataMediaChannel::PPID_BINARY_PARTIAL: + case SctpDataMediaChannel::PPID_BINARY_LAST: + *dest = cricket::DMT_BINARY; + return true; + + case SctpDataMediaChannel::PPID_TEXT_PARTIAL: + case SctpDataMediaChannel::PPID_TEXT_LAST: + *dest = cricket::DMT_TEXT; + return true; + + case SctpDataMediaChannel::PPID_CONTROL: + *dest = cricket::DMT_CONTROL; + return true; + + case SctpDataMediaChannel::PPID_NONE: + *dest = cricket::DMT_NONE; + return true; + + default: + return false; } - std::string s = std::string(dump_buf); - usrsctp_freedumpbuffer(dump_buf); - return s; } // This is the callback usrsctp uses when there's data to send on the network @@ -96,9 +126,7 @@ static int OnSctpOutboundPacket(void* addr, void* data, size_t length, LOG(LS_VERBOSE) << "global OnSctpOutboundPacket():" << "addr: " << addr << "; length: " << length << "; tos: " << std::hex << static_cast(tos) - << "; set_df: " << std::hex << static_cast(set_df) - << "; data:" << SctpDataToDebugString(data, length, - SCTP_DUMP_OUTBOUND); + << "; set_df: " << std::hex << static_cast(set_df); // Note: We have to copy the data; the caller will delete it. talk_base::Buffer* buffer = new talk_base::Buffer(data, length); channel->worker_thread()->Post(channel, MSG_SCTPOUTBOUNDPACKET, @@ -114,37 +142,29 @@ static int OnSctpInboundPacket(struct socket* sock, union sctp_sockstore addr, void* data, size_t length, struct sctp_rcvinfo rcv, int flags, void* ulp_info) { - LOG(LS_VERBOSE) << "global OnSctpInboundPacket... Msg of length " - << length << " received via " << addr.sconn.sconn_addr << ":" - << talk_base::NetworkToHost16(addr.sconn.sconn_port) - << " on stream " << rcv.rcv_sid - << " with SSN " << rcv.rcv_ssn - << " and TSN " << rcv.rcv_tsn << ", PPID " - << talk_base::NetworkToHost32(rcv.rcv_ppid) - << ", context " << rcv.rcv_context - << ", data: " << data - << ", ulp_info:" << ulp_info - << ", flags:" << std::hex << flags; SctpDataMediaChannel* channel = static_cast(ulp_info); - // The second log call is useful when the defines flags are incorrect. In - // this case, ulp_info ends up being bad and the second log message will - // cause a crash. - LOG(LS_VERBOSE) << "global OnSctpInboundPacket. channel=" - << channel->debug_name() << "..."; // Post data to the channel's receiver thread (copying it). // TODO(ldixon): Unclear if copy is needed as this method is responsible for // memory cleanup. But this does simplify code. - const uint32 native_ppid = talk_base::HostToNetwork32(rcv.rcv_ppid); - SctpInboundPacket* packet = new SctpInboundPacket(); - packet->buffer.SetData(data, length); - packet->params.ssrc = rcv.rcv_sid; - packet->params.seq_num = rcv.rcv_ssn; - packet->params.timestamp = rcv.rcv_tsn; - packet->params.type = - static_cast(native_ppid); - packet->flags = flags; - channel->worker_thread()->Post(channel, MSG_SCTPINBOUNDPACKET, - talk_base::WrapMessageData(packet)); + const SctpDataMediaChannel::PayloadProtocolIdentifier ppid = + static_cast( + talk_base::HostToNetwork32(rcv.rcv_ppid)); + cricket::DataMessageType type = cricket::DMT_NONE; + if (!GetDataMediaType(ppid, &type) && !(flags & MSG_NOTIFICATION)) { + // It's neither a notification nor a recognized data packet. Drop it. + LOG(LS_ERROR) << "Received an unknown PPID " << ppid + << " on an SCTP packet. Dropping."; + } else { + SctpInboundPacket* packet = new SctpInboundPacket; + packet->buffer.SetData(data, length); + packet->params.ssrc = rcv.rcv_sid; + packet->params.seq_num = rcv.rcv_ssn; + packet->params.timestamp = rcv.rcv_tsn; + packet->params.type = type; + packet->flags = flags; + channel->worker_thread()->Post(channel, MSG_SCTPINBOUNDPACKET, + talk_base::WrapMessageData(packet)); + } free(data); return 1; } @@ -181,6 +201,14 @@ SctpDataEngine::SctpDataEngine() { // See: http://lakerest.net/pipermail/sctp-coders/2012-January/009438.html // See: http://svnweb.freebsd.org/base?view=revision&revision=229805 // usrsctp_sysctl_set_sctp_blackhole(2); + + // Set the number of default outgoing streams. This is the number we'll + // send in the SCTP INIT message. The 'appropriate default' in the + // second paragraph of + // http://tools.ietf.org/html/draft-ietf-rtcweb-data-channel-05#section-6.2 + // is cricket::kMaxSctpSid. + usrsctp_sysctl_set_sctp_nr_outgoing_streams_default( + cricket::kMaxSctpSid); } usrsctp_engines_count++; @@ -437,7 +465,8 @@ bool SctpDataMediaChannel::SendData( const talk_base::Buffer& payload, SendDataResult* result) { if (result) { - // If we return true, we'll set this to SDR_SUCCESS. + // Preset |result| to assume an error. If SendData succeeds, we'll + // overwrite |*result| once more at the end. *result = SDR_ERROR; } @@ -457,41 +486,36 @@ bool SctpDataMediaChannel::SendData( return false; } - // TODO(ldixon): Experiment with sctp_sendv_spa instead of sctp_sndinfo. e.g. - // struct sctp_sendv_spa spa = {0}; - // spa.sendv_flags |= SCTP_SEND_SNDINFO_VALID; - // spa.sendv_sndinfo.snd_sid = params.ssrc; - // spa.sendv_sndinfo.snd_context = 0; - // spa.sendv_sndinfo.snd_assoc_id = 0; - // TODO(pthatcher): Support different types of protocols (e.g. SSL) and - // messages (e.g. Binary) via SendDataParams. - // spa.sendv_sndinfo.snd_ppid = htonl(PPID_NONE); - // TODO(pthatcher): Support different reliability semantics. - // For reliable: Remove SCTP_UNORDERED. - // For partially-reliable: Add rtx or ttl. - // spa.sendv_sndinfo.snd_flags = SCTP_UNORDERED; - // TODO(phatcher): Try some of these things. - // spa.sendv_flags |= SCTP_SEND_PRINFO_VALID; - // spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_RTX; - // spa.sendv_prinfo.pr_value = htons(max_retransmit_count); - // spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_TTL; - // spa.sendv_prinfo.pr_value = htons(max_retransmit_time); // // Send data using SCTP. - sctp_sndinfo sndinfo = {0}; - sndinfo.snd_sid = params.ssrc; - sndinfo.snd_flags = 0; - // TODO(pthatcher): Once data types are added to SendParams, this can be set - // from SendParams. - sndinfo.snd_ppid = talk_base::HostToNetwork32(params.type); - sndinfo.snd_context = 0; - sndinfo.snd_assoc_id = 0; - ssize_t res = usrsctp_sendv(sock_, payload.data(), - static_cast(payload.length()), - NULL, 0, &sndinfo, - static_cast(sizeof(sndinfo)), - SCTP_SENDV_SNDINFO, 0); - if (res < 0) { + ssize_t send_res = 0; // result from usrsctp_sendv. + struct sctp_sendv_spa spa = {0}; + spa.sendv_flags |= SCTP_SEND_SNDINFO_VALID; + spa.sendv_sndinfo.snd_sid = params.ssrc; + spa.sendv_sndinfo.snd_ppid = talk_base::HostToNetwork32( + GetPpid(params.type)); + + // Ordered implies reliable. + if (!params.ordered) { + spa.sendv_sndinfo.snd_flags |= SCTP_UNORDERED; + if (params.max_rtx_count >= 0 || params.max_rtx_ms == 0) { + spa.sendv_flags |= SCTP_SEND_PRINFO_VALID; + spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_RTX; + spa.sendv_prinfo.pr_value = params.max_rtx_count; + } else { + spa.sendv_flags |= SCTP_SEND_PRINFO_VALID; + spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_TTL; + spa.sendv_prinfo.pr_value = params.max_rtx_ms; + } + } + + // We don't fragment. + send_res = usrsctp_sendv(sock_, payload.data(), + static_cast(payload.length()), + NULL, 0, &spa, + static_cast(sizeof(spa)), + SCTP_SENDV_SPA, 0); + if (send_res < 0) { if (errno == EWOULDBLOCK) { *result = SDR_BLOCK; LOG(LS_INFO) << debug_name_ << "->SendData(...): EWOULDBLOCK returned"; @@ -503,7 +527,7 @@ bool SctpDataMediaChannel::SendData( return false; } if (result) { - // If we return true, we'll set this to SDR_SUCCESS. + // Only way out now is success. *result = SDR_SUCCESS; } return true; @@ -511,17 +535,13 @@ bool SctpDataMediaChannel::SendData( // Called by network interface when a packet has been received. void SctpDataMediaChannel::OnPacketReceived(talk_base::Buffer* packet) { - LOG(LS_VERBOSE) << debug_name_ << "->OnPacketReceived(...): " - << " length=" << packet->length() << "; data=" - << SctpDataToDebugString(packet->data(), packet->length(), - SCTP_DUMP_INBOUND); + LOG(LS_VERBOSE) << debug_name_ << "->OnPacketReceived(...): " << " length=" + << packet->length() << ", sending: " << sending_; // Only give receiving packets to usrsctp after if connected. This enables two // peers to each make a connect call, but for them not to receive an INIT // packet before they have called connect; least the last receiver of the INIT // packet will have called connect, and a connection will be established. if (sending_) { - LOG(LS_VERBOSE) << debug_name_ << "->OnPacketReceived(...):" - << " Passed packet to sctp."; // Pass received packet to SCTP stack. Once processed by usrsctp, the data // will be will be given to the global OnSctpInboundData, and then, // marshalled by a Post and handled with OnMessage. @@ -529,8 +549,6 @@ void SctpDataMediaChannel::OnPacketReceived(talk_base::Buffer* packet) { } else { // TODO(ldixon): Consider caching the packet for very slightly better // reliability. - LOG(LS_INFO) << debug_name_ << "->OnPacketReceived(...):" - << " Threw packet (probably an INIT) away."; } } @@ -539,10 +557,8 @@ void SctpDataMediaChannel::OnInboundPacketFromSctpToChannel( LOG(LS_VERBOSE) << debug_name_ << "->OnInboundPacketFromSctpToChannel(...): " << "Received SCTP data:" << " ssrc=" << packet->params.ssrc - << " data='" << std::string(packet->buffer.data(), - packet->buffer.length()) << " notification: " << (packet->flags & MSG_NOTIFICATION) - << "' length=" << packet->buffer.length(); + << " length=" << packet->buffer.length(); // Sending a packet with data == NULL (no data) is SCTPs "close the // connection" message. This sets sock_ = NULL; if (!packet->buffer.length() || !packet->buffer.data()) { diff --git a/talk/media/sctp/sctpdataengine.h b/talk/media/sctp/sctpdataengine.h index d62eff1ae..429016e0e 100644 --- a/talk/media/sctp/sctpdataengine.h +++ b/talk/media/sctp/sctpdataengine.h @@ -54,6 +54,10 @@ struct sctp_assoc_change; struct socket; namespace cricket { +// The highest stream ID (Sid) that SCTP allows, and the number of streams we +// tell SCTP we're going to use. +const uint32 kMaxSctpSid = USHRT_MAX; + // A DataEngine that interacts with usrsctp. // // From channel calls, data flows like this: @@ -108,12 +112,14 @@ class SctpDataMediaChannel : public DataMediaChannel, // on top of SCTP. enum PayloadProtocolIdentifier { PPID_NONE = 0, // No protocol is specified. - // Specified by Mozilla. Not clear that this is actually part of the - // standard. Use with caution! - // http://mxr.mozilla.org/mozilla-central/source/netwerk/sctp/datachannel/DataChannelProtocol.h#22 + // Matches the PPIDs in mozilla source and + // https://datatracker.ietf.org/doc/draft-ietf-rtcweb-data-protocol Sec. 9 + // They're not yet assigned by IANA. PPID_CONTROL = 50, - PPID_TEXT = 51, - PPID_BINARY = 52, + PPID_BINARY_PARTIAL = 52, + PPID_BINARY_LAST = 53, + PPID_TEXT_PARTIAL = 54, + PPID_TEXT_LAST = 51 }; // Given a thread which will be used to post messages (received data) to this @@ -208,11 +214,7 @@ class SctpDataMediaChannel : public DataMediaChannel, // related to the ports at the IP level. int local_port_; int remote_port_; - // TODO(ldixon): investigate why removing 'struct' makes the compiler - // complain. - // - // The socket created by usrsctp_socket(...). - struct socket* sock_; + struct socket* sock_; // The socket created by usrsctp_socket(...). // sending_ is true iff there is a connected socket. bool sending_; diff --git a/talk/media/sctp/sctpdataengine_unittest.cc b/talk/media/sctp/sctpdataengine_unittest.cc index 2b8787f3a..363e7dfeb 100644 --- a/talk/media/sctp/sctpdataengine_unittest.cc +++ b/talk/media/sctp/sctpdataengine_unittest.cc @@ -60,7 +60,8 @@ class SctpFakeNetworkInterface : public cricket::MediaChannel::NetworkInterface, protected: // Called to send raw packet down the wire (e.g. SCTP an packet). - virtual bool SendPacket(talk_base::Buffer* packet) { + virtual bool SendPacket(talk_base::Buffer* packet, + talk_base::DiffServCodePoint dscp) { LOG(LS_VERBOSE) << "SctpFakeNetworkInterface::SendPacket"; // TODO(ldixon): Can/should we use Buffer.TransferTo here? @@ -89,7 +90,8 @@ class SctpFakeNetworkInterface : public cricket::MediaChannel::NetworkInterface, // Unsupported functions required to exist by NetworkInterface. // TODO(ldixon): Refactor parent NetworkInterface class so these are not // required. They are RTC specific and should be in an appropriate subclass. - virtual bool SendRtcp(talk_base::Buffer* packet) { + virtual bool SendRtcp(talk_base::Buffer* packet, + talk_base::DiffServCodePoint dscp) { LOG(LS_WARNING) << "Unsupported: SctpFakeNetworkInterface::SendRtcp."; return false; } @@ -98,6 +100,9 @@ class SctpFakeNetworkInterface : public cricket::MediaChannel::NetworkInterface, LOG(LS_WARNING) << "Unsupported: SctpFakeNetworkInterface::SetOption."; return 0; } + virtual void SetDefaultDSCPCode(talk_base::DiffServCodePoint dscp) { + LOG(LS_WARNING) << "Unsupported: SctpFakeNetworkInterface::SetOption."; + } private: // Not owned by this class. diff --git a/talk/media/webrtc/webrtcvideoengine.cc b/talk/media/webrtc/webrtcvideoengine.cc index a50e55ce8..fd7e5bfa3 100644 --- a/talk/media/webrtc/webrtcvideoengine.cc +++ b/talk/media/webrtc/webrtcvideoengine.cc @@ -1223,7 +1223,7 @@ static void AddDefaultFeedbackParams(VideoCodec* codec) { } // Rebuilds the codec list to be only those that are less intensive -// than the specified codec. +// than the specified codec. Prefers internal codec over external. bool WebRtcVideoEngine::RebuildCodecList(const VideoCodec& in_codec) { if (!FindCodec(in_codec)) return false; @@ -1231,32 +1231,12 @@ bool WebRtcVideoEngine::RebuildCodecList(const VideoCodec& in_codec) { video_codecs_.clear(); bool found = false; - std::set external_codec_names; - if (encoder_factory_) { - const std::vector& codecs = - encoder_factory_->codecs(); - for (size_t i = 0; i < codecs.size(); ++i) { - if (!found) - found = (in_codec.name == codecs[i].name); - VideoCodec codec( - GetExternalVideoPayloadType(static_cast(i)), - codecs[i].name, - codecs[i].max_width, - codecs[i].max_height, - codecs[i].max_fps, - static_cast(codecs.size() + ARRAY_SIZE(kVideoCodecPrefs) - i)); - AddDefaultFeedbackParams(&codec); - video_codecs_.push_back(codec); - external_codec_names.insert(codecs[i].name); - } - } + std::set internal_codec_names; for (size_t i = 0; i < ARRAY_SIZE(kVideoCodecPrefs); ++i) { const VideoCodecPref& pref(kVideoCodecPrefs[i]); if (!found) found = (in_codec.name == pref.name); - bool is_external_codec = external_codec_names.find(pref.name) != - external_codec_names.end(); - if (found && !is_external_codec) { + if (found) { VideoCodec codec(pref.payload_type, pref.name, in_codec.width, in_codec.height, in_codec.framerate, static_cast(ARRAY_SIZE(kVideoCodecPrefs) - i)); @@ -1264,6 +1244,28 @@ bool WebRtcVideoEngine::RebuildCodecList(const VideoCodec& in_codec) { AddDefaultFeedbackParams(&codec); } video_codecs_.push_back(codec); + internal_codec_names.insert(codec.name); + } + } + if (encoder_factory_) { + const std::vector& codecs = + encoder_factory_->codecs(); + for (size_t i = 0; i < codecs.size(); ++i) { + bool is_internal_codec = internal_codec_names.find(codecs[i].name) != + internal_codec_names.end(); + if (!is_internal_codec) { + if (!found) + found = (in_codec.name == codecs[i].name); + VideoCodec codec( + GetExternalVideoPayloadType(static_cast(i)), + codecs[i].name, + codecs[i].max_width, + codecs[i].max_height, + codecs[i].max_fps, + static_cast(codecs.size() + ARRAY_SIZE(kVideoCodecPrefs) - i)); + AddDefaultFeedbackParams(&codec); + video_codecs_.push_back(codec); + } } } ASSERT(found); diff --git a/talk/media/webrtc/webrtcvideoengine_unittest.cc b/talk/media/webrtc/webrtcvideoengine_unittest.cc index 86f93142c..04662caf7 100644 --- a/talk/media/webrtc/webrtcvideoengine_unittest.cc +++ b/talk/media/webrtc/webrtcvideoengine_unittest.cc @@ -1758,18 +1758,57 @@ TEST_F(WebRtcVideoEngineTestFake, FeedbackParamsForNonVP8) { EXPECT_TRUE(SetupEngine()); std::vector codecs(engine_.codecs()); - EXPECT_EQ("GENERIC", codecs[0].name); - EXPECT_TRUE(codecs[0].HasFeedbackParam( + // The external codec will appear at last. + size_t pos = codecs.size() - 1; + EXPECT_EQ("GENERIC", codecs[pos].name); + EXPECT_TRUE(codecs[pos].HasFeedbackParam( cricket::FeedbackParam(cricket::kRtcpFbParamNack, cricket::kParamValueEmpty))); - EXPECT_TRUE(codecs[0].HasFeedbackParam( + EXPECT_TRUE(codecs[pos].HasFeedbackParam( cricket::FeedbackParam(cricket::kRtcpFbParamRemb, cricket::kParamValueEmpty))); - EXPECT_TRUE(codecs[0].HasFeedbackParam( + EXPECT_TRUE(codecs[pos].HasFeedbackParam( cricket::FeedbackParam(cricket::kRtcpFbParamCcm, cricket::kRtcpFbCcmParamFir))); } +// Test external codec with be added to the end of the supported codec list. +TEST_F(WebRtcVideoEngineTestFake, ExternalCodecAddedToTheEnd) { + EXPECT_TRUE(SetupEngine()); + + std::vector codecs(engine_.codecs()); + EXPECT_EQ("VP8", codecs[0].name); + + encoder_factory_.AddSupportedVideoCodecType(webrtc::kVideoCodecGeneric, + "GENERIC"); + engine_.SetExternalEncoderFactory(&encoder_factory_); + encoder_factory_.NotifyCodecsAvailable(); + + codecs = engine_.codecs(); + // The external codec will appear at last. + EXPECT_EQ("GENERIC", codecs[codecs.size() - 1].name); +} + +// Test that external codec with be ignored if it has the same name as one of +// the internal codecs. +TEST_F(WebRtcVideoEngineTestFake, ExternalCodecIgnored) { + EXPECT_TRUE(SetupEngine()); + + std::vector internal_codecs(engine_.codecs()); + EXPECT_EQ("VP8", internal_codecs[0].name); + + encoder_factory_.AddSupportedVideoCodecType(webrtc::kVideoCodecVP8, "VP8"); + engine_.SetExternalEncoderFactory(&encoder_factory_); + encoder_factory_.NotifyCodecsAvailable(); + + std::vector codecs = engine_.codecs(); + EXPECT_EQ("VP8", codecs[0].name); + EXPECT_EQ(internal_codecs[0].height, codecs[0].height); + EXPECT_EQ(internal_codecs[0].width, codecs[0].width); + // Verify the last codec is not the external codec. + EXPECT_NE("VP8", codecs[codecs.size() - 1].name); +} + TEST_F(WebRtcVideoEngineTestFake, UpdateEncoderCodecsAfterSetFactory) { engine_.SetExternalEncoderFactory(&encoder_factory_); EXPECT_TRUE(SetupEngine()); diff --git a/talk/p2p/base/asyncstuntcpsocket.cc b/talk/p2p/base/asyncstuntcpsocket.cc index 2f616410f..ec00c0482 100644 --- a/talk/p2p/base/asyncstuntcpsocket.cc +++ b/talk/p2p/base/asyncstuntcpsocket.cc @@ -65,7 +65,9 @@ AsyncStunTCPSocket::AsyncStunTCPSocket( : talk_base::AsyncTCPSocketBase(socket, listen, kBufSize) { } -int AsyncStunTCPSocket::Send(const void *pv, size_t cb) { +// TODO(mallinath) - Add support of setting DSCP code on AsyncSocket. +int AsyncStunTCPSocket::Send(const void *pv, size_t cb, + talk_base::DiffServCodePoint dscp) { if (cb > kBufSize || cb < kPacketLenSize + kPacketLenOffset) { SetError(EMSGSIZE); return -1; diff --git a/talk/p2p/base/asyncstuntcpsocket.h b/talk/p2p/base/asyncstuntcpsocket.h index 2380c4c0d..ff748d1f2 100644 --- a/talk/p2p/base/asyncstuntcpsocket.h +++ b/talk/p2p/base/asyncstuntcpsocket.h @@ -47,7 +47,8 @@ class AsyncStunTCPSocket : public talk_base::AsyncTCPSocketBase { AsyncStunTCPSocket(talk_base::AsyncSocket* socket, bool listen); virtual ~AsyncStunTCPSocket() {} - virtual int Send(const void* pv, size_t cb); + virtual int Send(const void* pv, size_t cb, + talk_base::DiffServCodePoint dscp); virtual void ProcessInput(char* data, size_t* len); virtual void HandleIncomingConnection(talk_base::AsyncSocket* socket); diff --git a/talk/p2p/base/asyncstuntcpsocket_unittest.cc b/talk/p2p/base/asyncstuntcpsocket_unittest.cc index a67571264..7cb380b0a 100644 --- a/talk/p2p/base/asyncstuntcpsocket_unittest.cc +++ b/talk/p2p/base/asyncstuntcpsocket_unittest.cc @@ -121,7 +121,8 @@ class AsyncStunTCPSocketTest : public testing::Test, } bool Send(const void* data, size_t len) { - size_t ret = send_socket_->Send(reinterpret_cast(data), len); + size_t ret = send_socket_->Send( + reinterpret_cast(data), len, talk_base::DSCP_NO_CHANGE); vss_->ProcessMessagesUntilIdle(); return (ret == len); } diff --git a/talk/p2p/base/dtlstransportchannel.cc b/talk/p2p/base/dtlstransportchannel.cc index 40b4e31ea..dead3a550 100644 --- a/talk/p2p/base/dtlstransportchannel.cc +++ b/talk/p2p/base/dtlstransportchannel.cc @@ -29,6 +29,7 @@ #include "talk/p2p/base/dtlstransportchannel.h" #include "talk/base/buffer.h" +#include "talk/base/dscp.h" #include "talk/base/messagequeue.h" #include "talk/base/stream.h" #include "talk/base/sslstreamadapter.h" @@ -69,7 +70,8 @@ talk_base::StreamResult StreamInterfaceChannel::Write(const void* data, int* error) { // Always succeeds, since this is an unreliable transport anyway. // TODO: Should this block if channel_'s temporarily unwritable? - channel_->SendPacket(static_cast(data), data_len); + channel_->SendPacket( + static_cast(data), data_len, talk_base::DSCP_NO_CHANGE); if (written) { *written = data_len; } @@ -297,6 +299,7 @@ bool DtlsTransportChannelWrapper::GetSrtpCipher(std::string* cipher) { // Called from upper layers to send a media packet. int DtlsTransportChannelWrapper::SendPacket(const char* data, size_t size, + talk_base::DiffServCodePoint dscp, int flags) { int result = -1; @@ -321,7 +324,7 @@ int DtlsTransportChannelWrapper::SendPacket(const char* data, size_t size, break; } - result = channel_->SendPacket(data, size); + result = channel_->SendPacket(data, size, dscp); } else { result = (dtls_->WriteAll(data, size, NULL, NULL) == talk_base::SR_SUCCESS) ? static_cast(size) : -1; @@ -329,7 +332,7 @@ int DtlsTransportChannelWrapper::SendPacket(const char* data, size_t size, break; // Not doing DTLS. case STATE_NONE: - result = channel_->SendPacket(data, size); + result = channel_->SendPacket(data, size, dscp); break; case STATE_CLOSED: // Can't send anything when we're closed. diff --git a/talk/p2p/base/dtlstransportchannel.h b/talk/p2p/base/dtlstransportchannel.h index ed0db6877..aec8c7ac4 100644 --- a/talk/p2p/base/dtlstransportchannel.h +++ b/talk/p2p/base/dtlstransportchannel.h @@ -135,7 +135,9 @@ class DtlsTransportChannelWrapper : public TransportChannelImpl { virtual bool IsDtlsActive() const { return dtls_state_ != STATE_NONE; } // Called to send a packet (via DTLS, if turned on). - virtual int SendPacket(const char* data, size_t size, int flags); + virtual int SendPacket(const char* data, size_t size, + talk_base::DiffServCodePoint dscp, + int flags); // TransportChannel calls that we forward to the wrapped transport. virtual int SetOption(talk_base::Socket::Option opt, int value) { diff --git a/talk/p2p/base/dtlstransportchannel_unittest.cc b/talk/p2p/base/dtlstransportchannel_unittest.cc index 750710100..267d60be1 100644 --- a/talk/p2p/base/dtlstransportchannel_unittest.cc +++ b/talk/p2p/base/dtlstransportchannel_unittest.cc @@ -29,6 +29,7 @@ #include #include "talk/base/common.h" +#include "talk/base/dscp.h" #include "talk/base/gunit.h" #include "talk/base/helpers.h" #include "talk/base/scoped_ptr.h" @@ -244,7 +245,8 @@ class DtlsTestClient : public sigslot::has_slots<> { // Only set the bypass flag if we've activated DTLS. int flags = (identity_.get() && srtp) ? cricket::PF_SRTP_BYPASS : 0; - int rv = channels_[channel]->SendPacket(packet.get(), size, flags); + int rv = channels_[channel]->SendPacket( + packet.get(), size, talk_base::DSCP_NO_CHANGE, flags); ASSERT_GT(rv, 0); ASSERT_EQ(size, static_cast(rv)); ++sent; diff --git a/talk/p2p/base/fakesession.h b/talk/p2p/base/fakesession.h index 8b1550c02..d162950a3 100644 --- a/talk/p2p/base/fakesession.h +++ b/talk/p2p/base/fakesession.h @@ -169,7 +169,8 @@ class FakeTransportChannel : public TransportChannelImpl, } } - virtual int SendPacket(const char* data, size_t len, int flags) { + virtual int SendPacket(const char* data, size_t len, + talk_base::DiffServCodePoint dscp, int flags) { if (state_ != STATE_CONNECTED) { return -1; } diff --git a/talk/p2p/base/p2ptransportchannel.cc b/talk/p2p/base/p2ptransportchannel.cc index 7a72d1008..d45a66c40 100644 --- a/talk/p2p/base/p2ptransportchannel.cc +++ b/talk/p2p/base/p2ptransportchannel.cc @@ -518,7 +518,20 @@ void P2PTransportChannel::OnUnknownAddress( // request came from. // There shouldn't be an existing connection with this remote address. - ASSERT(port->GetConnection(new_remote_candidate.address()) == NULL); + // When ports are muxed, this channel might get multiple unknown addres + // signals. In that case if the connection is already exists, we should + // simply ignore the signal othewise send server error. + if (port->GetConnection(new_remote_candidate.address()) && port_muxed) { + LOG(LS_INFO) << "Connection already exist for PeerReflexive candidate: " + << new_remote_candidate.ToString(); + return; + } else if (port->GetConnection(new_remote_candidate.address())) { + ASSERT(false); + port->SendBindingErrorResponse(stun_msg, address, + STUN_ERROR_SERVER_ERROR, + STUN_ERROR_REASON_SERVER_ERROR); + return; + } Connection* connection = port->CreateConnection( new_remote_candidate, cricket::PortInterface::ORIGIN_THIS_PORT); @@ -773,7 +786,9 @@ int P2PTransportChannel::SetOption(talk_base::Socket::Option opt, int value) { } // Send data to the other side, using our best connection. -int P2PTransportChannel::SendPacket(const char *data, size_t len, int flags) { +int P2PTransportChannel::SendPacket(const char *data, size_t len, + talk_base::DiffServCodePoint dscp, + int flags) { ASSERT(worker_thread_ == talk_base::Thread::Current()); if (flags != 0) { error_ = EINVAL; @@ -783,7 +798,7 @@ int P2PTransportChannel::SendPacket(const char *data, size_t len, int flags) { error_ = EWOULDBLOCK; return -1; } - int sent = best_connection_->Send(data, len); + int sent = best_connection_->Send(data, len, dscp); if (sent <= 0) { ASSERT(sent < 0); error_ = best_connection_->GetError(); @@ -823,6 +838,14 @@ bool P2PTransportChannel::GetStats(ConnectionInfos *infos) { return true; } +talk_base::DiffServCodePoint P2PTransportChannel::DefaultDscpValue() const { + OptionMap::const_iterator it = options_.find(talk_base::Socket::OPT_DSCP); + if (it == options_.end()) { + return talk_base::DSCP_NO_CHANGE; + } + return static_cast (it->second); +} + // Begin allocate (or immediately re-allocate, if MSG_ALLOCATE pending) void P2PTransportChannel::Allocate() { // Time for a new allocator, lets make sure we have a signalling channel diff --git a/talk/p2p/base/p2ptransportchannel.h b/talk/p2p/base/p2ptransportchannel.h index 17a489fc3..2fc718641 100644 --- a/talk/p2p/base/p2ptransportchannel.h +++ b/talk/p2p/base/p2ptransportchannel.h @@ -90,7 +90,8 @@ class P2PTransportChannel : public TransportChannelImpl, virtual void OnCandidate(const Candidate& candidate); // From TransportChannel: - virtual int SendPacket(const char *data, size_t len, int flags); + virtual int SendPacket(const char *data, size_t len, + talk_base::DiffServCodePoint dscp, int flags); virtual int SetOption(talk_base::Socket::Option opt, int value); virtual int GetError() { return error_; } virtual bool GetStats(std::vector* stats); @@ -149,6 +150,9 @@ class P2PTransportChannel : public TransportChannelImpl, return false; } + // Helper method used only in unittest. + talk_base::DiffServCodePoint DefaultDscpValue() const; + private: talk_base::Thread* thread() { return worker_thread_; } PortAllocatorSession* allocator_session() { diff --git a/talk/p2p/base/p2ptransportchannel_unittest.cc b/talk/p2p/base/p2ptransportchannel_unittest.cc index cf8ee025e..e3cddc0e2 100644 --- a/talk/p2p/base/p2ptransportchannel_unittest.cc +++ b/talk/p2p/base/p2ptransportchannel_unittest.cc @@ -25,6 +25,7 @@ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ +#include "talk/base/dscp.h" #include "talk/base/fakenetwork.h" #include "talk/base/firewallsocketserver.h" #include "talk/base/gunit.h" @@ -620,7 +621,7 @@ class P2PTransportChannelTestBase : public testing::Test, } int SendData(cricket::TransportChannel* channel, const char* data, size_t len) { - return channel->SendPacket(data, len, 0); + return channel->SendPacket(data, len, talk_base::DSCP_NO_CHANGE, 0); } bool CheckDataOnChannel(cricket::TransportChannel* channel, const char* data, int len) { @@ -1268,6 +1269,149 @@ TEST_F(P2PTransportChannelTest, TestTcpConnectionsFromActiveToPassive) { DestroyChannels(); } +TEST_F(P2PTransportChannelTest, TestBundleAllocatorToBundleAllocator) { + AddAddress(0, kPublicAddrs[0]); + AddAddress(1, kPublicAddrs[1]); + SetAllocatorFlags(0, cricket::PORTALLOCATOR_ENABLE_BUNDLE); + SetAllocatorFlags(1, cricket::PORTALLOCATOR_ENABLE_BUNDLE); + + CreateChannels(2); + + EXPECT_TRUE_WAIT(ep1_ch1()->readable() && + ep1_ch1()->writable() && + ep2_ch1()->readable() && + ep2_ch1()->writable(), + 1000); + EXPECT_TRUE(ep1_ch1()->best_connection() && + ep2_ch1()->best_connection()); + + EXPECT_FALSE(ep1_ch2()->readable()); + EXPECT_FALSE(ep1_ch2()->writable()); + EXPECT_FALSE(ep2_ch2()->readable()); + EXPECT_FALSE(ep2_ch2()->writable()); + + TestSendRecv(1); // Only 1 channel is writable per Endpoint. + DestroyChannels(); +} + +TEST_F(P2PTransportChannelTest, TestBundleAllocatorToNonBundleAllocator) { + AddAddress(0, kPublicAddrs[0]); + AddAddress(1, kPublicAddrs[1]); + // Enable BUNDLE flag at one side. + SetAllocatorFlags(0, cricket::PORTALLOCATOR_ENABLE_BUNDLE); + + CreateChannels(2); + + EXPECT_TRUE_WAIT(ep1_ch1()->readable() && + ep1_ch1()->writable() && + ep2_ch1()->readable() && + ep2_ch1()->writable(), + 1000); + EXPECT_TRUE_WAIT(ep1_ch2()->readable() && + ep1_ch2()->writable() && + ep2_ch2()->readable() && + ep2_ch2()->writable(), + 1000); + + EXPECT_TRUE(ep1_ch1()->best_connection() && + ep2_ch1()->best_connection()); + EXPECT_TRUE(ep1_ch2()->best_connection() && + ep2_ch2()->best_connection()); + + TestSendRecv(2); + DestroyChannels(); +} + +TEST_F(P2PTransportChannelTest, TestIceRoleConflictWithoutBundle) { + AddAddress(0, kPublicAddrs[0]); + AddAddress(1, kPublicAddrs[1]); + TestSignalRoleConflict(); +} + +TEST_F(P2PTransportChannelTest, TestIceRoleConflictWithBundle) { + AddAddress(0, kPublicAddrs[0]); + AddAddress(1, kPublicAddrs[1]); + SetAllocatorFlags(0, cricket::PORTALLOCATOR_ENABLE_BUNDLE); + SetAllocatorFlags(1, cricket::PORTALLOCATOR_ENABLE_BUNDLE); + TestSignalRoleConflict(); +} + +// Tests that the ice configs (protocol, tiebreaker and role) can be passed +// down to ports. +TEST_F(P2PTransportChannelTest, TestIceConfigWillPassDownToPort) { + AddAddress(0, kPublicAddrs[0]); + AddAddress(1, kPublicAddrs[1]); + + SetIceRole(0, cricket::ICEROLE_CONTROLLING); + SetIceProtocol(0, cricket::ICEPROTO_GOOGLE); + SetIceTiebreaker(0, kTiebreaker1); + SetIceRole(1, cricket::ICEROLE_CONTROLLING); + SetIceProtocol(1, cricket::ICEPROTO_RFC5245); + SetIceTiebreaker(1, kTiebreaker2); + + CreateChannels(1); + + EXPECT_EQ_WAIT(2u, ep1_ch1()->ports().size(), 1000); + + const std::vector ports_before = ep1_ch1()->ports(); + for (size_t i = 0; i < ports_before.size(); ++i) { + EXPECT_EQ(cricket::ICEROLE_CONTROLLING, ports_before[i]->GetIceRole()); + EXPECT_EQ(cricket::ICEPROTO_GOOGLE, ports_before[i]->IceProtocol()); + EXPECT_EQ(kTiebreaker1, ports_before[i]->IceTiebreaker()); + } + + ep1_ch1()->SetIceRole(cricket::ICEROLE_CONTROLLED); + ep1_ch1()->SetIceProtocolType(cricket::ICEPROTO_RFC5245); + ep1_ch1()->SetIceTiebreaker(kTiebreaker2); + + const std::vector ports_after = ep1_ch1()->ports(); + for (size_t i = 0; i < ports_after.size(); ++i) { + EXPECT_EQ(cricket::ICEROLE_CONTROLLED, ports_before[i]->GetIceRole()); + EXPECT_EQ(cricket::ICEPROTO_RFC5245, ports_before[i]->IceProtocol()); + // SetIceTiebreaker after Connect() has been called will fail. So expect the + // original value. + EXPECT_EQ(kTiebreaker1, ports_before[i]->IceTiebreaker()); + } + + EXPECT_TRUE_WAIT(ep1_ch1()->readable() && + ep1_ch1()->writable() && + ep2_ch1()->readable() && + ep2_ch1()->writable(), + 1000); + + EXPECT_TRUE(ep1_ch1()->best_connection() && + ep2_ch1()->best_connection()); + + TestSendRecv(1); +} + +// Verify that we can set DSCP value and retrieve properly from P2PTC. +TEST_F(P2PTransportChannelTest, TestDefaultDscpValue) { + AddAddress(0, kPublicAddrs[0]); + AddAddress(1, kPublicAddrs[1]); + + CreateChannels(1); + EXPECT_EQ(talk_base::DSCP_NO_CHANGE, + GetEndpoint(0)->cd1_.ch_->DefaultDscpValue()); + EXPECT_EQ(talk_base::DSCP_NO_CHANGE, + GetEndpoint(1)->cd1_.ch_->DefaultDscpValue()); + GetEndpoint(0)->cd1_.ch_->SetOption( + talk_base::Socket::OPT_DSCP, talk_base::DSCP_CS6); + GetEndpoint(1)->cd1_.ch_->SetOption( + talk_base::Socket::OPT_DSCP, talk_base::DSCP_CS6); + EXPECT_EQ(talk_base::DSCP_CS6, + GetEndpoint(0)->cd1_.ch_->DefaultDscpValue()); + EXPECT_EQ(talk_base::DSCP_CS6, + GetEndpoint(1)->cd1_.ch_->DefaultDscpValue()); + GetEndpoint(0)->cd1_.ch_->SetOption( + talk_base::Socket::OPT_DSCP, talk_base::DSCP_AF41); + GetEndpoint(1)->cd1_.ch_->SetOption( + talk_base::Socket::OPT_DSCP, talk_base::DSCP_AF41); + EXPECT_EQ(talk_base::DSCP_AF41, + GetEndpoint(0)->cd1_.ch_->DefaultDscpValue()); + EXPECT_EQ(talk_base::DSCP_AF41, + GetEndpoint(1)->cd1_.ch_->DefaultDscpValue()); +} // Test what happens when we have 2 users behind the same NAT. This can lead // to interesting behavior because the STUN server will only give out the @@ -1390,120 +1534,3 @@ TEST_F(P2PTransportChannelMultihomedTest, TestDrain) { DestroyChannels(); } - -TEST_F(P2PTransportChannelTest, TestBundleAllocatorToBundleAllocator) { - AddAddress(0, kPublicAddrs[0]); - AddAddress(1, kPublicAddrs[1]); - SetAllocatorFlags(0, cricket::PORTALLOCATOR_ENABLE_BUNDLE); - SetAllocatorFlags(1, cricket::PORTALLOCATOR_ENABLE_BUNDLE); - - CreateChannels(2); - - EXPECT_TRUE_WAIT(ep1_ch1()->readable() && - ep1_ch1()->writable() && - ep2_ch1()->readable() && - ep2_ch1()->writable(), - 1000); - EXPECT_TRUE(ep1_ch1()->best_connection() && - ep2_ch1()->best_connection()); - - EXPECT_FALSE(ep1_ch2()->readable()); - EXPECT_FALSE(ep1_ch2()->writable()); - EXPECT_FALSE(ep2_ch2()->readable()); - EXPECT_FALSE(ep2_ch2()->writable()); - - TestSendRecv(1); // Only 1 channel is writable per Endpoint. - DestroyChannels(); -} - -TEST_F(P2PTransportChannelTest, TestBundleAllocatorToNonBundleAllocator) { - AddAddress(0, kPublicAddrs[0]); - AddAddress(1, kPublicAddrs[1]); - // Enable BUNDLE flag at one side. - SetAllocatorFlags(0, cricket::PORTALLOCATOR_ENABLE_BUNDLE); - - CreateChannels(2); - - EXPECT_TRUE_WAIT(ep1_ch1()->readable() && - ep1_ch1()->writable() && - ep2_ch1()->readable() && - ep2_ch1()->writable(), - 1000); - EXPECT_TRUE_WAIT(ep1_ch2()->readable() && - ep1_ch2()->writable() && - ep2_ch2()->readable() && - ep2_ch2()->writable(), - 1000); - - EXPECT_TRUE(ep1_ch1()->best_connection() && - ep2_ch1()->best_connection()); - EXPECT_TRUE(ep1_ch2()->best_connection() && - ep2_ch2()->best_connection()); - - TestSendRecv(2); - DestroyChannels(); -} - -TEST_F(P2PTransportChannelTest, TestIceRoleConflictWithoutBundle) { - AddAddress(0, kPublicAddrs[0]); - AddAddress(1, kPublicAddrs[1]); - TestSignalRoleConflict(); -} - -TEST_F(P2PTransportChannelTest, TestIceRoleConflictWithBundle) { - AddAddress(0, kPublicAddrs[0]); - AddAddress(1, kPublicAddrs[1]); - SetAllocatorFlags(0, cricket::PORTALLOCATOR_ENABLE_BUNDLE); - SetAllocatorFlags(1, cricket::PORTALLOCATOR_ENABLE_BUNDLE); - TestSignalRoleConflict(); -} - -// Tests that the ice configs (protocol, tiebreaker and role) can be passed -// down to ports. -TEST_F(P2PTransportChannelTest, TestIceConfigWillPassDownToPort) { - AddAddress(0, kPublicAddrs[0]); - AddAddress(1, kPublicAddrs[1]); - - SetIceRole(0, cricket::ICEROLE_CONTROLLING); - SetIceProtocol(0, cricket::ICEPROTO_GOOGLE); - SetIceTiebreaker(0, kTiebreaker1); - SetIceRole(1, cricket::ICEROLE_CONTROLLING); - SetIceProtocol(1, cricket::ICEPROTO_RFC5245); - SetIceTiebreaker(1, kTiebreaker2); - - CreateChannels(1); - - EXPECT_EQ_WAIT(2u, ep1_ch1()->ports().size(), 1000); - - const std::vector ports_before = ep1_ch1()->ports(); - for (size_t i = 0; i < ports_before.size(); ++i) { - EXPECT_EQ(cricket::ICEROLE_CONTROLLING, ports_before[i]->GetIceRole()); - EXPECT_EQ(cricket::ICEPROTO_GOOGLE, ports_before[i]->IceProtocol()); - EXPECT_EQ(kTiebreaker1, ports_before[i]->IceTiebreaker()); - } - - ep1_ch1()->SetIceRole(cricket::ICEROLE_CONTROLLED); - ep1_ch1()->SetIceProtocolType(cricket::ICEPROTO_RFC5245); - ep1_ch1()->SetIceTiebreaker(kTiebreaker2); - - const std::vector ports_after = ep1_ch1()->ports(); - for (size_t i = 0; i < ports_after.size(); ++i) { - EXPECT_EQ(cricket::ICEROLE_CONTROLLED, ports_before[i]->GetIceRole()); - EXPECT_EQ(cricket::ICEPROTO_RFC5245, ports_before[i]->IceProtocol()); - // SetIceTiebreaker after Connect() has been called will fail. So expect the - // original value. - EXPECT_EQ(kTiebreaker1, ports_before[i]->IceTiebreaker()); - } - - EXPECT_TRUE_WAIT(ep1_ch1()->readable() && - ep1_ch1()->writable() && - ep2_ch1()->readable() && - ep2_ch1()->writable(), - 1000); - - EXPECT_TRUE(ep1_ch1()->best_connection() && - ep2_ch1()->best_connection()); - - TestSendRecv(1); -} - diff --git a/talk/p2p/base/port.cc b/talk/p2p/base/port.cc index 89f033f81..6e688dace 100644 --- a/talk/p2p/base/port.cc +++ b/talk/p2p/base/port.cc @@ -112,7 +112,7 @@ const int RTT_RATIO = 3; // 3 : 1 // The delay before we begin checking if this port is useless. const int kPortTimeoutDelay = 30 * 1000; // 30 seconds -const uint32 MSG_CHECKTIMEOUT = 1; +// Used by the Connection. const uint32 MSG_DELETE = 1; } @@ -181,7 +181,8 @@ Port::Port(talk_base::Thread* thread, talk_base::Network* network, ice_protocol_(ICEPROTO_GOOGLE), ice_role_(ICEROLE_UNKNOWN), tiebreaker_(0), - shared_socket_(true) { + shared_socket_(true), + default_dscp_(talk_base::DSCP_NO_CHANGE) { Construct(); } @@ -207,7 +208,8 @@ Port::Port(talk_base::Thread* thread, const std::string& type, ice_protocol_(ICEPROTO_GOOGLE), ice_role_(ICEROLE_UNKNOWN), tiebreaker_(0), - shared_socket_(false) { + shared_socket_(false), + default_dscp_(talk_base::DSCP_NO_CHANGE) { ASSERT(factory_ != NULL); Construct(); } @@ -606,7 +608,7 @@ void Port::SendBindingResponse(StunMessage* request, // Send the response message. talk_base::ByteBuffer buf; response.Write(&buf); - if (SendTo(buf.Data(), buf.Length(), addr, false) < 0) { + if (SendTo(buf.Data(), buf.Length(), addr, DefaultDscpValue(), false) < 0) { LOG_J(LS_ERROR, this) << "Failed to send STUN ping response to " << addr.ToSensitiveString(); } @@ -660,7 +662,7 @@ void Port::SendBindingErrorResponse(StunMessage* request, // Send the response message. talk_base::ByteBuffer buf; response.Write(&buf); - SendTo(buf.Data(), buf.Length(), addr, false); + SendTo(buf.Data(), buf.Length(), addr, DefaultDscpValue(), false); LOG_J(LS_INFO, this) << "Sending STUN binding error: reason=" << reason << " to " << addr.ToSensitiveString(); } @@ -916,7 +918,8 @@ void Connection::set_use_candidate_attr(bool enable) { void Connection::OnSendStunPacket(const void* data, size_t size, StunRequest* req) { - if (port_->SendTo(data, size, remote_candidate_.address(), false) < 0) { + if (port_->SendTo(data, size, remote_candidate_.address(), + port_->DefaultDscpValue(), false) < 0) { LOG_J(LS_WARNING, this) << "Failed to send STUN ping " << req->id(); } } @@ -1389,12 +1392,13 @@ ProxyConnection::ProxyConnection(Port* port, size_t index, : Connection(port, index, candidate), error_(0) { } -int ProxyConnection::Send(const void* data, size_t size) { +int ProxyConnection::Send(const void* data, size_t size, + talk_base::DiffServCodePoint dscp) { if (write_state_ == STATE_WRITE_INIT || write_state_ == STATE_WRITE_TIMEOUT) { error_ = EWOULDBLOCK; return SOCKET_ERROR; } - int sent = port_->SendTo(data, size, remote_candidate_.address(), true); + int sent = port_->SendTo(data, size, remote_candidate_.address(), dscp, true); if (sent <= 0) { ASSERT(sent < 0); error_ = port_->GetError(); diff --git a/talk/p2p/base/port.h b/talk/p2p/base/port.h index f533f627b..7b89e5546 100644 --- a/talk/p2p/base/port.h +++ b/talk/p2p/base/port.h @@ -304,7 +304,17 @@ class Port : public PortInterface, public talk_base::MessageHandler, // Returns if Google ICE protocol is used. bool IsGoogleIce() const; + // Returns default DSCP value. + talk_base::DiffServCodePoint DefaultDscpValue() const { + return default_dscp_; + } + protected: + enum { + MSG_CHECKTIMEOUT = 0, + MSG_FIRST_AVAILABLE + }; + void set_type(const std::string& type) { type_ = type; } // Fills in the local address of the port. void AddAddress(const talk_base::SocketAddress& address, @@ -334,6 +344,11 @@ class Port : public PortInterface, public talk_base::MessageHandler, // Checks if the address in addr is compatible with the port's ip. bool IsCompatibleAddress(const talk_base::SocketAddress& addr); + // Default DSCP value for this port. Set by TransportChannel. + void SetDefaultDscpValue(talk_base::DiffServCodePoint dscp) { + default_dscp_ = dscp; + } + private: void Construct(); // Called when one of our connections deletes itself. @@ -372,7 +387,9 @@ class Port : public PortInterface, public talk_base::MessageHandler, IceRole ice_role_; uint64 tiebreaker_; bool shared_socket_; - + // DSCP value for ICE/STUN messages. Set by the P2PTransportChannel after + // port becomes ready. + talk_base::DiffServCodePoint default_dscp_; // Information to use when going through a proxy. std::string user_agent_; talk_base::ProxyInfo proxy_; @@ -447,7 +464,8 @@ class Connection : public talk_base::MessageHandler, // The connection can send and receive packets asynchronously. This matches // the interface of AsyncPacketSocket, which may use UDP or TCP under the // covers. - virtual int Send(const void* data, size_t size) = 0; + virtual int Send(const void* data, size_t size, + talk_base::DiffServCodePoint dscp) = 0; // Error if Send() returns < 0 virtual int GetError() = 0; @@ -576,7 +594,8 @@ class ProxyConnection : public Connection { public: ProxyConnection(Port* port, size_t index, const Candidate& candidate); - virtual int Send(const void* data, size_t size); + virtual int Send(const void* data, size_t size, + talk_base::DiffServCodePoint dscp); virtual int GetError() { return error_; } private: diff --git a/talk/p2p/base/port_unittest.cc b/talk/p2p/base/port_unittest.cc index cecefdaee..d3e02ac9f 100644 --- a/talk/p2p/base/port_unittest.cc +++ b/talk/p2p/base/port_unittest.cc @@ -172,7 +172,7 @@ class TestPort : public Port { } virtual int SendTo( const void* data, size_t size, const talk_base::SocketAddress& addr, - bool payload) { + talk_base::DiffServCodePoint dscp, bool payload) { if (!payload) { IceMessage* msg = new IceMessage; ByteBuffer* buf = new ByteBuffer(static_cast(data), size); @@ -787,10 +787,12 @@ class FakeAsyncPacketSocket : public AsyncPacketSocket { } // Send a packet. - virtual int Send(const void *pv, size_t cb) { + virtual int Send(const void *pv, size_t cb, + talk_base::DiffServCodePoint dscp) { return static_cast(cb); } - virtual int SendTo(const void *pv, size_t cb, const SocketAddress& addr) { + virtual int SendTo(const void *pv, size_t cb, const SocketAddress& addr, + talk_base::DiffServCodePoint dscp) { return static_cast(cb); } virtual int Close() { @@ -1224,6 +1226,26 @@ TEST_F(PortTest, TestSkipCrossFamilyUdp) { TestCrossFamilyPorts(SOCK_DGRAM); } +// This test verifies DSCP value set through SetOption interface can be +// get through DefaultDscpValue. +TEST_F(PortTest, TestDefaultDscpValue) { + talk_base::scoped_ptr udpport(CreateUdpPort(kLocalAddr1)); + udpport->SetOption(talk_base::Socket::OPT_DSCP, talk_base::DSCP_CS6); + EXPECT_EQ(talk_base::DSCP_CS6, udpport->DefaultDscpValue()); + talk_base::scoped_ptr tcpport(CreateTcpPort(kLocalAddr1)); + tcpport->SetOption(talk_base::Socket::OPT_DSCP, talk_base::DSCP_AF31); + EXPECT_EQ(talk_base::DSCP_AF31, tcpport->DefaultDscpValue()); + talk_base::scoped_ptr stunport( + CreateStunPort(kLocalAddr1, nat_socket_factory1())); + stunport->SetOption(talk_base::Socket::OPT_DSCP, talk_base::DSCP_AF41); + EXPECT_EQ(talk_base::DSCP_AF41, stunport->DefaultDscpValue()); + talk_base::scoped_ptr turnport(CreateTurnPort( + kLocalAddr1, nat_socket_factory1(), PROTO_UDP, PROTO_UDP)); + turnport->SetOption(talk_base::Socket::OPT_DSCP, talk_base::DSCP_CS7); + EXPECT_EQ(talk_base::DSCP_CS7, turnport->DefaultDscpValue()); + // TODO(mallinath) - Test DSCP through GetOption. +} + // Test sending STUN messages in GICE format. TEST_F(PortTest, TestSendStunMessageAsGice) { talk_base::scoped_ptr lport( @@ -2127,14 +2149,16 @@ TEST_F(PortTest, TestWritableState) { // Data should be unsendable until the connection is accepted. char data[] = "abcd"; int data_size = ARRAY_SIZE(data); - EXPECT_EQ(SOCKET_ERROR, ch1.conn()->Send(data, data_size)); + EXPECT_EQ(SOCKET_ERROR, + ch1.conn()->Send(data, data_size, talk_base::DSCP_NO_CHANGE)); // Accept the connection to return the binding response, transition to // writable, and allow data to be sent. ch2.AcceptConnection(); EXPECT_EQ_WAIT(Connection::STATE_WRITABLE, ch1.conn()->write_state(), kTimeout); - EXPECT_EQ(data_size, ch1.conn()->Send(data, data_size)); + EXPECT_EQ(data_size, + ch1.conn()->Send(data, data_size, talk_base::DSCP_NO_CHANGE)); // Ask the connection to update state as if enough time has passed to lose // full writability and 5 pings went unresponded to. We'll accomplish the @@ -2147,7 +2171,8 @@ TEST_F(PortTest, TestWritableState) { EXPECT_EQ(Connection::STATE_WRITE_UNRELIABLE, ch1.conn()->write_state()); // Data should be able to be sent in this state. - EXPECT_EQ(data_size, ch1.conn()->Send(data, data_size)); + EXPECT_EQ(data_size, + ch1.conn()->Send(data, data_size, talk_base::DSCP_NO_CHANGE)); // And now allow the other side to process the pings and send binding // responses. @@ -2164,7 +2189,8 @@ TEST_F(PortTest, TestWritableState) { EXPECT_EQ(Connection::STATE_WRITE_TIMEOUT, ch1.conn()->write_state()); // Now that the connection has completely timed out, data send should fail. - EXPECT_EQ(SOCKET_ERROR, ch1.conn()->Send(data, data_size)); + EXPECT_EQ(SOCKET_ERROR, + ch1.conn()->Send(data, data_size, talk_base::DSCP_NO_CHANGE)); ch1.Stop(); ch2.Stop(); diff --git a/talk/p2p/base/portinterface.h b/talk/p2p/base/portinterface.h index b956f9abe..6ea63466c 100644 --- a/talk/p2p/base/portinterface.h +++ b/talk/p2p/base/portinterface.h @@ -30,6 +30,7 @@ #include +#include "talk/base/dscp.h" #include "talk/base/socketaddress.h" #include "talk/p2p/base/transport.h" @@ -90,16 +91,16 @@ class PortInterface { // Functions on the underlying socket(s). virtual int SetOption(talk_base::Socket::Option opt, int value) = 0; - virtual int GetError() = 0; - virtual int GetOption(talk_base::Socket::Option opt, int* value) = 0; + virtual int GetError() = 0; virtual const std::vector& Candidates() const = 0; // Sends the given packet to the given address, provided that the address is // that of a connection or an address that has sent to us already. virtual int SendTo(const void* data, size_t size, - const talk_base::SocketAddress& addr, bool payload) = 0; + const talk_base::SocketAddress& addr, + talk_base::DiffServCodePoint dscp, bool payload) = 0; // Indicates that we received a successful STUN binding request from an // address that doesn't correspond to any current connection. To turn this diff --git a/talk/p2p/base/portproxy.cc b/talk/p2p/base/portproxy.cc index 2c6119aa5..eae39f161 100644 --- a/talk/p2p/base/portproxy.cc +++ b/talk/p2p/base/portproxy.cc @@ -97,9 +97,10 @@ Connection* PortProxy::CreateConnection(const Candidate& remote_candidate, int PortProxy::SendTo(const void* data, size_t size, const talk_base::SocketAddress& addr, + talk_base::DiffServCodePoint dscp, bool payload) { ASSERT(impl_ != NULL); - return impl_->SendTo(data, size, addr, payload); + return impl_->SendTo(data, size, addr, dscp, payload); } int PortProxy::SetOption(talk_base::Socket::Option opt, @@ -114,7 +115,6 @@ int PortProxy::GetOption(talk_base::Socket::Option opt, return impl_->GetOption(opt, value); } - int PortProxy::GetError() { ASSERT(impl_ != NULL); return impl_->GetError(); diff --git a/talk/p2p/base/portproxy.h b/talk/p2p/base/portproxy.h index 25808ea36..da326646d 100644 --- a/talk/p2p/base/portproxy.h +++ b/talk/p2p/base/portproxy.h @@ -68,7 +68,9 @@ class PortProxy : public PortInterface, public sigslot::has_slots<> { const talk_base::SocketAddress& remote_addr); virtual int SendTo(const void* data, size_t size, - const talk_base::SocketAddress& addr, bool payload); + const talk_base::SocketAddress& addr, + talk_base::DiffServCodePoint dscp, + bool payload); virtual int SetOption(talk_base::Socket::Option opt, int value); virtual int GetOption(talk_base::Socket::Option opt, int* value); virtual int GetError(); diff --git a/talk/p2p/base/rawtransportchannel.cc b/talk/p2p/base/rawtransportchannel.cc index 54adab13c..ec225029b 100644 --- a/talk/p2p/base/rawtransportchannel.cc +++ b/talk/p2p/base/rawtransportchannel.cc @@ -74,14 +74,16 @@ RawTransportChannel::~RawTransportChannel() { delete allocator_session_; } -int RawTransportChannel::SendPacket(const char *data, size_t size, int flags) { +int RawTransportChannel::SendPacket(const char *data, size_t size, + talk_base::DiffServCodePoint dscp, + int flags) { if (port_ == NULL) return -1; if (remote_address_.IsNil()) return -1; if (flags != 0) return -1; - return port_->SendTo(data, size, remote_address_, true); + return port_->SendTo(data, size, remote_address_, dscp, true); } int RawTransportChannel::SetOption(talk_base::Socket::Option opt, int value) { diff --git a/talk/p2p/base/rawtransportchannel.h b/talk/p2p/base/rawtransportchannel.h index 4abea8315..2aac2b5ed 100644 --- a/talk/p2p/base/rawtransportchannel.h +++ b/talk/p2p/base/rawtransportchannel.h @@ -64,7 +64,8 @@ class RawTransportChannel : public TransportChannelImpl, virtual ~RawTransportChannel(); // Implementation of normal channel packet sending. - virtual int SendPacket(const char *data, size_t len, int flags); + virtual int SendPacket(const char *data, size_t len, + talk_base::DiffServCodePoint dscp, int flags); virtual int SetOption(talk_base::Socket::Option opt, int value); virtual int GetError(); diff --git a/talk/p2p/base/relayport.cc b/talk/p2p/base/relayport.cc index 0cd40e5ae..ff8c07c55 100644 --- a/talk/p2p/base/relayport.cc +++ b/talk/p2p/base/relayport.cc @@ -67,7 +67,7 @@ class RelayConnection : public sigslot::has_slots<> { bool CheckResponse(StunMessage* msg); // Sends data to the relay server. - int Send(const void* pv, size_t cb); + int Send(const void* pv, size_t cb, talk_base::DiffServCodePoint dscp); // Sends a STUN allocate request message to the relay server. void SendAllocateRequest(RelayEntry* entry, int delay); @@ -123,7 +123,8 @@ class RelayEntry : public talk_base::MessageHandler, // Sends a packet to the given destination address using the socket of this // entry. This will wrap the packet in STUN if necessary. int SendTo(const void* data, size_t size, - const talk_base::SocketAddress& addr); + const talk_base::SocketAddress& addr, + talk_base::DiffServCodePoint dscp); // Schedules a keep-alive allocate request. void ScheduleKeepAlive(); @@ -163,7 +164,8 @@ class RelayEntry : public talk_base::MessageHandler, // Sends the given data on the socket to the server with no wrapping. This // returns the number of bytes written or -1 if an error occurred. - int SendPacket(const void* data, size_t size); + int SendPacket(const void* data, size_t size, + talk_base::DiffServCodePoint dscp); }; // Handles an allocate request for a particular RelayEntry. @@ -300,7 +302,9 @@ Connection* RelayPort::CreateConnection(const Candidate& address, } int RelayPort::SendTo(const void* data, size_t size, - const talk_base::SocketAddress& addr, bool payload) { + const talk_base::SocketAddress& addr, + talk_base::DiffServCodePoint dscp, + bool payload) { // Try to find an entry for this specific address. Note that the first entry // created was not given an address initially, so it can be set to the first // address that comes along. @@ -341,7 +345,7 @@ int RelayPort::SendTo(const void* data, size_t size, } // Send the actual contents to the server using the usual mechanism. - int sent = entry->SendTo(data, size, addr); + int sent = entry->SendTo(data, size, addr, dscp); if (sent <= 0) { ASSERT(sent < 0); error_ = entry->GetError(); @@ -354,6 +358,14 @@ int RelayPort::SendTo(const void* data, size_t size, int RelayPort::SetOption(talk_base::Socket::Option opt, int value) { int result = 0; + // DSCP option is not passed to the socket. + // TODO(mallinath) - After we have the support on socket, + // remove this specialization. + if (opt == talk_base::Socket::OPT_DSCP) { + SetDefaultDscpValue(static_cast(value)); + return result; + } + for (size_t i = 0; i < entries_.size(); ++i) { if (entries_[i]->SetSocketOption(opt, value) < 0) { result = -1; @@ -418,7 +430,9 @@ bool RelayConnection::CheckResponse(StunMessage* msg) { void RelayConnection::OnSendPacket(const void* data, size_t size, StunRequest* req) { - int sent = socket_->SendTo(data, size, GetAddress()); + // TODO(mallinath) Find a way to get DSCP value from Port. + int sent = socket_->SendTo( + data, size, GetAddress(), talk_base::DSCP_NO_CHANGE); if (sent <= 0) { LOG(LS_VERBOSE) << "OnSendPacket: failed sending to " << GetAddress() << std::strerror(socket_->GetError()); @@ -426,8 +440,9 @@ void RelayConnection::OnSendPacket(const void* data, size_t size, } } -int RelayConnection::Send(const void* pv, size_t cb) { - return socket_->SendTo(pv, cb, GetAddress()); +int RelayConnection::Send(const void* pv, size_t cb, + talk_base::DiffServCodePoint dscp) { + return socket_->SendTo(pv, cb, GetAddress(), dscp); } void RelayConnection::SendAllocateRequest(RelayEntry* entry, int delay) { @@ -546,11 +561,12 @@ void RelayEntry::OnConnect(const talk_base::SocketAddress& mapped_addr, } int RelayEntry::SendTo(const void* data, size_t size, - const talk_base::SocketAddress& addr) { + const talk_base::SocketAddress& addr, + talk_base::DiffServCodePoint dscp) { // If this connection is locked to the address given, then we can send the // packet with no wrapper. if (locked_ && (ext_addr_ == addr)) - return SendPacket(data, size); + return SendPacket(data, size, dscp); // Otherwise, we must wrap the given data in a STUN SEND request so that we // can communicate the destination address to the server. @@ -598,7 +614,7 @@ int RelayEntry::SendTo(const void* data, size_t size, talk_base::ByteBuffer buf; request.Write(&buf); - return SendPacket(buf.Data(), buf.Length()); + return SendPacket(buf.Data(), buf.Length(), dscp); } void RelayEntry::ScheduleKeepAlive() { @@ -744,12 +760,13 @@ void RelayEntry::OnReadyToSend(talk_base::AsyncPacketSocket* socket) { } } -int RelayEntry::SendPacket(const void* data, size_t size) { +int RelayEntry::SendPacket(const void* data, size_t size, + talk_base::DiffServCodePoint dscp) { int sent = 0; if (current_connection_) { // We are connected, no need to send packets anywere else than to // the current connection. - sent = current_connection_->Send(data, size); + sent = current_connection_->Send(data, size, dscp); } return sent; } diff --git a/talk/p2p/base/relayport.h b/talk/p2p/base/relayport.h index a2bfb7442..c15e7e010 100644 --- a/talk/p2p/base/relayport.h +++ b/talk/p2p/base/relayport.h @@ -92,7 +92,9 @@ class RelayPort : public Port { void SetReady(); virtual int SendTo(const void* data, size_t size, - const talk_base::SocketAddress& addr, bool payload); + const talk_base::SocketAddress& addr, + talk_base::DiffServCodePoint dscp, + bool payload); // Dispatches the given packet to the port or connection as appropriate. void OnReadPacket(const char* data, size_t size, diff --git a/talk/p2p/base/relayserver.cc b/talk/p2p/base/relayserver.cc index 95aa08c39..c2cf472d3 100644 --- a/talk/p2p/base/relayserver.cc +++ b/talk/p2p/base/relayserver.cc @@ -51,7 +51,7 @@ static const uint32 kMessageAcceptConnection = 1; // Calls SendTo on the given socket and logs any bad results. void Send(talk_base::AsyncPacketSocket* socket, const char* bytes, size_t size, const talk_base::SocketAddress& addr) { - int result = socket->SendTo(bytes, size, addr); + int result = socket->SendTo(bytes, size, addr, talk_base::DSCP_NO_CHANGE); if (result < static_cast(size)) { LOG(LS_ERROR) << "SendTo wrote only " << result << " of " << size << " bytes"; diff --git a/talk/p2p/base/session_unittest.cc b/talk/p2p/base/session_unittest.cc index a7620669e..b64e73742 100644 --- a/talk/p2p/base/session_unittest.cc +++ b/talk/p2p/base/session_unittest.cc @@ -32,6 +32,7 @@ #include "talk/base/base64.h" #include "talk/base/common.h" +#include "talk/base/dscp.h" #include "talk/base/gunit.h" #include "talk/base/helpers.h" #include "talk/base/logging.h" @@ -830,7 +831,7 @@ struct ChannelHandler : sigslot::has_slots<> { std::string data_with_id(name); data_with_id += data; int result = channel->SendPacket(data_with_id.c_str(), data_with_id.size(), - 0); + talk_base::DSCP_NO_CHANGE, 0); EXPECT_EQ(static_cast(data_with_id.size()), result); } diff --git a/talk/p2p/base/stunport.cc b/talk/p2p/base/stunport.cc index 25d9e8df2..5e0e50029 100644 --- a/talk/p2p/base/stunport.cc +++ b/talk/p2p/base/stunport.cc @@ -216,8 +216,10 @@ Connection* UDPPort::CreateConnection(const Candidate& address, } int UDPPort::SendTo(const void* data, size_t size, - const talk_base::SocketAddress& addr, bool payload) { - int sent = socket_->SendTo(data, size, addr); + const talk_base::SocketAddress& addr, + talk_base::DiffServCodePoint dscp, + bool payload) { + int sent = socket_->SendTo(data, size, addr, dscp); if (sent < 0) { error_ = socket_->GetError(); LOG_J(LS_ERROR, this) << "UDP send of " << size @@ -227,6 +229,12 @@ int UDPPort::SendTo(const void* data, size_t size, } int UDPPort::SetOption(talk_base::Socket::Option opt, int value) { + // TODO(mallinath) - After we have the support on socket, + // remove this specialization. + if (opt == talk_base::Socket::OPT_DSCP) { + SetDefaultDscpValue(static_cast(value)); + return 0; + } return socket_->SetOption(opt, value); } @@ -345,7 +353,7 @@ void UDPPort::SetResult(bool success) { // TODO: merge this with SendTo above. void UDPPort::OnSendPacket(const void* data, size_t size, StunRequest* req) { StunBindingRequest* sreq = static_cast(req); - if (socket_->SendTo(data, size, sreq->server_addr()) < 0) + if (socket_->SendTo(data, size, sreq->server_addr(), DefaultDscpValue()) < 0) PLOG(LERROR, socket_->GetError()) << "sendto"; } diff --git a/talk/p2p/base/stunport.h b/talk/p2p/base/stunport.h index 3f982d57b..7cfed4b7c 100644 --- a/talk/p2p/base/stunport.h +++ b/talk/p2p/base/stunport.h @@ -121,7 +121,9 @@ class UDPPort : public Port { bool Init(); virtual int SendTo(const void* data, size_t size, - const talk_base::SocketAddress& addr, bool payload); + const talk_base::SocketAddress& addr, + talk_base::DiffServCodePoint dscp, + bool payload); void OnLocalAddressReady(talk_base::AsyncPacketSocket* socket, const talk_base::SocketAddress& address); diff --git a/talk/p2p/base/stunserver.cc b/talk/p2p/base/stunserver.cc index 05292e8e1..80719b4aa 100644 --- a/talk/p2p/base/stunserver.cc +++ b/talk/p2p/base/stunserver.cc @@ -102,7 +102,8 @@ void StunServer::SendResponse( const StunMessage& msg, const talk_base::SocketAddress& addr) { talk_base::ByteBuffer buf; msg.Write(&buf); - if (socket_->SendTo(buf.Data(), buf.Length(), addr) < 0) + if (socket_->SendTo( + buf.Data(), buf.Length(), addr, talk_base::DSCP_NO_CHANGE) < 0) LOG_ERR(LS_ERROR) << "sendto"; } diff --git a/talk/p2p/base/tcpport.cc b/talk/p2p/base/tcpport.cc index 5f4ccc48a..11334c622 100644 --- a/talk/p2p/base/tcpport.cc +++ b/talk/p2p/base/tcpport.cc @@ -134,7 +134,9 @@ void TCPPort::PrepareAddress() { } int TCPPort::SendTo(const void* data, size_t size, - const talk_base::SocketAddress& addr, bool payload) { + const talk_base::SocketAddress& addr, + talk_base::DiffServCodePoint dscp, + bool payload) { talk_base::AsyncPacketSocket * socket = NULL; if (TCPConnection * conn = static_cast(GetConnection(addr))) { socket = conn->socket(); @@ -147,7 +149,7 @@ int TCPPort::SendTo(const void* data, size_t size, return -1; // TODO: Set error_ } - int sent = socket->Send(data, size); + int sent = socket->Send(data, size, dscp); if (sent < 0) { error_ = socket->GetError(); LOG_J(LS_ERROR, this) << "TCP send of " << size @@ -165,6 +167,14 @@ int TCPPort::GetOption(talk_base::Socket::Option opt, int* value) { } int TCPPort::SetOption(talk_base::Socket::Option opt, int value) { + // If we are setting DSCP value, pass value to base Port and return. + // TODO(mallinath) - After we have the support on socket, + // remove this specialization. + if (opt == talk_base::Socket::OPT_DSCP) { + SetDefaultDscpValue(static_cast(value)); + return 0; + } + if (socket_) { return socket_->SetOption(opt, value); } else { @@ -261,7 +271,8 @@ TCPConnection::~TCPConnection() { delete socket_; } -int TCPConnection::Send(const void* data, size_t size) { +int TCPConnection::Send(const void* data, size_t size, + talk_base::DiffServCodePoint dscp) { if (!socket_) { error_ = ENOTCONN; return SOCKET_ERROR; @@ -272,7 +283,7 @@ int TCPConnection::Send(const void* data, size_t size) { error_ = EWOULDBLOCK; return SOCKET_ERROR; } - int sent = socket_->Send(data, size); + int sent = socket_->Send(data, size, dscp); if (sent < 0) { error_ = socket_->GetError(); } else { diff --git a/talk/p2p/base/tcpport.h b/talk/p2p/base/tcpport.h index 813617666..599d3c66b 100644 --- a/talk/p2p/base/tcpport.h +++ b/talk/p2p/base/tcpport.h @@ -82,7 +82,9 @@ class TCPPort : public Port { // Handles sending using the local TCP socket. virtual int SendTo(const void* data, size_t size, - const talk_base::SocketAddress& addr, bool payload); + const talk_base::SocketAddress& addr, + talk_base::DiffServCodePoint dscp, + bool payload); // Accepts incoming TCP connection. void OnNewConnection(talk_base::AsyncPacketSocket* socket, @@ -124,7 +126,8 @@ class TCPConnection : public Connection { talk_base::AsyncPacketSocket* socket = 0); virtual ~TCPConnection(); - virtual int Send(const void* data, size_t size); + virtual int Send(const void* data, size_t size, + talk_base::DiffServCodePoint dscp); virtual int GetError(); talk_base::AsyncPacketSocket* socket() { return socket_; } diff --git a/talk/p2p/base/transportchannel.h b/talk/p2p/base/transportchannel.h index a5e41b9de..85fff7a9f 100644 --- a/talk/p2p/base/transportchannel.h +++ b/talk/p2p/base/transportchannel.h @@ -32,6 +32,7 @@ #include #include "talk/base/basictypes.h" +#include "talk/base/dscp.h" #include "talk/base/sigslot.h" #include "talk/base/socket.h" #include "talk/base/sslidentity.h" @@ -80,7 +81,9 @@ class TransportChannel : public sigslot::has_slots<> { // Attempts to send the given packet. The return value is < 0 on failure. // TODO: Remove the default argument once channel code is updated. - virtual int SendPacket(const char* data, size_t len, int flags = 0) = 0; + virtual int SendPacket(const char* data, size_t len, + talk_base::DiffServCodePoint dscp, + int flags = 0) = 0; // Sets a socket option on this channel. Note that not all options are // supported by all transport types. diff --git a/talk/p2p/base/transportchannelproxy.cc b/talk/p2p/base/transportchannelproxy.cc index 9f8462067..04b32ce64 100644 --- a/talk/p2p/base/transportchannelproxy.cc +++ b/talk/p2p/base/transportchannelproxy.cc @@ -93,13 +93,15 @@ void TransportChannelProxy::SetImplementation(TransportChannelImpl* impl) { worker_thread_->Post(this, MSG_UPDATESTATE); } -int TransportChannelProxy::SendPacket(const char* data, size_t len, int flags) { +int TransportChannelProxy::SendPacket(const char* data, size_t len, + talk_base::DiffServCodePoint dscp, + int flags) { ASSERT(talk_base::Thread::Current() == worker_thread_); // Fail if we don't have an impl yet. if (!impl_) { return -1; } - return impl_->SendPacket(data, len, flags); + return impl_->SendPacket(data, len, dscp, flags); } int TransportChannelProxy::SetOption(talk_base::Socket::Option opt, int value) { diff --git a/talk/p2p/base/transportchannelproxy.h b/talk/p2p/base/transportchannelproxy.h index ea2843d52..29f466341 100644 --- a/talk/p2p/base/transportchannelproxy.h +++ b/talk/p2p/base/transportchannelproxy.h @@ -63,7 +63,9 @@ class TransportChannelProxy : public TransportChannel, // Implementation of the TransportChannel interface. These simply forward to // the implementation. - virtual int SendPacket(const char* data, size_t len, int flags); + virtual int SendPacket(const char* data, size_t len, + talk_base::DiffServCodePoint dscp, + int flags); virtual int SetOption(talk_base::Socket::Option opt, int value); virtual int GetError(); virtual IceRole GetIceRole() const; diff --git a/talk/p2p/base/turnport.cc b/talk/p2p/base/turnport.cc index a302b7135..35e51fc2d 100644 --- a/talk/p2p/base/turnport.cc +++ b/talk/p2p/base/turnport.cc @@ -52,10 +52,6 @@ static const int TURN_PERMISSION_TIMEOUT = 5 * 60 * 1000; // 5 minutes static const size_t TURN_CHANNEL_HEADER_SIZE = 4U; -enum { - MSG_PORT_ERROR = 1 -}; - inline bool IsTurnChannelData(uint16 msg_type) { return ((msg_type & 0xC000) == 0x4000); // MSB are 0b01 } @@ -156,7 +152,8 @@ class TurnEntry : public sigslot::has_slots<> { void SendChannelBindRequest(int delay); // Sends a packet to the given destination address. // This will wrap the packet in STUN if necessary. - int Send(const void* data, size_t size, bool payload); + int Send(const void* data, size_t size, bool payload, + talk_base::DiffServCodePoint dscp); void OnCreatePermissionSuccess(); void OnCreatePermissionError(StunMessage* response, int code); @@ -296,6 +293,14 @@ Connection* TurnPort::CreateConnection(const Candidate& address, } int TurnPort::SetOption(talk_base::Socket::Option opt, int value) { + // DSCP option is not passed to the socket. + // TODO(mallinath) - After we have the support on socket, + // remove this specialization. + if (opt == talk_base::Socket::OPT_DSCP) { + SetDefaultDscpValue(static_cast(value)); + return 0; + } + if (!socket_) { // If socket is not created yet, these options will be applied during socket // creation. @@ -318,6 +323,7 @@ int TurnPort::GetError() { int TurnPort::SendTo(const void* data, size_t size, const talk_base::SocketAddress& addr, + talk_base::DiffServCodePoint dscp, bool payload) { // Try to find an entry for this specific address; we should have one. TurnEntry* entry = FindEntry(addr); @@ -332,7 +338,7 @@ int TurnPort::SendTo(const void* data, size_t size, } // Send the actual contents to the server using the usual mechanism. - int sent = entry->Send(data, size, payload); + int sent = entry->Send(data, size, payload, dscp); if (sent <= 0) { return SOCKET_ERROR; } @@ -406,7 +412,7 @@ void TurnPort::OnResolveResult(talk_base::SignalThread* signal_thread) { void TurnPort::OnSendStunPacket(const void* data, size_t size, StunRequest* request) { - if (Send(data, size) < 0) { + if (Send(data, size, DefaultDscpValue()) < 0) { LOG_J(LS_ERROR, this) << "Failed to send TURN message, err=" << socket_->GetError(); } @@ -431,15 +437,16 @@ void TurnPort::OnAllocateError() { // We will send SignalPortError asynchronously as this can be sent during // port initialization. This way it will not be blocking other port // creation. - thread()->Post(this, MSG_PORT_ERROR); + thread()->Post(this, MSG_ERROR); } void TurnPort::OnMessage(talk_base::Message* message) { - if (message->message_id == MSG_PORT_ERROR) { + if (message->message_id == MSG_ERROR) { SignalPortError(this); - } else { - Port::OnMessage(message); + return; } + + Port::OnMessage(message); } void TurnPort::OnAllocateRequestTimeout() { @@ -557,8 +564,9 @@ void TurnPort::AddRequestAuthInfo(StunMessage* msg) { VERIFY(msg->AddMessageIntegrity(hash())); } -int TurnPort::Send(const void* data, size_t len) { - return socket_->SendTo(data, len, server_address_.address); +int TurnPort::Send(const void* data, size_t len, + talk_base::DiffServCodePoint dscp) { + return socket_->SendTo(data, len, server_address_.address, dscp); } void TurnPort::UpdateHash() { @@ -890,7 +898,8 @@ void TurnEntry::SendChannelBindRequest(int delay) { port_, this, channel_id_, ext_addr_), delay); } -int TurnEntry::Send(const void* data, size_t size, bool payload) { +int TurnEntry::Send(const void* data, size_t size, bool payload, + talk_base::DiffServCodePoint dscp) { talk_base::ByteBuffer buf; if (state_ != STATE_BOUND) { // If we haven't bound the channel yet, we have to use a Send Indication. @@ -915,7 +924,7 @@ int TurnEntry::Send(const void* data, size_t size, bool payload) { buf.WriteUInt16(static_cast(size)); buf.WriteBytes(reinterpret_cast(data), size); } - return port_->Send(buf.Data(), buf.Length()); + return port_->Send(buf.Data(), buf.Length(), dscp); } void TurnEntry::OnCreatePermissionSuccess() { diff --git a/talk/p2p/base/turnport.h b/talk/p2p/base/turnport.h index fa23d5347..4462b0c8c 100644 --- a/talk/p2p/base/turnport.h +++ b/talk/p2p/base/turnport.h @@ -74,6 +74,7 @@ class TurnPort : public Port { const Candidate& c, PortInterface::CandidateOrigin origin); virtual int SendTo(const void* data, size_t size, const talk_base::SocketAddress& addr, + talk_base::DiffServCodePoint dscp, bool payload); virtual int SetOption(talk_base::Socket::Option opt, int value); virtual int GetOption(talk_base::Socket::Option opt, int* value); @@ -106,6 +107,8 @@ class TurnPort : public Port { const RelayCredentials& credentials); private: + enum { MSG_ERROR = MSG_FIRST_AVAILABLE }; + typedef std::list EntryList; typedef std::map SocketOptionsMap; @@ -138,7 +141,7 @@ class TurnPort : public Port { bool ScheduleRefresh(int lifetime); void SendRequest(StunRequest* request, int delay); - int Send(const void* data, size_t size); + int Send(const void* data, size_t size, talk_base::DiffServCodePoint dscp); void UpdateHash(); bool UpdateNonce(StunMessage* response); diff --git a/talk/p2p/base/turnport_unittest.cc b/talk/p2p/base/turnport_unittest.cc index 6304ce678..726175c5c 100644 --- a/talk/p2p/base/turnport_unittest.cc +++ b/talk/p2p/base/turnport_unittest.cc @@ -27,6 +27,7 @@ #include "talk/base/asynctcpsocket.h" #include "talk/base/buffer.h" +#include "talk/base/dscp.h" #include "talk/base/firewallsocketserver.h" #include "talk/base/logging.h" #include "talk/base/gunit.h" @@ -217,8 +218,8 @@ class TurnPortTest : public testing::Test, for (size_t j = 0; j < i + 1; ++j) { buf[j] = 0xFF - j; } - conn1->Send(buf, i + 1); - conn2->Send(buf, i + 1); + conn1->Send(buf, i + 1, talk_base::DSCP_NO_CHANGE); + conn2->Send(buf, i + 1, talk_base::DSCP_NO_CHANGE); main_->ProcessMessages(0); } diff --git a/talk/p2p/base/turnserver.cc b/talk/p2p/base/turnserver.cc index 8260f3dbe..17ecf3507 100644 --- a/talk/p2p/base/turnserver.cc +++ b/talk/p2p/base/turnserver.cc @@ -564,7 +564,8 @@ void TurnServer::SendStun(Connection* conn, StunMessage* msg) { void TurnServer::Send(Connection* conn, const talk_base::ByteBuffer& buf) { - conn->socket()->SendTo(buf.Data(), buf.Length(), conn->src()); + conn->socket()->SendTo(buf.Data(), buf.Length(), conn->src(), + talk_base::DSCP_NO_CHANGE); } void TurnServer::OnAllocationDestroyed(Allocation* allocation) { @@ -936,7 +937,7 @@ void TurnServer::Allocation::SendErrorResponse(const TurnMessage* req, int code, void TurnServer::Allocation::SendExternal(const void* data, size_t size, const talk_base::SocketAddress& peer) { - external_socket_->SendTo(data, size, peer); + external_socket_->SendTo(data, size, peer, talk_base::DSCP_NO_CHANGE); } void TurnServer::Allocation::OnMessage(talk_base::Message* msg) { diff --git a/talk/session/media/channel.cc b/talk/session/media/channel.cc index e60fa35c4..f6259e9a3 100644 --- a/talk/session/media/channel.cc +++ b/talk/session/media/channel.cc @@ -30,6 +30,7 @@ #include "talk/base/buffer.h" #include "talk/base/byteorder.h" #include "talk/base/common.h" +#include "talk/base/dscp.h" #include "talk/base/logging.h" #include "talk/media/base/rtputils.h" #include "talk/p2p/base/transportchannel.h" @@ -189,6 +190,7 @@ struct VideoStatsMessageData : public talk_base::MessageData { struct PacketMessageData : public talk_base::MessageData { talk_base::Buffer packet; + talk_base::DiffServCodePoint dscp; }; struct AudioRenderMessageData: public talk_base::MessageData { @@ -566,12 +568,14 @@ bool BaseChannel::IsReadyToSend() const { was_ever_writable(); } -bool BaseChannel::SendPacket(talk_base::Buffer* packet) { - return SendPacket(false, packet); +bool BaseChannel::SendPacket(talk_base::Buffer* packet, + talk_base::DiffServCodePoint dscp) { + return SendPacket(false, packet, dscp); } -bool BaseChannel::SendRtcp(talk_base::Buffer* packet) { - return SendPacket(true, packet); +bool BaseChannel::SendRtcp(talk_base::Buffer* packet, + talk_base::DiffServCodePoint dscp) { + return SendPacket(true, packet, dscp); } int BaseChannel::SetOption(SocketType type, talk_base::Socket::Option opt, @@ -635,7 +639,8 @@ bool BaseChannel::PacketIsRtcp(const TransportChannel* channel, rtcp_mux_filter_.DemuxRtcp(data, static_cast(len))); } -bool BaseChannel::SendPacket(bool rtcp, talk_base::Buffer* packet) { +bool BaseChannel::SendPacket(bool rtcp, talk_base::Buffer* packet, + talk_base::DiffServCodePoint dscp) { // Unless we're sending optimistically, we only allow packets through when we // are completely writable. if (!optimistic_data_send_ && !writable_) { @@ -654,6 +659,7 @@ bool BaseChannel::SendPacket(bool rtcp, talk_base::Buffer* packet) { int message_id = (!rtcp) ? MSG_RTPPACKET : MSG_RTCPPACKET; PacketMessageData* data = new PacketMessageData; packet->TransferTo(&data->packet); + data->dscp = dscp; worker_thread_->Post(this, message_id, data); return true; } @@ -731,7 +737,7 @@ bool BaseChannel::SendPacket(bool rtcp, talk_base::Buffer* packet) { } // Bon voyage. - int ret = channel->SendPacket(packet->data(), packet->length(), + int ret = channel->SendPacket(packet->data(), packet->length(), dscp, (secure() && secure_dtls()) ? PF_SRTP_BYPASS : 0); if (ret != static_cast(packet->length())) { if (channel->GetError() == EWOULDBLOCK) { @@ -1404,7 +1410,7 @@ void BaseChannel::OnMessage(talk_base::Message *pmsg) { case MSG_RTPPACKET: case MSG_RTCPPACKET: { PacketMessageData* data = static_cast(pmsg->pdata); - SendPacket(pmsg->message_id == MSG_RTCPPACKET, &data->packet); + SendPacket(pmsg->message_id == MSG_RTCPPACKET, &data->packet, data->dscp); delete data; // because it is Posted break; } diff --git a/talk/session/media/channel.h b/talk/session/media/channel.h index 6c17e1854..0d66be9a9 100644 --- a/talk/session/media/channel.h +++ b/talk/session/media/channel.h @@ -251,8 +251,10 @@ class BaseChannel void FlushRtcpMessages(); // NetworkInterface implementation, called by MediaEngine - virtual bool SendPacket(talk_base::Buffer* packet); - virtual bool SendRtcp(talk_base::Buffer* packet); + virtual bool SendPacket(talk_base::Buffer* packet, + talk_base::DiffServCodePoint dscp); + virtual bool SendRtcp(talk_base::Buffer* packet, + talk_base::DiffServCodePoint dscp); virtual int SetOption(SocketType type, talk_base::Socket::Option o, int val); // From TransportChannel @@ -263,7 +265,8 @@ class BaseChannel bool PacketIsRtcp(const TransportChannel* channel, const char* data, size_t len); - bool SendPacket(bool rtcp, talk_base::Buffer* packet); + bool SendPacket(bool rtcp, talk_base::Buffer* packet, + talk_base::DiffServCodePoint dscp); virtual bool WantsPacket(bool rtcp, talk_base::Buffer* packet); void HandlePacket(bool rtcp, talk_base::Buffer* packet); diff --git a/talk/session/media/mediasession.cc b/talk/session/media/mediasession.cc index 1215008b0..856123082 100644 --- a/talk/session/media/mediasession.cc +++ b/talk/session/media/mediasession.cc @@ -38,6 +38,7 @@ #include "talk/base/stringutils.h" #include "talk/media/base/constants.h" #include "talk/media/base/cryptoparams.h" +#include "talk/media/sctp/sctpdataengine.h" #include "talk/p2p/base/constants.h" #include "talk/session/media/channelmanager.h" #include "talk/session/media/srtpfilter.h" diff --git a/talk/session/media/mediasession.h b/talk/session/media/mediasession.h index 327480466..5dfc765e7 100644 --- a/talk/session/media/mediasession.h +++ b/talk/session/media/mediasession.h @@ -83,10 +83,6 @@ extern const char kMediaProtocolDtlsSctp[]; // Options to control how session descriptions are generated. const int kAutoBandwidth = -1; const int kBufferedModeDisabled = 0; -// TODO(pthatcher): This is imposed by usrsctp lib. I have no idea -// why it is 9. Figure out why, and make it bigger, hopefully up to -// 2^16-1. -const uint32 kMaxSctpSid = 9; struct MediaSessionOptions { MediaSessionOptions() : diff --git a/talk/session/media/mediasessionclient.cc b/talk/session/media/mediasessionclient.cc index b54891e8c..246592c61 100644 --- a/talk/session/media/mediasessionclient.cc +++ b/talk/session/media/mediasessionclient.cc @@ -35,6 +35,7 @@ #include "talk/base/stringutils.h" #include "talk/media/base/cryptoparams.h" #include "talk/media/base/capturemanager.h" +#include "talk/media/sctp/sctpdataengine.h" #include "talk/p2p/base/constants.h" #include "talk/p2p/base/parsing.h" #include "talk/session/media/mediamessages.h" diff --git a/talk/session/tunnel/pseudotcpchannel.cc b/talk/session/tunnel/pseudotcpchannel.cc index 8b9a19f0b..92e9e0ea5 100644 --- a/talk/session/tunnel/pseudotcpchannel.cc +++ b/talk/session/tunnel/pseudotcpchannel.cc @@ -502,7 +502,7 @@ IPseudoTcpNotify::WriteResult PseudoTcpChannel::TcpWritePacket( ASSERT(cs_.CurrentThreadIsOwner()); ASSERT(tcp == tcp_); ASSERT(NULL != channel_); - int sent = channel_->SendPacket(buffer, len); + int sent = channel_->SendPacket(buffer, len, talk_base::DSCP_NO_CHANGE); if (sent > 0) { //LOG_F(LS_VERBOSE) << "(" << sent << ") Sent"; return IPseudoTcpNotify::WR_SUCCESS;