TurnPort should retry allocation with a new address on error STUN_ERROR_ALLOCATION_MISMATCH.

BUG=3570
R=juberti@webrtc.org, mallinath@webrtc.org

Review URL: https://webrtc-codereview.appspot.com/20999004

git-svn-id: http://webrtc.googlecode.com/svn/trunk@7070 4adac7df-926f-26a2-2b94-8c16560cd09d
This commit is contained in:
jiayl@webrtc.org 2014-09-04 19:11:34 +00:00
parent 021e76fd39
commit 574f2f60fe
7 changed files with 161 additions and 24 deletions

View File

@ -155,6 +155,7 @@ class Port : public PortInterface, public rtc::MessageHandler,
uint64 IceTiebreaker() const { return tiebreaker_; }
virtual bool SharedSocket() const { return shared_socket_; }
void ResetSharedSocket() { shared_socket_ = false; }
// The thread on which this port performs its I/O.
rtc::Thread* thread() { return thread_; }

View File

@ -51,6 +51,10 @@ static const int TURN_PERMISSION_TIMEOUT = 5 * 60 * 1000; // 5 minutes
static const size_t TURN_CHANNEL_HEADER_SIZE = 4U;
// Retry at most twice (i.e. three different ALLOCATE requests) on
// STUN_ERROR_ALLOCATION_MISMATCH error per rfc5766.
static const size_t MAX_ALLOCATE_MISMATCH_RETRIES = 2;
inline bool IsTurnChannelData(uint16 msg_type) {
return ((msg_type & 0xC000) == 0x4000); // MSB are 0b01
}
@ -188,7 +192,8 @@ TurnPort::TurnPort(rtc::Thread* thread,
request_manager_(thread),
next_channel_number_(TURN_CHANNEL_NUMBER_START),
connected_(false),
server_priority_(server_priority) {
server_priority_(server_priority),
allocate_mismatch_retries_(0) {
request_manager_.SignalSendPacket.connect(this, &TurnPort::OnSendStunPacket);
}
@ -212,7 +217,8 @@ TurnPort::TurnPort(rtc::Thread* thread,
request_manager_(thread),
next_channel_number_(TURN_CHANNEL_NUMBER_START),
connected_(false),
server_priority_(server_priority) {
server_priority_(server_priority),
allocate_mismatch_retries_(0) {
request_manager_.SignalSendPacket.connect(this, &TurnPort::OnSendStunPacket);
}
@ -271,6 +277,8 @@ void TurnPort::PrepareAddress() {
}
bool TurnPort::CreateTurnClientSocket() {
ASSERT(!socket_ || SharedSocket());
if (server_address_.proto == PROTO_UDP && !SharedSocket()) {
socket_ = socket_factory()->CreateUdpSocket(
rtc::SocketAddress(ip(), 0), min_port(), max_port());
@ -340,6 +348,29 @@ void TurnPort::OnSocketClose(rtc::AsyncPacketSocket* socket, int error) {
}
}
void TurnPort::OnAllocateMismatch() {
if (allocate_mismatch_retries_ >= MAX_ALLOCATE_MISMATCH_RETRIES) {
LOG_J(LS_WARNING, this) << "Giving up on the port after "
<< allocate_mismatch_retries_
<< " retries for STUN_ERROR_ALLOCATION_MISMATCH";
OnAllocateError();
return;
}
LOG_J(LS_INFO, this) << "Allocating a new socket after "
<< "STUN_ERROR_ALLOCATION_MISMATCH, retry = "
<< allocate_mismatch_retries_ + 1;
if (SharedSocket()) {
ResetSharedSocket();
} else {
delete socket_;
}
socket_ = NULL;
PrepareAddress();
++allocate_mismatch_retries_;
}
Connection* TurnPort::CreateConnection(const Candidate& address,
CandidateOrigin origin) {
// TURN-UDP can only connect to UDP candidates.
@ -580,6 +611,9 @@ void TurnPort::OnMessage(rtc::Message* message) {
if (message->message_id == MSG_ERROR) {
SignalPortError(this);
return;
} else if (message->message_id == MSG_ALLOCATE_MISMATCH) {
OnAllocateMismatch();
return;
}
Port::OnMessage(message);
@ -844,6 +878,11 @@ void TurnAllocateRequest::OnErrorResponse(StunMessage* response) {
case STUN_ERROR_TRY_ALTERNATE:
OnTryAlternate(response, error_code->code());
break;
case STUN_ERROR_ALLOCATION_MISMATCH:
// We must handle this error async because trying to delete the socket in
// OnErrorResponse will cause a deadlock on the socket.
port_->thread()->Post(port_, TurnPort::MSG_ALLOCATE_MISMATCH);
break;
default:
LOG_J(LS_WARNING, port_) << "Allocate response error, code="
<< error_code->code();
@ -966,7 +1005,6 @@ void TurnRefreshRequest::OnResponse(StunMessage* response) {
}
void TurnRefreshRequest::OnErrorResponse(StunMessage* response) {
// TODO(juberti): Handle 437 error response as a success.
const StunErrorCodeAttribute* error_code = response->GetErrorCode();
LOG_J(LS_WARNING, port_) << "Refresh response error, code="
<< error_code->code();

View File

@ -120,6 +120,12 @@ class TurnPort : public Port {
int error() const { return error_; }
void OnAllocateMismatch();
rtc::AsyncPacketSocket* socket() const {
return socket_;
}
// Signal with resolved server address.
// Parameters are port, server address and resolved server address.
// This signal will be sent only if server address is resolved successfully.
@ -154,7 +160,10 @@ class TurnPort : public Port {
int server_priority);
private:
enum { MSG_ERROR = MSG_FIRST_AVAILABLE };
enum {
MSG_ERROR = MSG_FIRST_AVAILABLE,
MSG_ALLOCATE_MISMATCH
};
typedef std::list<TurnEntry*> EntryList;
typedef std::map<rtc::Socket::Option, int> SocketOptionsMap;
@ -230,6 +239,9 @@ class TurnPort : public Port {
// calculating the candidate priority.
int server_priority_;
// The number of retries made due to allocate mismatch error.
size_t allocate_mismatch_retries_;
friend class TurnEntry;
friend class TurnAllocateRequest;
friend class TurnRefreshRequest;

View File

@ -210,10 +210,13 @@ class TurnPortTest : public testing::Test,
const cricket::ProtocolAddress& server_address) {
ASSERT(server_address.proto == cricket::PROTO_UDP);
socket_.reset(socket_factory_.CreateUdpSocket(
rtc::SocketAddress(kLocalAddr1.ipaddr(), 0), 0, 0));
ASSERT_TRUE(socket_ != NULL);
socket_->SignalReadPacket.connect(this, &TurnPortTest::OnSocketReadPacket);
if (!socket_) {
socket_.reset(socket_factory_.CreateUdpSocket(
rtc::SocketAddress(kLocalAddr1.ipaddr(), 0), 0, 0));
ASSERT_TRUE(socket_ != NULL);
socket_->SignalReadPacket.connect(
this, &TurnPortTest::OnSocketReadPacket);
}
cricket::RelayCredentials credentials(username, password);
turn_port_.reset(cricket::TurnPort::Create(
@ -413,6 +416,80 @@ TEST_F(TurnPortTest, TestTurnAllocateBadPassword) {
ASSERT_EQ(0U, turn_port_->Candidates().size());
}
// Tests that a new local address is created after
// STUN_ERROR_ALLOCATION_MISMATCH.
TEST_F(TurnPortTest, TestTurnAllocateMismatch) {
// Do a normal allocation first.
CreateTurnPort(kTurnUsername, kTurnPassword, kTurnUdpProtoAddr);
turn_port_->PrepareAddress();
EXPECT_TRUE_WAIT(turn_ready_, kTimeout);
rtc::SocketAddress first_addr(turn_port_->socket()->GetLocalAddress());
// Forces the socket server to assign the same port.
ss_->SetNextPortForTesting(first_addr.port());
turn_ready_ = false;
CreateTurnPort(kTurnUsername, kTurnPassword, kTurnUdpProtoAddr);
turn_port_->PrepareAddress();
// Verifies that the new port has the same address.
EXPECT_EQ(first_addr, turn_port_->socket()->GetLocalAddress());
EXPECT_TRUE_WAIT(turn_ready_, kTimeout);
// Verifies that the new port has a different address now.
EXPECT_NE(first_addr, turn_port_->socket()->GetLocalAddress());
}
// Tests that a shared-socket-TurnPort creates its own socket after
// STUN_ERROR_ALLOCATION_MISMATCH.
TEST_F(TurnPortTest, TestSharedSocketAllocateMismatch) {
// Do a normal allocation first.
CreateSharedTurnPort(kTurnUsername, kTurnPassword, kTurnUdpProtoAddr);
turn_port_->PrepareAddress();
EXPECT_TRUE_WAIT(turn_ready_, kTimeout);
rtc::SocketAddress first_addr(turn_port_->socket()->GetLocalAddress());
turn_ready_ = false;
CreateSharedTurnPort(kTurnUsername, kTurnPassword, kTurnUdpProtoAddr);
// Verifies that the new port has the same address.
EXPECT_EQ(first_addr, turn_port_->socket()->GetLocalAddress());
EXPECT_TRUE(turn_port_->SharedSocket());
turn_port_->PrepareAddress();
EXPECT_TRUE_WAIT(turn_ready_, kTimeout);
// Verifies that the new port has a different address now.
EXPECT_NE(first_addr, turn_port_->socket()->GetLocalAddress());
EXPECT_FALSE(turn_port_->SharedSocket());
}
TEST_F(TurnPortTest, TestTurnTcpAllocateMismatch) {
turn_server_.AddInternalSocket(kTurnTcpIntAddr, cricket::PROTO_TCP);
CreateTurnPort(kTurnUsername, kTurnPassword, kTurnTcpProtoAddr);
// Do a normal allocation first.
turn_port_->PrepareAddress();
EXPECT_TRUE_WAIT(turn_ready_, kTimeout);
rtc::SocketAddress first_addr(turn_port_->socket()->GetLocalAddress());
// Forces the socket server to assign the same port.
ss_->SetNextPortForTesting(first_addr.port());
turn_ready_ = false;
CreateTurnPort(kTurnUsername, kTurnPassword, kTurnTcpProtoAddr);
turn_port_->PrepareAddress();
// Verifies that the new port has the same address.
EXPECT_EQ(first_addr, turn_port_->socket()->GetLocalAddress());
EXPECT_TRUE_WAIT(turn_ready_, kTimeout);
// Verifies that the new port has a different address now.
EXPECT_NE(first_addr, turn_port_->socket()->GetLocalAddress());
}
// Do a TURN allocation and try to send a packet to it from the outside.
// The packet should be dropped. Then, try to send a packet from TURN to the
// outside. It should reach its destination. Finally, try again from the

View File

@ -62,9 +62,9 @@ inline bool IsTurnChannelData(uint16 msg_type) {
return ((msg_type & 0xC000) == 0x4000);
}
// IDs used for posted messages.
// IDs used for posted messages for TurnServer::Allocation.
enum {
MSG_TIMEOUT,
MSG_ALLOCATION_TIMEOUT,
};
// Encapsulates a TURN allocation.
@ -608,7 +608,9 @@ void TurnServer::DestroyInternalSocket(rtc::AsyncPacketSocket* socket) {
InternalSocketMap::iterator iter = server_sockets_.find(socket);
if (iter != server_sockets_.end()) {
rtc::AsyncPacketSocket* socket = iter->first;
delete socket;
// We must destroy the socket async to avoid invalidating the sigslot
// callback list iterator inside a sigslot callback.
rtc::Thread::Current()->Dispose(socket);
server_sockets_.erase(iter);
}
}
@ -662,7 +664,7 @@ TurnServer::Allocation::~Allocation() {
it != perms_.end(); ++it) {
delete *it;
}
thread_->Clear(this, MSG_TIMEOUT);
thread_->Clear(this, MSG_ALLOCATION_TIMEOUT);
LOG_J(LS_INFO, this) << "Allocation destroyed";
}
@ -707,7 +709,7 @@ void TurnServer::Allocation::HandleAllocateRequest(const TurnMessage* msg) {
// Figure out the lifetime and start the allocation timer.
int lifetime_secs = ComputeLifetime(msg);
thread_->PostDelayed(lifetime_secs * 1000, this, MSG_TIMEOUT);
thread_->PostDelayed(lifetime_secs * 1000, this, MSG_ALLOCATION_TIMEOUT);
LOG_J(LS_INFO, this) << "Created allocation, lifetime=" << lifetime_secs;
@ -734,8 +736,8 @@ void TurnServer::Allocation::HandleRefreshRequest(const TurnMessage* msg) {
int lifetime_secs = ComputeLifetime(msg);
// Reset the expiration timer.
thread_->Clear(this, MSG_TIMEOUT);
thread_->PostDelayed(lifetime_secs * 1000, this, MSG_TIMEOUT);
thread_->Clear(this, MSG_ALLOCATION_TIMEOUT);
thread_->PostDelayed(lifetime_secs * 1000, this, MSG_ALLOCATION_TIMEOUT);
LOG_J(LS_INFO, this) << "Refreshed allocation, lifetime=" << lifetime_secs;
@ -963,7 +965,7 @@ void TurnServer::Allocation::SendExternal(const void* data, size_t size,
}
void TurnServer::Allocation::OnMessage(rtc::Message* msg) {
ASSERT(msg->message_id == MSG_TIMEOUT);
ASSERT(msg->message_id == MSG_ALLOCATION_TIMEOUT);
SignalDestroyed(this);
delete this;
}
@ -988,16 +990,16 @@ TurnServer::Permission::Permission(rtc::Thread* thread,
}
TurnServer::Permission::~Permission() {
thread_->Clear(this, MSG_TIMEOUT);
thread_->Clear(this, MSG_ALLOCATION_TIMEOUT);
}
void TurnServer::Permission::Refresh() {
thread_->Clear(this, MSG_TIMEOUT);
thread_->PostDelayed(kPermissionTimeout, this, MSG_TIMEOUT);
thread_->Clear(this, MSG_ALLOCATION_TIMEOUT);
thread_->PostDelayed(kPermissionTimeout, this, MSG_ALLOCATION_TIMEOUT);
}
void TurnServer::Permission::OnMessage(rtc::Message* msg) {
ASSERT(msg->message_id == MSG_TIMEOUT);
ASSERT(msg->message_id == MSG_ALLOCATION_TIMEOUT);
SignalDestroyed(this);
delete this;
}
@ -1009,16 +1011,16 @@ TurnServer::Channel::Channel(rtc::Thread* thread, int id,
}
TurnServer::Channel::~Channel() {
thread_->Clear(this, MSG_TIMEOUT);
thread_->Clear(this, MSG_ALLOCATION_TIMEOUT);
}
void TurnServer::Channel::Refresh() {
thread_->Clear(this, MSG_TIMEOUT);
thread_->PostDelayed(kChannelTimeout, this, MSG_TIMEOUT);
thread_->Clear(this, MSG_ALLOCATION_TIMEOUT);
thread_->PostDelayed(kChannelTimeout, this, MSG_ALLOCATION_TIMEOUT);
}
void TurnServer::Channel::OnMessage(rtc::Message* msg) {
ASSERT(msg->message_id == MSG_TIMEOUT);
ASSERT(msg->message_id == MSG_ALLOCATION_TIMEOUT);
SignalDestroyed(this);
delete this;
}

View File

@ -639,6 +639,10 @@ bool VirtualSocketServer::ProcessMessagesUntilIdle() {
return !msg_queue_->IsQuitting();
}
void VirtualSocketServer::SetNextPortForTesting(uint16 port) {
next_port_ = port;
}
int VirtualSocketServer::Bind(VirtualSocket* socket,
const SocketAddress& addr) {
ASSERT(NULL != socket);

View File

@ -110,6 +110,9 @@ class VirtualSocketServer : public SocketServer, public sigslot::has_slots<> {
// if Thread::Stop() was called.
bool ProcessMessagesUntilIdle();
// Sets the next port number to use for testing.
void SetNextPortForTesting(uint16 port);
protected:
// Returns a new IP not used before in this network.
IPAddress GetNextIP(int family);