Update libjingle to 53057474.

R=wu@webrtc.org

Review URL: https://webrtc-codereview.appspot.com/2274004

git-svn-id: http://webrtc.googlecode.com/svn/trunk@4818 4adac7df-926f-26a2-2b94-8c16560cd09d
This commit is contained in:
mallinath@webrtc.org
2013-09-23 20:34:45 +00:00
parent e2af622edf
commit 1112c30e1e
66 changed files with 764 additions and 393 deletions

View File

@@ -65,7 +65,9 @@ AsyncStunTCPSocket::AsyncStunTCPSocket(
: talk_base::AsyncTCPSocketBase(socket, listen, kBufSize) {
}
int AsyncStunTCPSocket::Send(const void *pv, size_t cb) {
// TODO(mallinath) - Add support of setting DSCP code on AsyncSocket.
int AsyncStunTCPSocket::Send(const void *pv, size_t cb,
talk_base::DiffServCodePoint dscp) {
if (cb > kBufSize || cb < kPacketLenSize + kPacketLenOffset) {
SetError(EMSGSIZE);
return -1;

View File

@@ -47,7 +47,8 @@ class AsyncStunTCPSocket : public talk_base::AsyncTCPSocketBase {
AsyncStunTCPSocket(talk_base::AsyncSocket* socket, bool listen);
virtual ~AsyncStunTCPSocket() {}
virtual int Send(const void* pv, size_t cb);
virtual int Send(const void* pv, size_t cb,
talk_base::DiffServCodePoint dscp);
virtual void ProcessInput(char* data, size_t* len);
virtual void HandleIncomingConnection(talk_base::AsyncSocket* socket);

View File

@@ -121,7 +121,8 @@ class AsyncStunTCPSocketTest : public testing::Test,
}
bool Send(const void* data, size_t len) {
size_t ret = send_socket_->Send(reinterpret_cast<const char*>(data), len);
size_t ret = send_socket_->Send(
reinterpret_cast<const char*>(data), len, talk_base::DSCP_NO_CHANGE);
vss_->ProcessMessagesUntilIdle();
return (ret == len);
}

View File

@@ -29,6 +29,7 @@
#include "talk/p2p/base/dtlstransportchannel.h"
#include "talk/base/buffer.h"
#include "talk/base/dscp.h"
#include "talk/base/messagequeue.h"
#include "talk/base/stream.h"
#include "talk/base/sslstreamadapter.h"
@@ -69,7 +70,8 @@ talk_base::StreamResult StreamInterfaceChannel::Write(const void* data,
int* error) {
// Always succeeds, since this is an unreliable transport anyway.
// TODO: Should this block if channel_'s temporarily unwritable?
channel_->SendPacket(static_cast<const char*>(data), data_len);
channel_->SendPacket(
static_cast<const char*>(data), data_len, talk_base::DSCP_NO_CHANGE);
if (written) {
*written = data_len;
}
@@ -297,6 +299,7 @@ bool DtlsTransportChannelWrapper::GetSrtpCipher(std::string* cipher) {
// Called from upper layers to send a media packet.
int DtlsTransportChannelWrapper::SendPacket(const char* data, size_t size,
talk_base::DiffServCodePoint dscp,
int flags) {
int result = -1;
@@ -321,7 +324,7 @@ int DtlsTransportChannelWrapper::SendPacket(const char* data, size_t size,
break;
}
result = channel_->SendPacket(data, size);
result = channel_->SendPacket(data, size, dscp);
} else {
result = (dtls_->WriteAll(data, size, NULL, NULL) ==
talk_base::SR_SUCCESS) ? static_cast<int>(size) : -1;
@@ -329,7 +332,7 @@ int DtlsTransportChannelWrapper::SendPacket(const char* data, size_t size,
break;
// Not doing DTLS.
case STATE_NONE:
result = channel_->SendPacket(data, size);
result = channel_->SendPacket(data, size, dscp);
break;
case STATE_CLOSED: // Can't send anything when we're closed.

View File

@@ -135,7 +135,9 @@ class DtlsTransportChannelWrapper : public TransportChannelImpl {
virtual bool IsDtlsActive() const { return dtls_state_ != STATE_NONE; }
// Called to send a packet (via DTLS, if turned on).
virtual int SendPacket(const char* data, size_t size, int flags);
virtual int SendPacket(const char* data, size_t size,
talk_base::DiffServCodePoint dscp,
int flags);
// TransportChannel calls that we forward to the wrapped transport.
virtual int SetOption(talk_base::Socket::Option opt, int value) {

View File

@@ -29,6 +29,7 @@
#include <set>
#include "talk/base/common.h"
#include "talk/base/dscp.h"
#include "talk/base/gunit.h"
#include "talk/base/helpers.h"
#include "talk/base/scoped_ptr.h"
@@ -244,7 +245,8 @@ class DtlsTestClient : public sigslot::has_slots<> {
// Only set the bypass flag if we've activated DTLS.
int flags = (identity_.get() && srtp) ? cricket::PF_SRTP_BYPASS : 0;
int rv = channels_[channel]->SendPacket(packet.get(), size, flags);
int rv = channels_[channel]->SendPacket(
packet.get(), size, talk_base::DSCP_NO_CHANGE, flags);
ASSERT_GT(rv, 0);
ASSERT_EQ(size, static_cast<size_t>(rv));
++sent;

View File

@@ -169,7 +169,8 @@ class FakeTransportChannel : public TransportChannelImpl,
}
}
virtual int SendPacket(const char* data, size_t len, int flags) {
virtual int SendPacket(const char* data, size_t len,
talk_base::DiffServCodePoint dscp, int flags) {
if (state_ != STATE_CONNECTED) {
return -1;
}

View File

@@ -518,7 +518,20 @@ void P2PTransportChannel::OnUnknownAddress(
// request came from.
// There shouldn't be an existing connection with this remote address.
ASSERT(port->GetConnection(new_remote_candidate.address()) == NULL);
// When ports are muxed, this channel might get multiple unknown addres
// signals. In that case if the connection is already exists, we should
// simply ignore the signal othewise send server error.
if (port->GetConnection(new_remote_candidate.address()) && port_muxed) {
LOG(LS_INFO) << "Connection already exist for PeerReflexive candidate: "
<< new_remote_candidate.ToString();
return;
} else if (port->GetConnection(new_remote_candidate.address())) {
ASSERT(false);
port->SendBindingErrorResponse(stun_msg, address,
STUN_ERROR_SERVER_ERROR,
STUN_ERROR_REASON_SERVER_ERROR);
return;
}
Connection* connection = port->CreateConnection(
new_remote_candidate, cricket::PortInterface::ORIGIN_THIS_PORT);
@@ -773,7 +786,9 @@ int P2PTransportChannel::SetOption(talk_base::Socket::Option opt, int value) {
}
// Send data to the other side, using our best connection.
int P2PTransportChannel::SendPacket(const char *data, size_t len, int flags) {
int P2PTransportChannel::SendPacket(const char *data, size_t len,
talk_base::DiffServCodePoint dscp,
int flags) {
ASSERT(worker_thread_ == talk_base::Thread::Current());
if (flags != 0) {
error_ = EINVAL;
@@ -783,7 +798,7 @@ int P2PTransportChannel::SendPacket(const char *data, size_t len, int flags) {
error_ = EWOULDBLOCK;
return -1;
}
int sent = best_connection_->Send(data, len);
int sent = best_connection_->Send(data, len, dscp);
if (sent <= 0) {
ASSERT(sent < 0);
error_ = best_connection_->GetError();
@@ -823,6 +838,14 @@ bool P2PTransportChannel::GetStats(ConnectionInfos *infos) {
return true;
}
talk_base::DiffServCodePoint P2PTransportChannel::DefaultDscpValue() const {
OptionMap::const_iterator it = options_.find(talk_base::Socket::OPT_DSCP);
if (it == options_.end()) {
return talk_base::DSCP_NO_CHANGE;
}
return static_cast<talk_base::DiffServCodePoint> (it->second);
}
// Begin allocate (or immediately re-allocate, if MSG_ALLOCATE pending)
void P2PTransportChannel::Allocate() {
// Time for a new allocator, lets make sure we have a signalling channel

View File

@@ -90,7 +90,8 @@ class P2PTransportChannel : public TransportChannelImpl,
virtual void OnCandidate(const Candidate& candidate);
// From TransportChannel:
virtual int SendPacket(const char *data, size_t len, int flags);
virtual int SendPacket(const char *data, size_t len,
talk_base::DiffServCodePoint dscp, int flags);
virtual int SetOption(talk_base::Socket::Option opt, int value);
virtual int GetError() { return error_; }
virtual bool GetStats(std::vector<ConnectionInfo>* stats);
@@ -149,6 +150,9 @@ class P2PTransportChannel : public TransportChannelImpl,
return false;
}
// Helper method used only in unittest.
talk_base::DiffServCodePoint DefaultDscpValue() const;
private:
talk_base::Thread* thread() { return worker_thread_; }
PortAllocatorSession* allocator_session() {

View File

@@ -25,6 +25,7 @@
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "talk/base/dscp.h"
#include "talk/base/fakenetwork.h"
#include "talk/base/firewallsocketserver.h"
#include "talk/base/gunit.h"
@@ -620,7 +621,7 @@ class P2PTransportChannelTestBase : public testing::Test,
}
int SendData(cricket::TransportChannel* channel,
const char* data, size_t len) {
return channel->SendPacket(data, len, 0);
return channel->SendPacket(data, len, talk_base::DSCP_NO_CHANGE, 0);
}
bool CheckDataOnChannel(cricket::TransportChannel* channel,
const char* data, int len) {
@@ -1268,6 +1269,149 @@ TEST_F(P2PTransportChannelTest, TestTcpConnectionsFromActiveToPassive) {
DestroyChannels();
}
TEST_F(P2PTransportChannelTest, TestBundleAllocatorToBundleAllocator) {
AddAddress(0, kPublicAddrs[0]);
AddAddress(1, kPublicAddrs[1]);
SetAllocatorFlags(0, cricket::PORTALLOCATOR_ENABLE_BUNDLE);
SetAllocatorFlags(1, cricket::PORTALLOCATOR_ENABLE_BUNDLE);
CreateChannels(2);
EXPECT_TRUE_WAIT(ep1_ch1()->readable() &&
ep1_ch1()->writable() &&
ep2_ch1()->readable() &&
ep2_ch1()->writable(),
1000);
EXPECT_TRUE(ep1_ch1()->best_connection() &&
ep2_ch1()->best_connection());
EXPECT_FALSE(ep1_ch2()->readable());
EXPECT_FALSE(ep1_ch2()->writable());
EXPECT_FALSE(ep2_ch2()->readable());
EXPECT_FALSE(ep2_ch2()->writable());
TestSendRecv(1); // Only 1 channel is writable per Endpoint.
DestroyChannels();
}
TEST_F(P2PTransportChannelTest, TestBundleAllocatorToNonBundleAllocator) {
AddAddress(0, kPublicAddrs[0]);
AddAddress(1, kPublicAddrs[1]);
// Enable BUNDLE flag at one side.
SetAllocatorFlags(0, cricket::PORTALLOCATOR_ENABLE_BUNDLE);
CreateChannels(2);
EXPECT_TRUE_WAIT(ep1_ch1()->readable() &&
ep1_ch1()->writable() &&
ep2_ch1()->readable() &&
ep2_ch1()->writable(),
1000);
EXPECT_TRUE_WAIT(ep1_ch2()->readable() &&
ep1_ch2()->writable() &&
ep2_ch2()->readable() &&
ep2_ch2()->writable(),
1000);
EXPECT_TRUE(ep1_ch1()->best_connection() &&
ep2_ch1()->best_connection());
EXPECT_TRUE(ep1_ch2()->best_connection() &&
ep2_ch2()->best_connection());
TestSendRecv(2);
DestroyChannels();
}
TEST_F(P2PTransportChannelTest, TestIceRoleConflictWithoutBundle) {
AddAddress(0, kPublicAddrs[0]);
AddAddress(1, kPublicAddrs[1]);
TestSignalRoleConflict();
}
TEST_F(P2PTransportChannelTest, TestIceRoleConflictWithBundle) {
AddAddress(0, kPublicAddrs[0]);
AddAddress(1, kPublicAddrs[1]);
SetAllocatorFlags(0, cricket::PORTALLOCATOR_ENABLE_BUNDLE);
SetAllocatorFlags(1, cricket::PORTALLOCATOR_ENABLE_BUNDLE);
TestSignalRoleConflict();
}
// Tests that the ice configs (protocol, tiebreaker and role) can be passed
// down to ports.
TEST_F(P2PTransportChannelTest, TestIceConfigWillPassDownToPort) {
AddAddress(0, kPublicAddrs[0]);
AddAddress(1, kPublicAddrs[1]);
SetIceRole(0, cricket::ICEROLE_CONTROLLING);
SetIceProtocol(0, cricket::ICEPROTO_GOOGLE);
SetIceTiebreaker(0, kTiebreaker1);
SetIceRole(1, cricket::ICEROLE_CONTROLLING);
SetIceProtocol(1, cricket::ICEPROTO_RFC5245);
SetIceTiebreaker(1, kTiebreaker2);
CreateChannels(1);
EXPECT_EQ_WAIT(2u, ep1_ch1()->ports().size(), 1000);
const std::vector<cricket::PortInterface *> ports_before = ep1_ch1()->ports();
for (size_t i = 0; i < ports_before.size(); ++i) {
EXPECT_EQ(cricket::ICEROLE_CONTROLLING, ports_before[i]->GetIceRole());
EXPECT_EQ(cricket::ICEPROTO_GOOGLE, ports_before[i]->IceProtocol());
EXPECT_EQ(kTiebreaker1, ports_before[i]->IceTiebreaker());
}
ep1_ch1()->SetIceRole(cricket::ICEROLE_CONTROLLED);
ep1_ch1()->SetIceProtocolType(cricket::ICEPROTO_RFC5245);
ep1_ch1()->SetIceTiebreaker(kTiebreaker2);
const std::vector<cricket::PortInterface *> ports_after = ep1_ch1()->ports();
for (size_t i = 0; i < ports_after.size(); ++i) {
EXPECT_EQ(cricket::ICEROLE_CONTROLLED, ports_before[i]->GetIceRole());
EXPECT_EQ(cricket::ICEPROTO_RFC5245, ports_before[i]->IceProtocol());
// SetIceTiebreaker after Connect() has been called will fail. So expect the
// original value.
EXPECT_EQ(kTiebreaker1, ports_before[i]->IceTiebreaker());
}
EXPECT_TRUE_WAIT(ep1_ch1()->readable() &&
ep1_ch1()->writable() &&
ep2_ch1()->readable() &&
ep2_ch1()->writable(),
1000);
EXPECT_TRUE(ep1_ch1()->best_connection() &&
ep2_ch1()->best_connection());
TestSendRecv(1);
}
// Verify that we can set DSCP value and retrieve properly from P2PTC.
TEST_F(P2PTransportChannelTest, TestDefaultDscpValue) {
AddAddress(0, kPublicAddrs[0]);
AddAddress(1, kPublicAddrs[1]);
CreateChannels(1);
EXPECT_EQ(talk_base::DSCP_NO_CHANGE,
GetEndpoint(0)->cd1_.ch_->DefaultDscpValue());
EXPECT_EQ(talk_base::DSCP_NO_CHANGE,
GetEndpoint(1)->cd1_.ch_->DefaultDscpValue());
GetEndpoint(0)->cd1_.ch_->SetOption(
talk_base::Socket::OPT_DSCP, talk_base::DSCP_CS6);
GetEndpoint(1)->cd1_.ch_->SetOption(
talk_base::Socket::OPT_DSCP, talk_base::DSCP_CS6);
EXPECT_EQ(talk_base::DSCP_CS6,
GetEndpoint(0)->cd1_.ch_->DefaultDscpValue());
EXPECT_EQ(talk_base::DSCP_CS6,
GetEndpoint(1)->cd1_.ch_->DefaultDscpValue());
GetEndpoint(0)->cd1_.ch_->SetOption(
talk_base::Socket::OPT_DSCP, talk_base::DSCP_AF41);
GetEndpoint(1)->cd1_.ch_->SetOption(
talk_base::Socket::OPT_DSCP, talk_base::DSCP_AF41);
EXPECT_EQ(talk_base::DSCP_AF41,
GetEndpoint(0)->cd1_.ch_->DefaultDscpValue());
EXPECT_EQ(talk_base::DSCP_AF41,
GetEndpoint(1)->cd1_.ch_->DefaultDscpValue());
}
// Test what happens when we have 2 users behind the same NAT. This can lead
// to interesting behavior because the STUN server will only give out the
@@ -1390,120 +1534,3 @@ TEST_F(P2PTransportChannelMultihomedTest, TestDrain) {
DestroyChannels();
}
TEST_F(P2PTransportChannelTest, TestBundleAllocatorToBundleAllocator) {
AddAddress(0, kPublicAddrs[0]);
AddAddress(1, kPublicAddrs[1]);
SetAllocatorFlags(0, cricket::PORTALLOCATOR_ENABLE_BUNDLE);
SetAllocatorFlags(1, cricket::PORTALLOCATOR_ENABLE_BUNDLE);
CreateChannels(2);
EXPECT_TRUE_WAIT(ep1_ch1()->readable() &&
ep1_ch1()->writable() &&
ep2_ch1()->readable() &&
ep2_ch1()->writable(),
1000);
EXPECT_TRUE(ep1_ch1()->best_connection() &&
ep2_ch1()->best_connection());
EXPECT_FALSE(ep1_ch2()->readable());
EXPECT_FALSE(ep1_ch2()->writable());
EXPECT_FALSE(ep2_ch2()->readable());
EXPECT_FALSE(ep2_ch2()->writable());
TestSendRecv(1); // Only 1 channel is writable per Endpoint.
DestroyChannels();
}
TEST_F(P2PTransportChannelTest, TestBundleAllocatorToNonBundleAllocator) {
AddAddress(0, kPublicAddrs[0]);
AddAddress(1, kPublicAddrs[1]);
// Enable BUNDLE flag at one side.
SetAllocatorFlags(0, cricket::PORTALLOCATOR_ENABLE_BUNDLE);
CreateChannels(2);
EXPECT_TRUE_WAIT(ep1_ch1()->readable() &&
ep1_ch1()->writable() &&
ep2_ch1()->readable() &&
ep2_ch1()->writable(),
1000);
EXPECT_TRUE_WAIT(ep1_ch2()->readable() &&
ep1_ch2()->writable() &&
ep2_ch2()->readable() &&
ep2_ch2()->writable(),
1000);
EXPECT_TRUE(ep1_ch1()->best_connection() &&
ep2_ch1()->best_connection());
EXPECT_TRUE(ep1_ch2()->best_connection() &&
ep2_ch2()->best_connection());
TestSendRecv(2);
DestroyChannels();
}
TEST_F(P2PTransportChannelTest, TestIceRoleConflictWithoutBundle) {
AddAddress(0, kPublicAddrs[0]);
AddAddress(1, kPublicAddrs[1]);
TestSignalRoleConflict();
}
TEST_F(P2PTransportChannelTest, TestIceRoleConflictWithBundle) {
AddAddress(0, kPublicAddrs[0]);
AddAddress(1, kPublicAddrs[1]);
SetAllocatorFlags(0, cricket::PORTALLOCATOR_ENABLE_BUNDLE);
SetAllocatorFlags(1, cricket::PORTALLOCATOR_ENABLE_BUNDLE);
TestSignalRoleConflict();
}
// Tests that the ice configs (protocol, tiebreaker and role) can be passed
// down to ports.
TEST_F(P2PTransportChannelTest, TestIceConfigWillPassDownToPort) {
AddAddress(0, kPublicAddrs[0]);
AddAddress(1, kPublicAddrs[1]);
SetIceRole(0, cricket::ICEROLE_CONTROLLING);
SetIceProtocol(0, cricket::ICEPROTO_GOOGLE);
SetIceTiebreaker(0, kTiebreaker1);
SetIceRole(1, cricket::ICEROLE_CONTROLLING);
SetIceProtocol(1, cricket::ICEPROTO_RFC5245);
SetIceTiebreaker(1, kTiebreaker2);
CreateChannels(1);
EXPECT_EQ_WAIT(2u, ep1_ch1()->ports().size(), 1000);
const std::vector<cricket::PortInterface *> ports_before = ep1_ch1()->ports();
for (size_t i = 0; i < ports_before.size(); ++i) {
EXPECT_EQ(cricket::ICEROLE_CONTROLLING, ports_before[i]->GetIceRole());
EXPECT_EQ(cricket::ICEPROTO_GOOGLE, ports_before[i]->IceProtocol());
EXPECT_EQ(kTiebreaker1, ports_before[i]->IceTiebreaker());
}
ep1_ch1()->SetIceRole(cricket::ICEROLE_CONTROLLED);
ep1_ch1()->SetIceProtocolType(cricket::ICEPROTO_RFC5245);
ep1_ch1()->SetIceTiebreaker(kTiebreaker2);
const std::vector<cricket::PortInterface *> ports_after = ep1_ch1()->ports();
for (size_t i = 0; i < ports_after.size(); ++i) {
EXPECT_EQ(cricket::ICEROLE_CONTROLLED, ports_before[i]->GetIceRole());
EXPECT_EQ(cricket::ICEPROTO_RFC5245, ports_before[i]->IceProtocol());
// SetIceTiebreaker after Connect() has been called will fail. So expect the
// original value.
EXPECT_EQ(kTiebreaker1, ports_before[i]->IceTiebreaker());
}
EXPECT_TRUE_WAIT(ep1_ch1()->readable() &&
ep1_ch1()->writable() &&
ep2_ch1()->readable() &&
ep2_ch1()->writable(),
1000);
EXPECT_TRUE(ep1_ch1()->best_connection() &&
ep2_ch1()->best_connection());
TestSendRecv(1);
}

View File

@@ -112,7 +112,7 @@ const int RTT_RATIO = 3; // 3 : 1
// The delay before we begin checking if this port is useless.
const int kPortTimeoutDelay = 30 * 1000; // 30 seconds
const uint32 MSG_CHECKTIMEOUT = 1;
// Used by the Connection.
const uint32 MSG_DELETE = 1;
}
@@ -181,7 +181,8 @@ Port::Port(talk_base::Thread* thread, talk_base::Network* network,
ice_protocol_(ICEPROTO_GOOGLE),
ice_role_(ICEROLE_UNKNOWN),
tiebreaker_(0),
shared_socket_(true) {
shared_socket_(true),
default_dscp_(talk_base::DSCP_NO_CHANGE) {
Construct();
}
@@ -207,7 +208,8 @@ Port::Port(talk_base::Thread* thread, const std::string& type,
ice_protocol_(ICEPROTO_GOOGLE),
ice_role_(ICEROLE_UNKNOWN),
tiebreaker_(0),
shared_socket_(false) {
shared_socket_(false),
default_dscp_(talk_base::DSCP_NO_CHANGE) {
ASSERT(factory_ != NULL);
Construct();
}
@@ -606,7 +608,7 @@ void Port::SendBindingResponse(StunMessage* request,
// Send the response message.
talk_base::ByteBuffer buf;
response.Write(&buf);
if (SendTo(buf.Data(), buf.Length(), addr, false) < 0) {
if (SendTo(buf.Data(), buf.Length(), addr, DefaultDscpValue(), false) < 0) {
LOG_J(LS_ERROR, this) << "Failed to send STUN ping response to "
<< addr.ToSensitiveString();
}
@@ -660,7 +662,7 @@ void Port::SendBindingErrorResponse(StunMessage* request,
// Send the response message.
talk_base::ByteBuffer buf;
response.Write(&buf);
SendTo(buf.Data(), buf.Length(), addr, false);
SendTo(buf.Data(), buf.Length(), addr, DefaultDscpValue(), false);
LOG_J(LS_INFO, this) << "Sending STUN binding error: reason=" << reason
<< " to " << addr.ToSensitiveString();
}
@@ -916,7 +918,8 @@ void Connection::set_use_candidate_attr(bool enable) {
void Connection::OnSendStunPacket(const void* data, size_t size,
StunRequest* req) {
if (port_->SendTo(data, size, remote_candidate_.address(), false) < 0) {
if (port_->SendTo(data, size, remote_candidate_.address(),
port_->DefaultDscpValue(), false) < 0) {
LOG_J(LS_WARNING, this) << "Failed to send STUN ping " << req->id();
}
}
@@ -1389,12 +1392,13 @@ ProxyConnection::ProxyConnection(Port* port, size_t index,
: Connection(port, index, candidate), error_(0) {
}
int ProxyConnection::Send(const void* data, size_t size) {
int ProxyConnection::Send(const void* data, size_t size,
talk_base::DiffServCodePoint dscp) {
if (write_state_ == STATE_WRITE_INIT || write_state_ == STATE_WRITE_TIMEOUT) {
error_ = EWOULDBLOCK;
return SOCKET_ERROR;
}
int sent = port_->SendTo(data, size, remote_candidate_.address(), true);
int sent = port_->SendTo(data, size, remote_candidate_.address(), dscp, true);
if (sent <= 0) {
ASSERT(sent < 0);
error_ = port_->GetError();

View File

@@ -304,7 +304,17 @@ class Port : public PortInterface, public talk_base::MessageHandler,
// Returns if Google ICE protocol is used.
bool IsGoogleIce() const;
// Returns default DSCP value.
talk_base::DiffServCodePoint DefaultDscpValue() const {
return default_dscp_;
}
protected:
enum {
MSG_CHECKTIMEOUT = 0,
MSG_FIRST_AVAILABLE
};
void set_type(const std::string& type) { type_ = type; }
// Fills in the local address of the port.
void AddAddress(const talk_base::SocketAddress& address,
@@ -334,6 +344,11 @@ class Port : public PortInterface, public talk_base::MessageHandler,
// Checks if the address in addr is compatible with the port's ip.
bool IsCompatibleAddress(const talk_base::SocketAddress& addr);
// Default DSCP value for this port. Set by TransportChannel.
void SetDefaultDscpValue(talk_base::DiffServCodePoint dscp) {
default_dscp_ = dscp;
}
private:
void Construct();
// Called when one of our connections deletes itself.
@@ -372,7 +387,9 @@ class Port : public PortInterface, public talk_base::MessageHandler,
IceRole ice_role_;
uint64 tiebreaker_;
bool shared_socket_;
// DSCP value for ICE/STUN messages. Set by the P2PTransportChannel after
// port becomes ready.
talk_base::DiffServCodePoint default_dscp_;
// Information to use when going through a proxy.
std::string user_agent_;
talk_base::ProxyInfo proxy_;
@@ -447,7 +464,8 @@ class Connection : public talk_base::MessageHandler,
// The connection can send and receive packets asynchronously. This matches
// the interface of AsyncPacketSocket, which may use UDP or TCP under the
// covers.
virtual int Send(const void* data, size_t size) = 0;
virtual int Send(const void* data, size_t size,
talk_base::DiffServCodePoint dscp) = 0;
// Error if Send() returns < 0
virtual int GetError() = 0;
@@ -576,7 +594,8 @@ class ProxyConnection : public Connection {
public:
ProxyConnection(Port* port, size_t index, const Candidate& candidate);
virtual int Send(const void* data, size_t size);
virtual int Send(const void* data, size_t size,
talk_base::DiffServCodePoint dscp);
virtual int GetError() { return error_; }
private:

View File

@@ -172,7 +172,7 @@ class TestPort : public Port {
}
virtual int SendTo(
const void* data, size_t size, const talk_base::SocketAddress& addr,
bool payload) {
talk_base::DiffServCodePoint dscp, bool payload) {
if (!payload) {
IceMessage* msg = new IceMessage;
ByteBuffer* buf = new ByteBuffer(static_cast<const char*>(data), size);
@@ -787,10 +787,12 @@ class FakeAsyncPacketSocket : public AsyncPacketSocket {
}
// Send a packet.
virtual int Send(const void *pv, size_t cb) {
virtual int Send(const void *pv, size_t cb,
talk_base::DiffServCodePoint dscp) {
return static_cast<int>(cb);
}
virtual int SendTo(const void *pv, size_t cb, const SocketAddress& addr) {
virtual int SendTo(const void *pv, size_t cb, const SocketAddress& addr,
talk_base::DiffServCodePoint dscp) {
return static_cast<int>(cb);
}
virtual int Close() {
@@ -1224,6 +1226,26 @@ TEST_F(PortTest, TestSkipCrossFamilyUdp) {
TestCrossFamilyPorts(SOCK_DGRAM);
}
// This test verifies DSCP value set through SetOption interface can be
// get through DefaultDscpValue.
TEST_F(PortTest, TestDefaultDscpValue) {
talk_base::scoped_ptr<UDPPort> udpport(CreateUdpPort(kLocalAddr1));
udpport->SetOption(talk_base::Socket::OPT_DSCP, talk_base::DSCP_CS6);
EXPECT_EQ(talk_base::DSCP_CS6, udpport->DefaultDscpValue());
talk_base::scoped_ptr<TCPPort> tcpport(CreateTcpPort(kLocalAddr1));
tcpport->SetOption(talk_base::Socket::OPT_DSCP, talk_base::DSCP_AF31);
EXPECT_EQ(talk_base::DSCP_AF31, tcpport->DefaultDscpValue());
talk_base::scoped_ptr<StunPort> stunport(
CreateStunPort(kLocalAddr1, nat_socket_factory1()));
stunport->SetOption(talk_base::Socket::OPT_DSCP, talk_base::DSCP_AF41);
EXPECT_EQ(talk_base::DSCP_AF41, stunport->DefaultDscpValue());
talk_base::scoped_ptr<TurnPort> turnport(CreateTurnPort(
kLocalAddr1, nat_socket_factory1(), PROTO_UDP, PROTO_UDP));
turnport->SetOption(talk_base::Socket::OPT_DSCP, talk_base::DSCP_CS7);
EXPECT_EQ(talk_base::DSCP_CS7, turnport->DefaultDscpValue());
// TODO(mallinath) - Test DSCP through GetOption.
}
// Test sending STUN messages in GICE format.
TEST_F(PortTest, TestSendStunMessageAsGice) {
talk_base::scoped_ptr<TestPort> lport(
@@ -2127,14 +2149,16 @@ TEST_F(PortTest, TestWritableState) {
// Data should be unsendable until the connection is accepted.
char data[] = "abcd";
int data_size = ARRAY_SIZE(data);
EXPECT_EQ(SOCKET_ERROR, ch1.conn()->Send(data, data_size));
EXPECT_EQ(SOCKET_ERROR,
ch1.conn()->Send(data, data_size, talk_base::DSCP_NO_CHANGE));
// Accept the connection to return the binding response, transition to
// writable, and allow data to be sent.
ch2.AcceptConnection();
EXPECT_EQ_WAIT(Connection::STATE_WRITABLE, ch1.conn()->write_state(),
kTimeout);
EXPECT_EQ(data_size, ch1.conn()->Send(data, data_size));
EXPECT_EQ(data_size,
ch1.conn()->Send(data, data_size, talk_base::DSCP_NO_CHANGE));
// Ask the connection to update state as if enough time has passed to lose
// full writability and 5 pings went unresponded to. We'll accomplish the
@@ -2147,7 +2171,8 @@ TEST_F(PortTest, TestWritableState) {
EXPECT_EQ(Connection::STATE_WRITE_UNRELIABLE, ch1.conn()->write_state());
// Data should be able to be sent in this state.
EXPECT_EQ(data_size, ch1.conn()->Send(data, data_size));
EXPECT_EQ(data_size,
ch1.conn()->Send(data, data_size, talk_base::DSCP_NO_CHANGE));
// And now allow the other side to process the pings and send binding
// responses.
@@ -2164,7 +2189,8 @@ TEST_F(PortTest, TestWritableState) {
EXPECT_EQ(Connection::STATE_WRITE_TIMEOUT, ch1.conn()->write_state());
// Now that the connection has completely timed out, data send should fail.
EXPECT_EQ(SOCKET_ERROR, ch1.conn()->Send(data, data_size));
EXPECT_EQ(SOCKET_ERROR,
ch1.conn()->Send(data, data_size, talk_base::DSCP_NO_CHANGE));
ch1.Stop();
ch2.Stop();

View File

@@ -30,6 +30,7 @@
#include <string>
#include "talk/base/dscp.h"
#include "talk/base/socketaddress.h"
#include "talk/p2p/base/transport.h"
@@ -90,16 +91,16 @@ class PortInterface {
// Functions on the underlying socket(s).
virtual int SetOption(talk_base::Socket::Option opt, int value) = 0;
virtual int GetError() = 0;
virtual int GetOption(talk_base::Socket::Option opt, int* value) = 0;
virtual int GetError() = 0;
virtual const std::vector<Candidate>& Candidates() const = 0;
// Sends the given packet to the given address, provided that the address is
// that of a connection or an address that has sent to us already.
virtual int SendTo(const void* data, size_t size,
const talk_base::SocketAddress& addr, bool payload) = 0;
const talk_base::SocketAddress& addr,
talk_base::DiffServCodePoint dscp, bool payload) = 0;
// Indicates that we received a successful STUN binding request from an
// address that doesn't correspond to any current connection. To turn this

View File

@@ -97,9 +97,10 @@ Connection* PortProxy::CreateConnection(const Candidate& remote_candidate,
int PortProxy::SendTo(const void* data,
size_t size,
const talk_base::SocketAddress& addr,
talk_base::DiffServCodePoint dscp,
bool payload) {
ASSERT(impl_ != NULL);
return impl_->SendTo(data, size, addr, payload);
return impl_->SendTo(data, size, addr, dscp, payload);
}
int PortProxy::SetOption(talk_base::Socket::Option opt,
@@ -114,7 +115,6 @@ int PortProxy::GetOption(talk_base::Socket::Option opt,
return impl_->GetOption(opt, value);
}
int PortProxy::GetError() {
ASSERT(impl_ != NULL);
return impl_->GetError();

View File

@@ -68,7 +68,9 @@ class PortProxy : public PortInterface, public sigslot::has_slots<> {
const talk_base::SocketAddress& remote_addr);
virtual int SendTo(const void* data, size_t size,
const talk_base::SocketAddress& addr, bool payload);
const talk_base::SocketAddress& addr,
talk_base::DiffServCodePoint dscp,
bool payload);
virtual int SetOption(talk_base::Socket::Option opt, int value);
virtual int GetOption(talk_base::Socket::Option opt, int* value);
virtual int GetError();

View File

@@ -74,14 +74,16 @@ RawTransportChannel::~RawTransportChannel() {
delete allocator_session_;
}
int RawTransportChannel::SendPacket(const char *data, size_t size, int flags) {
int RawTransportChannel::SendPacket(const char *data, size_t size,
talk_base::DiffServCodePoint dscp,
int flags) {
if (port_ == NULL)
return -1;
if (remote_address_.IsNil())
return -1;
if (flags != 0)
return -1;
return port_->SendTo(data, size, remote_address_, true);
return port_->SendTo(data, size, remote_address_, dscp, true);
}
int RawTransportChannel::SetOption(talk_base::Socket::Option opt, int value) {

View File

@@ -64,7 +64,8 @@ class RawTransportChannel : public TransportChannelImpl,
virtual ~RawTransportChannel();
// Implementation of normal channel packet sending.
virtual int SendPacket(const char *data, size_t len, int flags);
virtual int SendPacket(const char *data, size_t len,
talk_base::DiffServCodePoint dscp, int flags);
virtual int SetOption(talk_base::Socket::Option opt, int value);
virtual int GetError();

View File

@@ -67,7 +67,7 @@ class RelayConnection : public sigslot::has_slots<> {
bool CheckResponse(StunMessage* msg);
// Sends data to the relay server.
int Send(const void* pv, size_t cb);
int Send(const void* pv, size_t cb, talk_base::DiffServCodePoint dscp);
// Sends a STUN allocate request message to the relay server.
void SendAllocateRequest(RelayEntry* entry, int delay);
@@ -123,7 +123,8 @@ class RelayEntry : public talk_base::MessageHandler,
// Sends a packet to the given destination address using the socket of this
// entry. This will wrap the packet in STUN if necessary.
int SendTo(const void* data, size_t size,
const talk_base::SocketAddress& addr);
const talk_base::SocketAddress& addr,
talk_base::DiffServCodePoint dscp);
// Schedules a keep-alive allocate request.
void ScheduleKeepAlive();
@@ -163,7 +164,8 @@ class RelayEntry : public talk_base::MessageHandler,
// Sends the given data on the socket to the server with no wrapping. This
// returns the number of bytes written or -1 if an error occurred.
int SendPacket(const void* data, size_t size);
int SendPacket(const void* data, size_t size,
talk_base::DiffServCodePoint dscp);
};
// Handles an allocate request for a particular RelayEntry.
@@ -300,7 +302,9 @@ Connection* RelayPort::CreateConnection(const Candidate& address,
}
int RelayPort::SendTo(const void* data, size_t size,
const talk_base::SocketAddress& addr, bool payload) {
const talk_base::SocketAddress& addr,
talk_base::DiffServCodePoint dscp,
bool payload) {
// Try to find an entry for this specific address. Note that the first entry
// created was not given an address initially, so it can be set to the first
// address that comes along.
@@ -341,7 +345,7 @@ int RelayPort::SendTo(const void* data, size_t size,
}
// Send the actual contents to the server using the usual mechanism.
int sent = entry->SendTo(data, size, addr);
int sent = entry->SendTo(data, size, addr, dscp);
if (sent <= 0) {
ASSERT(sent < 0);
error_ = entry->GetError();
@@ -354,6 +358,14 @@ int RelayPort::SendTo(const void* data, size_t size,
int RelayPort::SetOption(talk_base::Socket::Option opt, int value) {
int result = 0;
// DSCP option is not passed to the socket.
// TODO(mallinath) - After we have the support on socket,
// remove this specialization.
if (opt == talk_base::Socket::OPT_DSCP) {
SetDefaultDscpValue(static_cast<talk_base::DiffServCodePoint>(value));
return result;
}
for (size_t i = 0; i < entries_.size(); ++i) {
if (entries_[i]->SetSocketOption(opt, value) < 0) {
result = -1;
@@ -418,7 +430,9 @@ bool RelayConnection::CheckResponse(StunMessage* msg) {
void RelayConnection::OnSendPacket(const void* data, size_t size,
StunRequest* req) {
int sent = socket_->SendTo(data, size, GetAddress());
// TODO(mallinath) Find a way to get DSCP value from Port.
int sent = socket_->SendTo(
data, size, GetAddress(), talk_base::DSCP_NO_CHANGE);
if (sent <= 0) {
LOG(LS_VERBOSE) << "OnSendPacket: failed sending to " << GetAddress() <<
std::strerror(socket_->GetError());
@@ -426,8 +440,9 @@ void RelayConnection::OnSendPacket(const void* data, size_t size,
}
}
int RelayConnection::Send(const void* pv, size_t cb) {
return socket_->SendTo(pv, cb, GetAddress());
int RelayConnection::Send(const void* pv, size_t cb,
talk_base::DiffServCodePoint dscp) {
return socket_->SendTo(pv, cb, GetAddress(), dscp);
}
void RelayConnection::SendAllocateRequest(RelayEntry* entry, int delay) {
@@ -546,11 +561,12 @@ void RelayEntry::OnConnect(const talk_base::SocketAddress& mapped_addr,
}
int RelayEntry::SendTo(const void* data, size_t size,
const talk_base::SocketAddress& addr) {
const talk_base::SocketAddress& addr,
talk_base::DiffServCodePoint dscp) {
// If this connection is locked to the address given, then we can send the
// packet with no wrapper.
if (locked_ && (ext_addr_ == addr))
return SendPacket(data, size);
return SendPacket(data, size, dscp);
// Otherwise, we must wrap the given data in a STUN SEND request so that we
// can communicate the destination address to the server.
@@ -598,7 +614,7 @@ int RelayEntry::SendTo(const void* data, size_t size,
talk_base::ByteBuffer buf;
request.Write(&buf);
return SendPacket(buf.Data(), buf.Length());
return SendPacket(buf.Data(), buf.Length(), dscp);
}
void RelayEntry::ScheduleKeepAlive() {
@@ -744,12 +760,13 @@ void RelayEntry::OnReadyToSend(talk_base::AsyncPacketSocket* socket) {
}
}
int RelayEntry::SendPacket(const void* data, size_t size) {
int RelayEntry::SendPacket(const void* data, size_t size,
talk_base::DiffServCodePoint dscp) {
int sent = 0;
if (current_connection_) {
// We are connected, no need to send packets anywere else than to
// the current connection.
sent = current_connection_->Send(data, size);
sent = current_connection_->Send(data, size, dscp);
}
return sent;
}

View File

@@ -92,7 +92,9 @@ class RelayPort : public Port {
void SetReady();
virtual int SendTo(const void* data, size_t size,
const talk_base::SocketAddress& addr, bool payload);
const talk_base::SocketAddress& addr,
talk_base::DiffServCodePoint dscp,
bool payload);
// Dispatches the given packet to the port or connection as appropriate.
void OnReadPacket(const char* data, size_t size,

View File

@@ -51,7 +51,7 @@ static const uint32 kMessageAcceptConnection = 1;
// Calls SendTo on the given socket and logs any bad results.
void Send(talk_base::AsyncPacketSocket* socket, const char* bytes, size_t size,
const talk_base::SocketAddress& addr) {
int result = socket->SendTo(bytes, size, addr);
int result = socket->SendTo(bytes, size, addr, talk_base::DSCP_NO_CHANGE);
if (result < static_cast<int>(size)) {
LOG(LS_ERROR) << "SendTo wrote only " << result << " of " << size
<< " bytes";

View File

@@ -32,6 +32,7 @@
#include "talk/base/base64.h"
#include "talk/base/common.h"
#include "talk/base/dscp.h"
#include "talk/base/gunit.h"
#include "talk/base/helpers.h"
#include "talk/base/logging.h"
@@ -830,7 +831,7 @@ struct ChannelHandler : sigslot::has_slots<> {
std::string data_with_id(name);
data_with_id += data;
int result = channel->SendPacket(data_with_id.c_str(), data_with_id.size(),
0);
talk_base::DSCP_NO_CHANGE, 0);
EXPECT_EQ(static_cast<int>(data_with_id.size()), result);
}

View File

@@ -216,8 +216,10 @@ Connection* UDPPort::CreateConnection(const Candidate& address,
}
int UDPPort::SendTo(const void* data, size_t size,
const talk_base::SocketAddress& addr, bool payload) {
int sent = socket_->SendTo(data, size, addr);
const talk_base::SocketAddress& addr,
talk_base::DiffServCodePoint dscp,
bool payload) {
int sent = socket_->SendTo(data, size, addr, dscp);
if (sent < 0) {
error_ = socket_->GetError();
LOG_J(LS_ERROR, this) << "UDP send of " << size
@@ -227,6 +229,12 @@ int UDPPort::SendTo(const void* data, size_t size,
}
int UDPPort::SetOption(talk_base::Socket::Option opt, int value) {
// TODO(mallinath) - After we have the support on socket,
// remove this specialization.
if (opt == talk_base::Socket::OPT_DSCP) {
SetDefaultDscpValue(static_cast<talk_base::DiffServCodePoint>(value));
return 0;
}
return socket_->SetOption(opt, value);
}
@@ -345,7 +353,7 @@ void UDPPort::SetResult(bool success) {
// TODO: merge this with SendTo above.
void UDPPort::OnSendPacket(const void* data, size_t size, StunRequest* req) {
StunBindingRequest* sreq = static_cast<StunBindingRequest*>(req);
if (socket_->SendTo(data, size, sreq->server_addr()) < 0)
if (socket_->SendTo(data, size, sreq->server_addr(), DefaultDscpValue()) < 0)
PLOG(LERROR, socket_->GetError()) << "sendto";
}

View File

@@ -121,7 +121,9 @@ class UDPPort : public Port {
bool Init();
virtual int SendTo(const void* data, size_t size,
const talk_base::SocketAddress& addr, bool payload);
const talk_base::SocketAddress& addr,
talk_base::DiffServCodePoint dscp,
bool payload);
void OnLocalAddressReady(talk_base::AsyncPacketSocket* socket,
const talk_base::SocketAddress& address);

View File

@@ -102,7 +102,8 @@ void StunServer::SendResponse(
const StunMessage& msg, const talk_base::SocketAddress& addr) {
talk_base::ByteBuffer buf;
msg.Write(&buf);
if (socket_->SendTo(buf.Data(), buf.Length(), addr) < 0)
if (socket_->SendTo(
buf.Data(), buf.Length(), addr, talk_base::DSCP_NO_CHANGE) < 0)
LOG_ERR(LS_ERROR) << "sendto";
}

View File

@@ -134,7 +134,9 @@ void TCPPort::PrepareAddress() {
}
int TCPPort::SendTo(const void* data, size_t size,
const talk_base::SocketAddress& addr, bool payload) {
const talk_base::SocketAddress& addr,
talk_base::DiffServCodePoint dscp,
bool payload) {
talk_base::AsyncPacketSocket * socket = NULL;
if (TCPConnection * conn = static_cast<TCPConnection*>(GetConnection(addr))) {
socket = conn->socket();
@@ -147,7 +149,7 @@ int TCPPort::SendTo(const void* data, size_t size,
return -1; // TODO: Set error_
}
int sent = socket->Send(data, size);
int sent = socket->Send(data, size, dscp);
if (sent < 0) {
error_ = socket->GetError();
LOG_J(LS_ERROR, this) << "TCP send of " << size
@@ -165,6 +167,14 @@ int TCPPort::GetOption(talk_base::Socket::Option opt, int* value) {
}
int TCPPort::SetOption(talk_base::Socket::Option opt, int value) {
// If we are setting DSCP value, pass value to base Port and return.
// TODO(mallinath) - After we have the support on socket,
// remove this specialization.
if (opt == talk_base::Socket::OPT_DSCP) {
SetDefaultDscpValue(static_cast<talk_base::DiffServCodePoint>(value));
return 0;
}
if (socket_) {
return socket_->SetOption(opt, value);
} else {
@@ -261,7 +271,8 @@ TCPConnection::~TCPConnection() {
delete socket_;
}
int TCPConnection::Send(const void* data, size_t size) {
int TCPConnection::Send(const void* data, size_t size,
talk_base::DiffServCodePoint dscp) {
if (!socket_) {
error_ = ENOTCONN;
return SOCKET_ERROR;
@@ -272,7 +283,7 @@ int TCPConnection::Send(const void* data, size_t size) {
error_ = EWOULDBLOCK;
return SOCKET_ERROR;
}
int sent = socket_->Send(data, size);
int sent = socket_->Send(data, size, dscp);
if (sent < 0) {
error_ = socket_->GetError();
} else {

View File

@@ -82,7 +82,9 @@ class TCPPort : public Port {
// Handles sending using the local TCP socket.
virtual int SendTo(const void* data, size_t size,
const talk_base::SocketAddress& addr, bool payload);
const talk_base::SocketAddress& addr,
talk_base::DiffServCodePoint dscp,
bool payload);
// Accepts incoming TCP connection.
void OnNewConnection(talk_base::AsyncPacketSocket* socket,
@@ -124,7 +126,8 @@ class TCPConnection : public Connection {
talk_base::AsyncPacketSocket* socket = 0);
virtual ~TCPConnection();
virtual int Send(const void* data, size_t size);
virtual int Send(const void* data, size_t size,
talk_base::DiffServCodePoint dscp);
virtual int GetError();
talk_base::AsyncPacketSocket* socket() { return socket_; }

View File

@@ -32,6 +32,7 @@
#include <vector>
#include "talk/base/basictypes.h"
#include "talk/base/dscp.h"
#include "talk/base/sigslot.h"
#include "talk/base/socket.h"
#include "talk/base/sslidentity.h"
@@ -80,7 +81,9 @@ class TransportChannel : public sigslot::has_slots<> {
// Attempts to send the given packet. The return value is < 0 on failure.
// TODO: Remove the default argument once channel code is updated.
virtual int SendPacket(const char* data, size_t len, int flags = 0) = 0;
virtual int SendPacket(const char* data, size_t len,
talk_base::DiffServCodePoint dscp,
int flags = 0) = 0;
// Sets a socket option on this channel. Note that not all options are
// supported by all transport types.

View File

@@ -93,13 +93,15 @@ void TransportChannelProxy::SetImplementation(TransportChannelImpl* impl) {
worker_thread_->Post(this, MSG_UPDATESTATE);
}
int TransportChannelProxy::SendPacket(const char* data, size_t len, int flags) {
int TransportChannelProxy::SendPacket(const char* data, size_t len,
talk_base::DiffServCodePoint dscp,
int flags) {
ASSERT(talk_base::Thread::Current() == worker_thread_);
// Fail if we don't have an impl yet.
if (!impl_) {
return -1;
}
return impl_->SendPacket(data, len, flags);
return impl_->SendPacket(data, len, dscp, flags);
}
int TransportChannelProxy::SetOption(talk_base::Socket::Option opt, int value) {

View File

@@ -63,7 +63,9 @@ class TransportChannelProxy : public TransportChannel,
// Implementation of the TransportChannel interface. These simply forward to
// the implementation.
virtual int SendPacket(const char* data, size_t len, int flags);
virtual int SendPacket(const char* data, size_t len,
talk_base::DiffServCodePoint dscp,
int flags);
virtual int SetOption(talk_base::Socket::Option opt, int value);
virtual int GetError();
virtual IceRole GetIceRole() const;

View File

@@ -52,10 +52,6 @@ static const int TURN_PERMISSION_TIMEOUT = 5 * 60 * 1000; // 5 minutes
static const size_t TURN_CHANNEL_HEADER_SIZE = 4U;
enum {
MSG_PORT_ERROR = 1
};
inline bool IsTurnChannelData(uint16 msg_type) {
return ((msg_type & 0xC000) == 0x4000); // MSB are 0b01
}
@@ -156,7 +152,8 @@ class TurnEntry : public sigslot::has_slots<> {
void SendChannelBindRequest(int delay);
// Sends a packet to the given destination address.
// This will wrap the packet in STUN if necessary.
int Send(const void* data, size_t size, bool payload);
int Send(const void* data, size_t size, bool payload,
talk_base::DiffServCodePoint dscp);
void OnCreatePermissionSuccess();
void OnCreatePermissionError(StunMessage* response, int code);
@@ -296,6 +293,14 @@ Connection* TurnPort::CreateConnection(const Candidate& address,
}
int TurnPort::SetOption(talk_base::Socket::Option opt, int value) {
// DSCP option is not passed to the socket.
// TODO(mallinath) - After we have the support on socket,
// remove this specialization.
if (opt == talk_base::Socket::OPT_DSCP) {
SetDefaultDscpValue(static_cast<talk_base::DiffServCodePoint>(value));
return 0;
}
if (!socket_) {
// If socket is not created yet, these options will be applied during socket
// creation.
@@ -318,6 +323,7 @@ int TurnPort::GetError() {
int TurnPort::SendTo(const void* data, size_t size,
const talk_base::SocketAddress& addr,
talk_base::DiffServCodePoint dscp,
bool payload) {
// Try to find an entry for this specific address; we should have one.
TurnEntry* entry = FindEntry(addr);
@@ -332,7 +338,7 @@ int TurnPort::SendTo(const void* data, size_t size,
}
// Send the actual contents to the server using the usual mechanism.
int sent = entry->Send(data, size, payload);
int sent = entry->Send(data, size, payload, dscp);
if (sent <= 0) {
return SOCKET_ERROR;
}
@@ -406,7 +412,7 @@ void TurnPort::OnResolveResult(talk_base::SignalThread* signal_thread) {
void TurnPort::OnSendStunPacket(const void* data, size_t size,
StunRequest* request) {
if (Send(data, size) < 0) {
if (Send(data, size, DefaultDscpValue()) < 0) {
LOG_J(LS_ERROR, this) << "Failed to send TURN message, err="
<< socket_->GetError();
}
@@ -431,15 +437,16 @@ void TurnPort::OnAllocateError() {
// We will send SignalPortError asynchronously as this can be sent during
// port initialization. This way it will not be blocking other port
// creation.
thread()->Post(this, MSG_PORT_ERROR);
thread()->Post(this, MSG_ERROR);
}
void TurnPort::OnMessage(talk_base::Message* message) {
if (message->message_id == MSG_PORT_ERROR) {
if (message->message_id == MSG_ERROR) {
SignalPortError(this);
} else {
Port::OnMessage(message);
return;
}
Port::OnMessage(message);
}
void TurnPort::OnAllocateRequestTimeout() {
@@ -557,8 +564,9 @@ void TurnPort::AddRequestAuthInfo(StunMessage* msg) {
VERIFY(msg->AddMessageIntegrity(hash()));
}
int TurnPort::Send(const void* data, size_t len) {
return socket_->SendTo(data, len, server_address_.address);
int TurnPort::Send(const void* data, size_t len,
talk_base::DiffServCodePoint dscp) {
return socket_->SendTo(data, len, server_address_.address, dscp);
}
void TurnPort::UpdateHash() {
@@ -890,7 +898,8 @@ void TurnEntry::SendChannelBindRequest(int delay) {
port_, this, channel_id_, ext_addr_), delay);
}
int TurnEntry::Send(const void* data, size_t size, bool payload) {
int TurnEntry::Send(const void* data, size_t size, bool payload,
talk_base::DiffServCodePoint dscp) {
talk_base::ByteBuffer buf;
if (state_ != STATE_BOUND) {
// If we haven't bound the channel yet, we have to use a Send Indication.
@@ -915,7 +924,7 @@ int TurnEntry::Send(const void* data, size_t size, bool payload) {
buf.WriteUInt16(static_cast<uint16>(size));
buf.WriteBytes(reinterpret_cast<const char*>(data), size);
}
return port_->Send(buf.Data(), buf.Length());
return port_->Send(buf.Data(), buf.Length(), dscp);
}
void TurnEntry::OnCreatePermissionSuccess() {

View File

@@ -74,6 +74,7 @@ class TurnPort : public Port {
const Candidate& c, PortInterface::CandidateOrigin origin);
virtual int SendTo(const void* data, size_t size,
const talk_base::SocketAddress& addr,
talk_base::DiffServCodePoint dscp,
bool payload);
virtual int SetOption(talk_base::Socket::Option opt, int value);
virtual int GetOption(talk_base::Socket::Option opt, int* value);
@@ -106,6 +107,8 @@ class TurnPort : public Port {
const RelayCredentials& credentials);
private:
enum { MSG_ERROR = MSG_FIRST_AVAILABLE };
typedef std::list<TurnEntry*> EntryList;
typedef std::map<talk_base::Socket::Option, int> SocketOptionsMap;
@@ -138,7 +141,7 @@ class TurnPort : public Port {
bool ScheduleRefresh(int lifetime);
void SendRequest(StunRequest* request, int delay);
int Send(const void* data, size_t size);
int Send(const void* data, size_t size, talk_base::DiffServCodePoint dscp);
void UpdateHash();
bool UpdateNonce(StunMessage* response);

View File

@@ -27,6 +27,7 @@
#include "talk/base/asynctcpsocket.h"
#include "talk/base/buffer.h"
#include "talk/base/dscp.h"
#include "talk/base/firewallsocketserver.h"
#include "talk/base/logging.h"
#include "talk/base/gunit.h"
@@ -217,8 +218,8 @@ class TurnPortTest : public testing::Test,
for (size_t j = 0; j < i + 1; ++j) {
buf[j] = 0xFF - j;
}
conn1->Send(buf, i + 1);
conn2->Send(buf, i + 1);
conn1->Send(buf, i + 1, talk_base::DSCP_NO_CHANGE);
conn2->Send(buf, i + 1, talk_base::DSCP_NO_CHANGE);
main_->ProcessMessages(0);
}

View File

@@ -564,7 +564,8 @@ void TurnServer::SendStun(Connection* conn, StunMessage* msg) {
void TurnServer::Send(Connection* conn,
const talk_base::ByteBuffer& buf) {
conn->socket()->SendTo(buf.Data(), buf.Length(), conn->src());
conn->socket()->SendTo(buf.Data(), buf.Length(), conn->src(),
talk_base::DSCP_NO_CHANGE);
}
void TurnServer::OnAllocationDestroyed(Allocation* allocation) {
@@ -936,7 +937,7 @@ void TurnServer::Allocation::SendErrorResponse(const TurnMessage* req, int code,
void TurnServer::Allocation::SendExternal(const void* data, size_t size,
const talk_base::SocketAddress& peer) {
external_socket_->SendTo(data, size, peer);
external_socket_->SendTo(data, size, peer, talk_base::DSCP_NO_CHANGE);
}
void TurnServer::Allocation::OnMessage(talk_base::Message* msg) {