From 574f2f60feaa41f4ca5d36381129066e6e8c25cb Mon Sep 17 00:00:00 2001 From: "jiayl@webrtc.org" Date: Thu, 4 Sep 2014 19:11:34 +0000 Subject: [PATCH] TurnPort should retry allocation with a new address on error STUN_ERROR_ALLOCATION_MISMATCH. BUG=3570 R=juberti@webrtc.org, mallinath@webrtc.org Review URL: https://webrtc-codereview.appspot.com/20999004 git-svn-id: http://webrtc.googlecode.com/svn/trunk@7070 4adac7df-926f-26a2-2b94-8c16560cd09d --- talk/p2p/base/port.h | 1 + talk/p2p/base/turnport.cc | 44 ++++++++++++++-- talk/p2p/base/turnport.h | 14 ++++- talk/p2p/base/turnport_unittest.cc | 85 ++++++++++++++++++++++++++++-- talk/p2p/base/turnserver.cc | 34 ++++++------ webrtc/base/virtualsocketserver.cc | 4 ++ webrtc/base/virtualsocketserver.h | 3 ++ 7 files changed, 161 insertions(+), 24 deletions(-) diff --git a/talk/p2p/base/port.h b/talk/p2p/base/port.h index cccfdada2..4893586d5 100644 --- a/talk/p2p/base/port.h +++ b/talk/p2p/base/port.h @@ -155,6 +155,7 @@ class Port : public PortInterface, public rtc::MessageHandler, uint64 IceTiebreaker() const { return tiebreaker_; } virtual bool SharedSocket() const { return shared_socket_; } + void ResetSharedSocket() { shared_socket_ = false; } // The thread on which this port performs its I/O. rtc::Thread* thread() { return thread_; } diff --git a/talk/p2p/base/turnport.cc b/talk/p2p/base/turnport.cc index 6ab0e2b96..2908d71c7 100644 --- a/talk/p2p/base/turnport.cc +++ b/talk/p2p/base/turnport.cc @@ -51,6 +51,10 @@ static const int TURN_PERMISSION_TIMEOUT = 5 * 60 * 1000; // 5 minutes static const size_t TURN_CHANNEL_HEADER_SIZE = 4U; +// Retry at most twice (i.e. three different ALLOCATE requests) on +// STUN_ERROR_ALLOCATION_MISMATCH error per rfc5766. +static const size_t MAX_ALLOCATE_MISMATCH_RETRIES = 2; + inline bool IsTurnChannelData(uint16 msg_type) { return ((msg_type & 0xC000) == 0x4000); // MSB are 0b01 } @@ -188,7 +192,8 @@ TurnPort::TurnPort(rtc::Thread* thread, request_manager_(thread), next_channel_number_(TURN_CHANNEL_NUMBER_START), connected_(false), - server_priority_(server_priority) { + server_priority_(server_priority), + allocate_mismatch_retries_(0) { request_manager_.SignalSendPacket.connect(this, &TurnPort::OnSendStunPacket); } @@ -212,7 +217,8 @@ TurnPort::TurnPort(rtc::Thread* thread, request_manager_(thread), next_channel_number_(TURN_CHANNEL_NUMBER_START), connected_(false), - server_priority_(server_priority) { + server_priority_(server_priority), + allocate_mismatch_retries_(0) { request_manager_.SignalSendPacket.connect(this, &TurnPort::OnSendStunPacket); } @@ -271,6 +277,8 @@ void TurnPort::PrepareAddress() { } bool TurnPort::CreateTurnClientSocket() { + ASSERT(!socket_ || SharedSocket()); + if (server_address_.proto == PROTO_UDP && !SharedSocket()) { socket_ = socket_factory()->CreateUdpSocket( rtc::SocketAddress(ip(), 0), min_port(), max_port()); @@ -340,6 +348,29 @@ void TurnPort::OnSocketClose(rtc::AsyncPacketSocket* socket, int error) { } } +void TurnPort::OnAllocateMismatch() { + if (allocate_mismatch_retries_ >= MAX_ALLOCATE_MISMATCH_RETRIES) { + LOG_J(LS_WARNING, this) << "Giving up on the port after " + << allocate_mismatch_retries_ + << " retries for STUN_ERROR_ALLOCATION_MISMATCH"; + OnAllocateError(); + return; + } + + LOG_J(LS_INFO, this) << "Allocating a new socket after " + << "STUN_ERROR_ALLOCATION_MISMATCH, retry = " + << allocate_mismatch_retries_ + 1; + if (SharedSocket()) { + ResetSharedSocket(); + } else { + delete socket_; + } + socket_ = NULL; + + PrepareAddress(); + ++allocate_mismatch_retries_; +} + Connection* TurnPort::CreateConnection(const Candidate& address, CandidateOrigin origin) { // TURN-UDP can only connect to UDP candidates. @@ -580,6 +611,9 @@ void TurnPort::OnMessage(rtc::Message* message) { if (message->message_id == MSG_ERROR) { SignalPortError(this); return; + } else if (message->message_id == MSG_ALLOCATE_MISMATCH) { + OnAllocateMismatch(); + return; } Port::OnMessage(message); @@ -844,6 +878,11 @@ void TurnAllocateRequest::OnErrorResponse(StunMessage* response) { case STUN_ERROR_TRY_ALTERNATE: OnTryAlternate(response, error_code->code()); break; + case STUN_ERROR_ALLOCATION_MISMATCH: + // We must handle this error async because trying to delete the socket in + // OnErrorResponse will cause a deadlock on the socket. + port_->thread()->Post(port_, TurnPort::MSG_ALLOCATE_MISMATCH); + break; default: LOG_J(LS_WARNING, port_) << "Allocate response error, code=" << error_code->code(); @@ -966,7 +1005,6 @@ void TurnRefreshRequest::OnResponse(StunMessage* response) { } void TurnRefreshRequest::OnErrorResponse(StunMessage* response) { - // TODO(juberti): Handle 437 error response as a success. const StunErrorCodeAttribute* error_code = response->GetErrorCode(); LOG_J(LS_WARNING, port_) << "Refresh response error, code=" << error_code->code(); diff --git a/talk/p2p/base/turnport.h b/talk/p2p/base/turnport.h index b9ec3b002..ab7d4e714 100644 --- a/talk/p2p/base/turnport.h +++ b/talk/p2p/base/turnport.h @@ -120,6 +120,12 @@ class TurnPort : public Port { int error() const { return error_; } + void OnAllocateMismatch(); + + rtc::AsyncPacketSocket* socket() const { + return socket_; + } + // Signal with resolved server address. // Parameters are port, server address and resolved server address. // This signal will be sent only if server address is resolved successfully. @@ -154,7 +160,10 @@ class TurnPort : public Port { int server_priority); private: - enum { MSG_ERROR = MSG_FIRST_AVAILABLE }; + enum { + MSG_ERROR = MSG_FIRST_AVAILABLE, + MSG_ALLOCATE_MISMATCH + }; typedef std::list EntryList; typedef std::map SocketOptionsMap; @@ -230,6 +239,9 @@ class TurnPort : public Port { // calculating the candidate priority. int server_priority_; + // The number of retries made due to allocate mismatch error. + size_t allocate_mismatch_retries_; + friend class TurnEntry; friend class TurnAllocateRequest; friend class TurnRefreshRequest; diff --git a/talk/p2p/base/turnport_unittest.cc b/talk/p2p/base/turnport_unittest.cc index 14befa9d1..d895cbd4d 100644 --- a/talk/p2p/base/turnport_unittest.cc +++ b/talk/p2p/base/turnport_unittest.cc @@ -210,10 +210,13 @@ class TurnPortTest : public testing::Test, const cricket::ProtocolAddress& server_address) { ASSERT(server_address.proto == cricket::PROTO_UDP); - socket_.reset(socket_factory_.CreateUdpSocket( - rtc::SocketAddress(kLocalAddr1.ipaddr(), 0), 0, 0)); - ASSERT_TRUE(socket_ != NULL); - socket_->SignalReadPacket.connect(this, &TurnPortTest::OnSocketReadPacket); + if (!socket_) { + socket_.reset(socket_factory_.CreateUdpSocket( + rtc::SocketAddress(kLocalAddr1.ipaddr(), 0), 0, 0)); + ASSERT_TRUE(socket_ != NULL); + socket_->SignalReadPacket.connect( + this, &TurnPortTest::OnSocketReadPacket); + } cricket::RelayCredentials credentials(username, password); turn_port_.reset(cricket::TurnPort::Create( @@ -413,6 +416,80 @@ TEST_F(TurnPortTest, TestTurnAllocateBadPassword) { ASSERT_EQ(0U, turn_port_->Candidates().size()); } +// Tests that a new local address is created after +// STUN_ERROR_ALLOCATION_MISMATCH. +TEST_F(TurnPortTest, TestTurnAllocateMismatch) { + // Do a normal allocation first. + CreateTurnPort(kTurnUsername, kTurnPassword, kTurnUdpProtoAddr); + turn_port_->PrepareAddress(); + EXPECT_TRUE_WAIT(turn_ready_, kTimeout); + rtc::SocketAddress first_addr(turn_port_->socket()->GetLocalAddress()); + + // Forces the socket server to assign the same port. + ss_->SetNextPortForTesting(first_addr.port()); + + turn_ready_ = false; + CreateTurnPort(kTurnUsername, kTurnPassword, kTurnUdpProtoAddr); + turn_port_->PrepareAddress(); + + // Verifies that the new port has the same address. + EXPECT_EQ(first_addr, turn_port_->socket()->GetLocalAddress()); + + EXPECT_TRUE_WAIT(turn_ready_, kTimeout); + + // Verifies that the new port has a different address now. + EXPECT_NE(first_addr, turn_port_->socket()->GetLocalAddress()); +} + +// Tests that a shared-socket-TurnPort creates its own socket after +// STUN_ERROR_ALLOCATION_MISMATCH. +TEST_F(TurnPortTest, TestSharedSocketAllocateMismatch) { + // Do a normal allocation first. + CreateSharedTurnPort(kTurnUsername, kTurnPassword, kTurnUdpProtoAddr); + turn_port_->PrepareAddress(); + EXPECT_TRUE_WAIT(turn_ready_, kTimeout); + rtc::SocketAddress first_addr(turn_port_->socket()->GetLocalAddress()); + + turn_ready_ = false; + CreateSharedTurnPort(kTurnUsername, kTurnPassword, kTurnUdpProtoAddr); + + // Verifies that the new port has the same address. + EXPECT_EQ(first_addr, turn_port_->socket()->GetLocalAddress()); + EXPECT_TRUE(turn_port_->SharedSocket()); + + turn_port_->PrepareAddress(); + EXPECT_TRUE_WAIT(turn_ready_, kTimeout); + + // Verifies that the new port has a different address now. + EXPECT_NE(first_addr, turn_port_->socket()->GetLocalAddress()); + EXPECT_FALSE(turn_port_->SharedSocket()); +} + +TEST_F(TurnPortTest, TestTurnTcpAllocateMismatch) { + turn_server_.AddInternalSocket(kTurnTcpIntAddr, cricket::PROTO_TCP); + CreateTurnPort(kTurnUsername, kTurnPassword, kTurnTcpProtoAddr); + + // Do a normal allocation first. + turn_port_->PrepareAddress(); + EXPECT_TRUE_WAIT(turn_ready_, kTimeout); + rtc::SocketAddress first_addr(turn_port_->socket()->GetLocalAddress()); + + // Forces the socket server to assign the same port. + ss_->SetNextPortForTesting(first_addr.port()); + + turn_ready_ = false; + CreateTurnPort(kTurnUsername, kTurnPassword, kTurnTcpProtoAddr); + turn_port_->PrepareAddress(); + + // Verifies that the new port has the same address. + EXPECT_EQ(first_addr, turn_port_->socket()->GetLocalAddress()); + + EXPECT_TRUE_WAIT(turn_ready_, kTimeout); + + // Verifies that the new port has a different address now. + EXPECT_NE(first_addr, turn_port_->socket()->GetLocalAddress()); +} + // Do a TURN allocation and try to send a packet to it from the outside. // The packet should be dropped. Then, try to send a packet from TURN to the // outside. It should reach its destination. Finally, try again from the diff --git a/talk/p2p/base/turnserver.cc b/talk/p2p/base/turnserver.cc index 08c060de4..dbcbcd493 100644 --- a/talk/p2p/base/turnserver.cc +++ b/talk/p2p/base/turnserver.cc @@ -62,9 +62,9 @@ inline bool IsTurnChannelData(uint16 msg_type) { return ((msg_type & 0xC000) == 0x4000); } -// IDs used for posted messages. +// IDs used for posted messages for TurnServer::Allocation. enum { - MSG_TIMEOUT, + MSG_ALLOCATION_TIMEOUT, }; // Encapsulates a TURN allocation. @@ -608,7 +608,9 @@ void TurnServer::DestroyInternalSocket(rtc::AsyncPacketSocket* socket) { InternalSocketMap::iterator iter = server_sockets_.find(socket); if (iter != server_sockets_.end()) { rtc::AsyncPacketSocket* socket = iter->first; - delete socket; + // We must destroy the socket async to avoid invalidating the sigslot + // callback list iterator inside a sigslot callback. + rtc::Thread::Current()->Dispose(socket); server_sockets_.erase(iter); } } @@ -662,7 +664,7 @@ TurnServer::Allocation::~Allocation() { it != perms_.end(); ++it) { delete *it; } - thread_->Clear(this, MSG_TIMEOUT); + thread_->Clear(this, MSG_ALLOCATION_TIMEOUT); LOG_J(LS_INFO, this) << "Allocation destroyed"; } @@ -707,7 +709,7 @@ void TurnServer::Allocation::HandleAllocateRequest(const TurnMessage* msg) { // Figure out the lifetime and start the allocation timer. int lifetime_secs = ComputeLifetime(msg); - thread_->PostDelayed(lifetime_secs * 1000, this, MSG_TIMEOUT); + thread_->PostDelayed(lifetime_secs * 1000, this, MSG_ALLOCATION_TIMEOUT); LOG_J(LS_INFO, this) << "Created allocation, lifetime=" << lifetime_secs; @@ -734,8 +736,8 @@ void TurnServer::Allocation::HandleRefreshRequest(const TurnMessage* msg) { int lifetime_secs = ComputeLifetime(msg); // Reset the expiration timer. - thread_->Clear(this, MSG_TIMEOUT); - thread_->PostDelayed(lifetime_secs * 1000, this, MSG_TIMEOUT); + thread_->Clear(this, MSG_ALLOCATION_TIMEOUT); + thread_->PostDelayed(lifetime_secs * 1000, this, MSG_ALLOCATION_TIMEOUT); LOG_J(LS_INFO, this) << "Refreshed allocation, lifetime=" << lifetime_secs; @@ -963,7 +965,7 @@ void TurnServer::Allocation::SendExternal(const void* data, size_t size, } void TurnServer::Allocation::OnMessage(rtc::Message* msg) { - ASSERT(msg->message_id == MSG_TIMEOUT); + ASSERT(msg->message_id == MSG_ALLOCATION_TIMEOUT); SignalDestroyed(this); delete this; } @@ -988,16 +990,16 @@ TurnServer::Permission::Permission(rtc::Thread* thread, } TurnServer::Permission::~Permission() { - thread_->Clear(this, MSG_TIMEOUT); + thread_->Clear(this, MSG_ALLOCATION_TIMEOUT); } void TurnServer::Permission::Refresh() { - thread_->Clear(this, MSG_TIMEOUT); - thread_->PostDelayed(kPermissionTimeout, this, MSG_TIMEOUT); + thread_->Clear(this, MSG_ALLOCATION_TIMEOUT); + thread_->PostDelayed(kPermissionTimeout, this, MSG_ALLOCATION_TIMEOUT); } void TurnServer::Permission::OnMessage(rtc::Message* msg) { - ASSERT(msg->message_id == MSG_TIMEOUT); + ASSERT(msg->message_id == MSG_ALLOCATION_TIMEOUT); SignalDestroyed(this); delete this; } @@ -1009,16 +1011,16 @@ TurnServer::Channel::Channel(rtc::Thread* thread, int id, } TurnServer::Channel::~Channel() { - thread_->Clear(this, MSG_TIMEOUT); + thread_->Clear(this, MSG_ALLOCATION_TIMEOUT); } void TurnServer::Channel::Refresh() { - thread_->Clear(this, MSG_TIMEOUT); - thread_->PostDelayed(kChannelTimeout, this, MSG_TIMEOUT); + thread_->Clear(this, MSG_ALLOCATION_TIMEOUT); + thread_->PostDelayed(kChannelTimeout, this, MSG_ALLOCATION_TIMEOUT); } void TurnServer::Channel::OnMessage(rtc::Message* msg) { - ASSERT(msg->message_id == MSG_TIMEOUT); + ASSERT(msg->message_id == MSG_ALLOCATION_TIMEOUT); SignalDestroyed(this); delete this; } diff --git a/webrtc/base/virtualsocketserver.cc b/webrtc/base/virtualsocketserver.cc index f8e8ddeb8..90d19e4aa 100644 --- a/webrtc/base/virtualsocketserver.cc +++ b/webrtc/base/virtualsocketserver.cc @@ -639,6 +639,10 @@ bool VirtualSocketServer::ProcessMessagesUntilIdle() { return !msg_queue_->IsQuitting(); } +void VirtualSocketServer::SetNextPortForTesting(uint16 port) { + next_port_ = port; +} + int VirtualSocketServer::Bind(VirtualSocket* socket, const SocketAddress& addr) { ASSERT(NULL != socket); diff --git a/webrtc/base/virtualsocketserver.h b/webrtc/base/virtualsocketserver.h index 87e35364c..0ea51ab5d 100644 --- a/webrtc/base/virtualsocketserver.h +++ b/webrtc/base/virtualsocketserver.h @@ -110,6 +110,9 @@ class VirtualSocketServer : public SocketServer, public sigslot::has_slots<> { // if Thread::Stop() was called. bool ProcessMessagesUntilIdle(); + // Sets the next port number to use for testing. + void SetNextPortForTesting(uint16 port); + protected: // Returns a new IP not used before in this network. IPAddress GetNextIP(int family);