Implement Tcp Reconnect for TCPPort.
UDP case should not be changed. Active TCPConnection will initiate Reconnect after OnClose and when Send or Ping fails. Passive TCPConnection will prune itself as usual as the active side will create a new connection. The Reconnect could make P2PCT choose a different best_connection in the case where connectivities exist b/w more than 1 Network. Also, to avoid upper layer triggers ice restart, the WRITE_TIMEOUT caused by the socket disconnection is delayed to give the reconnect mechanism chance to kick in. The timeout event is only fired if the reconnect can't work in 5 sec. If the reconnect, there should be no ICE disconnected state trigger either in active or passive side. BUG=1926 R=pthatcher@webrtc.org Review URL: https://webrtc-codereview.appspot.com/31359004 Cr-Commit-Position: refs/heads/master@{#8929}
This commit is contained in:
parent
ef88309a6e
commit
be508a1d36
@ -249,6 +249,8 @@ TEST(NatTest, TestPhysicalIPv6) {
|
||||
}
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
class TestVirtualSocketServer : public VirtualSocketServer {
|
||||
public:
|
||||
explicit TestVirtualSocketServer(SocketServer* ss)
|
||||
@ -261,6 +263,8 @@ class TestVirtualSocketServer : public VirtualSocketServer {
|
||||
scoped_ptr<SocketServer> ss_;
|
||||
};
|
||||
|
||||
} // namespace
|
||||
|
||||
void TestVirtualInternal(int family) {
|
||||
scoped_ptr<TestVirtualSocketServer> int_vss(new TestVirtualSocketServer(
|
||||
new PhysicalSocketServer()));
|
||||
|
@ -606,6 +606,22 @@ void VirtualSocketServer::SetNextPortForTesting(uint16 port) {
|
||||
next_port_ = port;
|
||||
}
|
||||
|
||||
bool VirtualSocketServer::CloseTcpConnections(
|
||||
const SocketAddress& addr_local,
|
||||
const SocketAddress& addr_remote) {
|
||||
VirtualSocket* socket = LookupConnection(addr_local, addr_remote);
|
||||
if (!socket) {
|
||||
return false;
|
||||
}
|
||||
// Signal the close event on the local connection first.
|
||||
socket->SignalCloseEvent(socket, 0);
|
||||
|
||||
// Trigger the remote connection's close event.
|
||||
socket->Close();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
int VirtualSocketServer::Bind(VirtualSocket* socket,
|
||||
const SocketAddress& addr) {
|
||||
ASSERT(NULL != socket);
|
||||
|
@ -114,6 +114,11 @@ class VirtualSocketServer : public SocketServer, public sigslot::has_slots<> {
|
||||
// Sets the next port number to use for testing.
|
||||
void SetNextPortForTesting(uint16 port);
|
||||
|
||||
// Close a pair of Tcp connections by addresses. Both connections will have
|
||||
// its own OnClose invoked.
|
||||
bool CloseTcpConnections(const SocketAddress& addr_local,
|
||||
const SocketAddress& addr_remote);
|
||||
|
||||
protected:
|
||||
// Returns a new IP not used before in this network.
|
||||
IPAddress GetNextIP(int family);
|
||||
|
@ -398,7 +398,8 @@ void DtlsTransportChannelWrapper::OnReadableState(TransportChannel* channel) {
|
||||
ASSERT(rtc::Thread::Current() == worker_thread_);
|
||||
ASSERT(channel == channel_);
|
||||
LOG_J(LS_VERBOSE, this)
|
||||
<< "DTLSTransportChannelWrapper: channel readable state changed.";
|
||||
<< "DTLSTransportChannelWrapper: channel readable state changed to "
|
||||
<< channel_->readable();
|
||||
|
||||
if (dtls_state_ == STATE_NONE || dtls_state_ == STATE_OPEN) {
|
||||
set_readable(channel_->readable());
|
||||
@ -410,7 +411,8 @@ void DtlsTransportChannelWrapper::OnWritableState(TransportChannel* channel) {
|
||||
ASSERT(rtc::Thread::Current() == worker_thread_);
|
||||
ASSERT(channel == channel_);
|
||||
LOG_J(LS_VERBOSE, this)
|
||||
<< "DTLSTransportChannelWrapper: channel writable state changed.";
|
||||
<< "DTLSTransportChannelWrapper: channel writable state changed to "
|
||||
<< channel_->writable();
|
||||
|
||||
switch (dtls_state_) {
|
||||
case STATE_NONE:
|
||||
|
@ -67,7 +67,8 @@ int CompareConnectionCandidates(cricket::Connection* a,
|
||||
(b->remote_candidate().generation() + b->port()->generation());
|
||||
}
|
||||
|
||||
// Compare two connections based on their writability and static preferences.
|
||||
// Compare two connections based on their connected state, writability and
|
||||
// static preferences.
|
||||
int CompareConnections(cricket::Connection *a, cricket::Connection *b) {
|
||||
// Sort based on write-state. Better states have lower values.
|
||||
if (a->write_state() < b->write_state())
|
||||
@ -75,6 +76,38 @@ int CompareConnections(cricket::Connection *a, cricket::Connection *b) {
|
||||
if (a->write_state() > b->write_state())
|
||||
return -1;
|
||||
|
||||
// WARNING: Some complexity here about TCP reconnecting.
|
||||
// When a TCP connection fails because of a TCP socket disconnecting, the
|
||||
// active side of the connection will attempt to reconnect for 5 seconds while
|
||||
// pretending to be writable (the connection is not set to the unwritable
|
||||
// state). On the passive side, the connection also remains writable even
|
||||
// though it is disconnected, and a new connection is created when the active
|
||||
// side connects. At that point, there are two TCP connections on the passive
|
||||
// side: 1. the old, disconnected one that is pretending to be writable, and
|
||||
// 2. the new, connected one that is maybe not yet writable. For purposes of
|
||||
// pruning, pinging, and selecting the best connection, we want to treat the
|
||||
// new connection as "better" than the old one. We could add a method called
|
||||
// something like Connection::ImReallyBadEvenThoughImWritable, but that is
|
||||
// equivalent to the existing Connection::connected(), which we already have.
|
||||
// So, in code throughout this file, we'll check whether the connection is
|
||||
// connected() or not, and if it is not, treat it as "worse" than a connected
|
||||
// one, even though it's writable. In the code below, we're doing so to make
|
||||
// sure we treat a new writable connection as better than an old disconnected
|
||||
// connection.
|
||||
|
||||
// In the case where we reconnect TCP connections, the original best
|
||||
// connection is disconnected without changing to WRITE_TIMEOUT. In this case,
|
||||
// the new connection, when it becomes writable, should have higher priority.
|
||||
if (a->write_state() == cricket::Connection::STATE_WRITABLE &&
|
||||
b->write_state() == cricket::Connection::STATE_WRITABLE) {
|
||||
if (a->connected() && !b->connected()) {
|
||||
return 1;
|
||||
}
|
||||
if (!a->connected() && b->connected()) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
// Compare the candidate information.
|
||||
return CompareConnectionCandidates(a, b);
|
||||
}
|
||||
@ -992,16 +1025,20 @@ void P2PTransportChannel::SortConnections() {
|
||||
SwitchBestConnectionTo(top_connection);
|
||||
}
|
||||
|
||||
// We can prune any connection for which there is a writable connection on
|
||||
// the same network with better or equal priority. We leave those with
|
||||
// better priority just in case they become writable later (at which point,
|
||||
// we would prune out the current best connection). We leave connections on
|
||||
// other networks because they may not be using the same resources and they
|
||||
// may represent very distinct paths over which we can switch.
|
||||
// We can prune any connection for which there is a connected, writable
|
||||
// connection on the same network with better or equal priority. We leave
|
||||
// those with better priority just in case they become writable later (at
|
||||
// which point, we would prune out the current best connection). We leave
|
||||
// connections on other networks because they may not be using the same
|
||||
// resources and they may represent very distinct paths over which we can
|
||||
// switch. If the |primier| connection is not connected, we may be
|
||||
// reconnecting a TCP connection and temporarily do not prune connections in
|
||||
// this network. See the big comment in CompareConnections.
|
||||
std::set<rtc::Network*>::iterator network;
|
||||
for (network = networks.begin(); network != networks.end(); ++network) {
|
||||
Connection* primier = GetBestConnectionOnNetwork(*network);
|
||||
if (!primier || (primier->write_state() != Connection::STATE_WRITABLE))
|
||||
if (!primier || (primier->write_state() != Connection::STATE_WRITABLE) ||
|
||||
!primier->connected())
|
||||
continue;
|
||||
|
||||
for (uint32 i = 0; i < connections_.size(); ++i) {
|
||||
@ -1162,6 +1199,8 @@ void P2PTransportChannel::OnPing() {
|
||||
}
|
||||
|
||||
// Is the connection in a state for us to even consider pinging the other side?
|
||||
// We consider a connection pingable even if it's not connected because that's
|
||||
// how a TCP connection is kicked into reconnecting on the active side.
|
||||
bool P2PTransportChannel::IsPingable(Connection* conn) {
|
||||
const Candidate& remote = conn->remote_candidate();
|
||||
// We should never get this far with an empty remote ufrag.
|
||||
@ -1171,10 +1210,12 @@ bool P2PTransportChannel::IsPingable(Connection* conn) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// An unconnected connection cannot be written to at all, so pinging is out
|
||||
// of the question.
|
||||
if (!conn->connected())
|
||||
// An never connected connection cannot be written to at all, so pinging is
|
||||
// out of the question. However, if it has become WRITABLE, it is in the
|
||||
// reconnecting state so ping is needed.
|
||||
if (!conn->connected() && conn->write_state() != Connection::STATE_WRITABLE) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (writable()) {
|
||||
// If we are writable, then we only want to ping connections that could be
|
||||
@ -1192,14 +1233,17 @@ bool P2PTransportChannel::IsPingable(Connection* conn) {
|
||||
}
|
||||
|
||||
// Returns the next pingable connection to ping. This will be the oldest
|
||||
// pingable connection unless we have a writable connection that is past the
|
||||
// maximum acceptable ping delay.
|
||||
// pingable connection unless we have a connected, writable connection that is
|
||||
// past the maximum acceptable ping delay. When reconnecting a TCP connection,
|
||||
// the best connection is disconnected, although still WRITABLE while
|
||||
// reconnecting. The newly created connection should be selected as the ping
|
||||
// target to become writable instead. See the big comment in CompareConnections.
|
||||
Connection* P2PTransportChannel::FindNextPingableConnection() {
|
||||
uint32 now = rtc::Time();
|
||||
if (best_connection_ &&
|
||||
if (best_connection_ && best_connection_->connected() &&
|
||||
(best_connection_->write_state() == Connection::STATE_WRITABLE) &&
|
||||
(best_connection_->last_ping_sent()
|
||||
+ MAX_CURRENT_WRITABLE_DELAY <= now)) {
|
||||
(best_connection_->last_ping_sent() + MAX_CURRENT_WRITABLE_DELAY <=
|
||||
now)) {
|
||||
return best_connection_;
|
||||
}
|
||||
|
||||
|
@ -95,9 +95,6 @@ const int RTT_RATIO = 3; // 3 : 1
|
||||
|
||||
// The delay before we begin checking if this port is useless.
|
||||
const int kPortTimeoutDelay = 30 * 1000; // 30 seconds
|
||||
|
||||
// Used by the Connection.
|
||||
const uint32 MSG_DELETE = 1;
|
||||
}
|
||||
|
||||
namespace cricket {
|
||||
@ -948,7 +945,8 @@ void Connection::set_connected(bool value) {
|
||||
bool old_value = connected_;
|
||||
connected_ = value;
|
||||
if (value != old_value) {
|
||||
LOG_J(LS_VERBOSE, this) << "set_connected";
|
||||
LOG_J(LS_VERBOSE, this) << "set_connected from: " << old_value << " to "
|
||||
<< value;
|
||||
}
|
||||
}
|
||||
|
||||
@ -1178,7 +1176,6 @@ void Connection::UpdateState(uint32 now) {
|
||||
}
|
||||
|
||||
void Connection::Ping(uint32 now) {
|
||||
ASSERT(connected_);
|
||||
last_ping_sent_ = now;
|
||||
pings_since_last_response_.push_back(now);
|
||||
ConnectionRequest *req = new ConnectionRequest(this);
|
||||
@ -1372,7 +1369,6 @@ void Connection::MaybeUpdatePeerReflexiveCandidate(
|
||||
|
||||
void Connection::OnMessage(rtc::Message *pmsg) {
|
||||
ASSERT(pmsg->message_id == MSG_DELETE);
|
||||
|
||||
LOG_J(LS_INFO, this) << "Connection deleted due to read or write timeout";
|
||||
SignalDestroyed(this);
|
||||
delete this;
|
||||
|
@ -546,6 +546,8 @@ class Connection : public rtc::MessageHandler,
|
||||
void MaybeUpdatePeerReflexiveCandidate(const Candidate& new_candidate);
|
||||
|
||||
protected:
|
||||
enum { MSG_DELETE = 0, MSG_FIRST_AVAILABLE };
|
||||
|
||||
// Constructs a new connection to the given remote port.
|
||||
Connection(Port* port, size_t index, const Candidate& candidate);
|
||||
|
||||
@ -553,8 +555,8 @@ class Connection : public rtc::MessageHandler,
|
||||
void OnSendStunPacket(const void* data, size_t size, StunRequest* req);
|
||||
|
||||
// Callbacks from ConnectionRequest
|
||||
void OnConnectionRequestResponse(ConnectionRequest* req,
|
||||
StunMessage* response);
|
||||
virtual void OnConnectionRequestResponse(ConnectionRequest* req,
|
||||
StunMessage* response);
|
||||
void OnConnectionRequestErrorResponse(ConnectionRequest* req,
|
||||
StunMessage* response);
|
||||
void OnConnectionRequestTimeout(ConnectionRequest* req);
|
||||
|
@ -75,6 +75,8 @@ static const int STUN_ERROR_SERVER_ERROR_AS_GICE =
|
||||
static const int kTiebreaker1 = 11111;
|
||||
static const int kTiebreaker2 = 22222;
|
||||
|
||||
static const char* data = "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890";
|
||||
|
||||
static Candidate GetCandidate(Port* port) {
|
||||
assert(port->Candidates().size() == 1);
|
||||
return port->Candidates()[0];
|
||||
@ -230,6 +232,7 @@ class TestChannel : public sigslot::has_slots<> {
|
||||
conn_->set_use_candidate_attr(remote_ice_mode == ICEMODE_FULL);
|
||||
conn_->SignalStateChange.connect(
|
||||
this, &TestChannel::OnConnectionStateChange);
|
||||
conn_->SignalDestroyed.connect(this, &TestChannel::OnDestroyed);
|
||||
}
|
||||
void OnConnectionStateChange(Connection* conn) {
|
||||
if (conn->write_state() == Connection::STATE_WRITABLE) {
|
||||
@ -242,6 +245,7 @@ class TestChannel : public sigslot::has_slots<> {
|
||||
Candidate c = GetCandidate(dst_);
|
||||
c.set_address(remote_address_);
|
||||
conn_ = src_->CreateConnection(c, Port::ORIGIN_MESSAGE);
|
||||
conn_->SignalDestroyed.connect(this, &TestChannel::OnDestroyed);
|
||||
src_->SendBindingResponse(remote_request_.get(), remote_address_);
|
||||
remote_request_.reset();
|
||||
}
|
||||
@ -252,8 +256,9 @@ class TestChannel : public sigslot::has_slots<> {
|
||||
conn_->Ping(now);
|
||||
}
|
||||
void Stop() {
|
||||
conn_->SignalDestroyed.connect(this, &TestChannel::OnDestroyed);
|
||||
conn_->Destroy();
|
||||
if (conn_) {
|
||||
conn_->Destroy();
|
||||
}
|
||||
}
|
||||
|
||||
void OnPortComplete(Port* port) {
|
||||
@ -263,6 +268,11 @@ class TestChannel : public sigslot::has_slots<> {
|
||||
ice_mode_ = ice_mode;
|
||||
}
|
||||
|
||||
int SendData(const char* data, size_t len) {
|
||||
rtc::PacketOptions options;
|
||||
return conn_->Send(data, len, options);
|
||||
}
|
||||
|
||||
void OnUnknownAddress(PortInterface* port, const SocketAddress& addr,
|
||||
ProtocolType proto,
|
||||
IceMessage* msg, const std::string& rf,
|
||||
@ -295,7 +305,12 @@ class TestChannel : public sigslot::has_slots<> {
|
||||
|
||||
void OnDestroyed(Connection* conn) {
|
||||
ASSERT_EQ(conn_, conn);
|
||||
LOG(INFO) << "OnDestroy connection " << conn << " deleted";
|
||||
conn_ = NULL;
|
||||
// When the connection is destroyed, also clear these fields so future
|
||||
// connections are possible.
|
||||
remote_request_.reset();
|
||||
remote_address_.Clear();
|
||||
}
|
||||
|
||||
void OnSrcPortDestroyed(PortInterface* port) {
|
||||
@ -303,6 +318,8 @@ class TestChannel : public sigslot::has_slots<> {
|
||||
ASSERT_EQ(destroyed_src, port);
|
||||
}
|
||||
|
||||
Port* src_port() { return src_.get(); }
|
||||
|
||||
bool nominated() const { return nominated_; }
|
||||
|
||||
private:
|
||||
@ -333,9 +350,13 @@ class PortTest : public testing::Test, public sigslot::has_slots<> {
|
||||
nat_socket_factory2_(&nat_factory2_),
|
||||
stun_server_(TestStunServer::Create(main_, kStunAddr)),
|
||||
turn_server_(main_, kTurnUdpIntAddr, kTurnUdpExtAddr),
|
||||
relay_server_(main_, kRelayUdpIntAddr, kRelayUdpExtAddr,
|
||||
kRelayTcpIntAddr, kRelayTcpExtAddr,
|
||||
kRelaySslTcpIntAddr, kRelaySslTcpExtAddr),
|
||||
relay_server_(main_,
|
||||
kRelayUdpIntAddr,
|
||||
kRelayUdpExtAddr,
|
||||
kRelayTcpIntAddr,
|
||||
kRelayTcpExtAddr,
|
||||
kRelaySslTcpIntAddr,
|
||||
kRelaySslTcpExtAddr),
|
||||
username_(rtc::CreateRandomString(ICE_UFRAG_LENGTH)),
|
||||
password_(rtc::CreateRandomString(ICE_PWD_LENGTH)),
|
||||
ice_protocol_(cricket::ICEPROTO_GOOGLE),
|
||||
@ -405,7 +426,6 @@ class PortTest : public testing::Test, public sigslot::has_slots<> {
|
||||
TestConnectivity("ssltcp", port1, RelayName(rtype, proto), port2,
|
||||
rtype == RELAY_GTURN, false, true, true);
|
||||
}
|
||||
|
||||
// helpers for above functions
|
||||
UDPPort* CreateUdpPort(const SocketAddress& addr) {
|
||||
return CreateUdpPort(addr, &socket_factory_);
|
||||
@ -525,10 +545,117 @@ class PortTest : public testing::Test, public sigslot::has_slots<> {
|
||||
bool accept, bool same_addr1,
|
||||
bool same_addr2, bool possible);
|
||||
|
||||
// This connects the provided channels which have already started. |ch1|
|
||||
// should have its Connection created (either through CreateConnection() or
|
||||
// TCP reconnecting mechanism before entering this function.
|
||||
void ConnectStartedChannels(TestChannel* ch1, TestChannel* ch2) {
|
||||
ASSERT_TRUE(ch1->conn());
|
||||
EXPECT_TRUE_WAIT(ch1->conn()->connected(), kTimeout); // for TCP connect
|
||||
ch1->Ping();
|
||||
WAIT(!ch2->remote_address().IsNil(), kTimeout);
|
||||
|
||||
// Send a ping from dst to src.
|
||||
ch2->AcceptConnection();
|
||||
ch2->Ping();
|
||||
EXPECT_EQ_WAIT(Connection::STATE_WRITABLE, ch2->conn()->write_state(),
|
||||
kTimeout);
|
||||
}
|
||||
|
||||
// This connects and disconnects the provided channels in the same sequence as
|
||||
// TestConnectivity with all options set to |true|. It does not delete either
|
||||
// channel.
|
||||
void ConnectAndDisconnectChannels(TestChannel* ch1, TestChannel* ch2);
|
||||
void StartConnectAndStopChannels(TestChannel* ch1, TestChannel* ch2) {
|
||||
// Acquire addresses.
|
||||
ch1->Start();
|
||||
ch2->Start();
|
||||
|
||||
ch1->CreateConnection();
|
||||
ConnectStartedChannels(ch1, ch2);
|
||||
|
||||
// Destroy the connections.
|
||||
ch1->Stop();
|
||||
ch2->Stop();
|
||||
}
|
||||
|
||||
// This disconnects both end's Connection and make sure ch2 ready for new
|
||||
// connection.
|
||||
void DisconnectTcpTestChannels(TestChannel* ch1, TestChannel* ch2) {
|
||||
ASSERT_TRUE(ss_->CloseTcpConnections(
|
||||
static_cast<TCPConnection*>(ch1->conn())->socket()->GetLocalAddress(),
|
||||
static_cast<TCPConnection*>(ch2->conn())->socket()->GetLocalAddress()));
|
||||
|
||||
// Wait for both OnClose are delivered.
|
||||
EXPECT_TRUE_WAIT(!ch1->conn()->connected(), kTimeout);
|
||||
EXPECT_TRUE_WAIT(!ch2->conn()->connected(), kTimeout);
|
||||
|
||||
// Destroy channel2 connection to get ready for new incoming TCPConnection.
|
||||
ch2->conn()->Destroy();
|
||||
EXPECT_TRUE_WAIT(ch2->conn() == NULL, kTimeout);
|
||||
}
|
||||
|
||||
void TestTcpReconnect(bool ping_after_disconnected,
|
||||
bool send_after_disconnected) {
|
||||
Port* port1 = CreateTcpPort(kLocalAddr1);
|
||||
Port* port2 = CreateTcpPort(kLocalAddr2);
|
||||
|
||||
port1->set_component(cricket::ICE_CANDIDATE_COMPONENT_DEFAULT);
|
||||
port2->set_component(cricket::ICE_CANDIDATE_COMPONENT_DEFAULT);
|
||||
|
||||
// Set up channels and ensure both ports will be deleted.
|
||||
TestChannel ch1(port1, port2);
|
||||
TestChannel ch2(port2, port1);
|
||||
EXPECT_EQ(0, ch1.complete_count());
|
||||
EXPECT_EQ(0, ch2.complete_count());
|
||||
|
||||
ch1.Start();
|
||||
ch2.Start();
|
||||
ASSERT_EQ_WAIT(1, ch1.complete_count(), kTimeout);
|
||||
ASSERT_EQ_WAIT(1, ch2.complete_count(), kTimeout);
|
||||
|
||||
// Initial connecting the channel, create connection on channel1.
|
||||
ch1.CreateConnection();
|
||||
ConnectStartedChannels(&ch1, &ch2);
|
||||
|
||||
// Shorten the timeout period.
|
||||
const int kTcpReconnectTimeout = kTimeout;
|
||||
static_cast<TCPConnection*>(ch1.conn())
|
||||
->set_reconnection_timeout(kTcpReconnectTimeout);
|
||||
static_cast<TCPConnection*>(ch2.conn())
|
||||
->set_reconnection_timeout(kTcpReconnectTimeout);
|
||||
|
||||
// Once connected, disconnect them.
|
||||
DisconnectTcpTestChannels(&ch1, &ch2);
|
||||
|
||||
if (send_after_disconnected || ping_after_disconnected) {
|
||||
if (send_after_disconnected) {
|
||||
// First SendData after disconnect should fail but will trigger
|
||||
// reconnect.
|
||||
EXPECT_EQ(-1, ch1.SendData(data, static_cast<int>(strlen(data))));
|
||||
}
|
||||
|
||||
if (ping_after_disconnected) {
|
||||
// Ping should trigger reconnect.
|
||||
ch1.Ping();
|
||||
}
|
||||
|
||||
// Wait for channel's outgoing TCPConnection connected.
|
||||
EXPECT_TRUE_WAIT(ch1.conn()->connected(), kTimeout);
|
||||
|
||||
// Verify that we could still connect channels.
|
||||
ConnectStartedChannels(&ch1, &ch2);
|
||||
} else {
|
||||
EXPECT_EQ(ch1.conn()->write_state(), Connection::STATE_WRITABLE);
|
||||
EXPECT_TRUE_WAIT(
|
||||
ch1.conn()->write_state() == Connection::STATE_WRITE_TIMEOUT,
|
||||
kTcpReconnectTimeout + kTimeout);
|
||||
}
|
||||
|
||||
// Tear down and ensure that goes smoothly.
|
||||
ch1.Stop();
|
||||
ch2.Stop();
|
||||
EXPECT_TRUE_WAIT(ch1.conn() == NULL, kTimeout);
|
||||
EXPECT_TRUE_WAIT(ch2.conn() == NULL, kTimeout);
|
||||
}
|
||||
|
||||
void SetIceProtocolType(cricket::IceProtocolType protocol) {
|
||||
ice_protocol_ = protocol;
|
||||
@ -740,29 +867,6 @@ void PortTest::TestConnectivity(const char* name1, Port* port1,
|
||||
EXPECT_TRUE_WAIT(ch2.conn() == NULL, kTimeout);
|
||||
}
|
||||
|
||||
void PortTest::ConnectAndDisconnectChannels(TestChannel* ch1,
|
||||
TestChannel* ch2) {
|
||||
// Acquire addresses.
|
||||
ch1->Start();
|
||||
ch2->Start();
|
||||
|
||||
// Send a ping from src to dst.
|
||||
ch1->CreateConnection();
|
||||
EXPECT_TRUE_WAIT(ch1->conn()->connected(), kTimeout); // for TCP connect
|
||||
ch1->Ping();
|
||||
WAIT(!ch2->remote_address().IsNil(), kTimeout);
|
||||
|
||||
// Send a ping from dst to src.
|
||||
ch2->AcceptConnection();
|
||||
ch2->Ping();
|
||||
EXPECT_EQ_WAIT(Connection::STATE_WRITABLE, ch2->conn()->write_state(),
|
||||
kTimeout);
|
||||
|
||||
// Destroy the connections.
|
||||
ch1->Stop();
|
||||
ch2->Stop();
|
||||
}
|
||||
|
||||
class FakePacketSocketFactory : public rtc::PacketSocketFactory {
|
||||
public:
|
||||
FakePacketSocketFactory()
|
||||
@ -1039,6 +1143,18 @@ TEST_F(PortTest, TestTcpToTcp) {
|
||||
TestTcpToTcp();
|
||||
}
|
||||
|
||||
TEST_F(PortTest, TestTcpReconnectOnSendPacket) {
|
||||
TestTcpReconnect(false /* ping */, true /* send */);
|
||||
}
|
||||
|
||||
TEST_F(PortTest, TestTcpReconnectOnPing) {
|
||||
TestTcpReconnect(true /* ping */, false /* send */);
|
||||
}
|
||||
|
||||
TEST_F(PortTest, TestTcpReconnectTimeout) {
|
||||
TestTcpReconnect(false /* ping */, false /* send */);
|
||||
}
|
||||
|
||||
/* TODO: Enable these once testrelayserver can accept external TCP.
|
||||
TEST_F(PortTest, TestTcpToTcpRelay) {
|
||||
TestTcpToRelay(PROTO_TCP);
|
||||
@ -2466,7 +2582,7 @@ TEST_F(PortTest, TestControllingNoTimeout) {
|
||||
TestChannel ch2(port2, port1);
|
||||
|
||||
// Simulate a connection that succeeds, and then is destroyed.
|
||||
ConnectAndDisconnectChannels(&ch1, &ch2);
|
||||
StartConnectAndStopChannels(&ch1, &ch2);
|
||||
|
||||
// After the connection is destroyed, the port should not be destroyed.
|
||||
rtc::Thread::Current()->ProcessMessages(kTimeout);
|
||||
@ -2498,7 +2614,7 @@ TEST_F(PortTest, TestControlledTimeout) {
|
||||
TestChannel ch2(port2, port1);
|
||||
|
||||
// Simulate a connection that succeeds, and then is destroyed.
|
||||
ConnectAndDisconnectChannels(&ch1, &ch2);
|
||||
StartConnectAndStopChannels(&ch1, &ch2);
|
||||
|
||||
// The controlled port should be destroyed after 10 milliseconds.
|
||||
EXPECT_TRUE_WAIT(destroyed(), kTimeout);
|
||||
|
@ -8,6 +8,62 @@
|
||||
* be found in the AUTHORS file in the root of the source tree.
|
||||
*/
|
||||
|
||||
/*
|
||||
* This is a diagram of how TCP reconnect works for the active side. The
|
||||
* passive side just waits for an incoming connection.
|
||||
*
|
||||
* - Connected: Indicate whether the TCP socket is connected.
|
||||
*
|
||||
* - Writable: Whether the stun binding is completed. Sending a data packet
|
||||
* before stun binding completed will trigger IPC socket layer to shutdown
|
||||
* the connection.
|
||||
*
|
||||
* - PendingTCP: |connection_pending_| indicates whether there is an
|
||||
* outstanding TCP connection in progress.
|
||||
*
|
||||
* - PretendWri: Tracked by |pretending_to_be_writable_|. Marking connection as
|
||||
* WRITE_TIMEOUT will cause the connection be deleted. Instead, we're
|
||||
* "pretending" we're still writable for a period of time such that reconnect
|
||||
* could work.
|
||||
*
|
||||
* Data could only be sent in state 3. Sening data during state 2 & 6 will get
|
||||
* EWOULDBLOCK, 4 & 5 EPIPE.
|
||||
*
|
||||
* 7 -------------+
|
||||
* |Connected: N |
|
||||
* Timeout |Writable: N | Timeout
|
||||
* +------------------->|Connection is |<----------------+
|
||||
* | |Dead | |
|
||||
* | +--------------+ |
|
||||
* | ^ |
|
||||
* | OnClose | |
|
||||
* | +-----------------------+ | |
|
||||
* | | | |Timeout |
|
||||
* | v | | |
|
||||
* 4 +----------+ 5 -----+--+--+ 6 -----+-----+
|
||||
* |Connected: N|Send() or |Connected: N| |Connected: Y|
|
||||
* |Writable: Y|Ping() |Writable: Y|OnConnect |Writable: Y|
|
||||
* |PendingTCP:N+--------> |PendingTCP:Y+---------> |PendingTCP:N|
|
||||
* |PretendWri:Y| |PretendWri:Y| |PretendWri:Y|
|
||||
* +-----+------+ +------------+ +---+--+-----+
|
||||
* ^ ^ | |
|
||||
* | | OnClose | |
|
||||
* | +----------------------------------------------+ |
|
||||
* | |
|
||||
* | Stun Binding Completed |
|
||||
* | |
|
||||
* | OnClose |
|
||||
* +------------------------------------------------+ |
|
||||
* | v
|
||||
* 1 -----------+ 2 -----------+Stun 3 -----------+
|
||||
* |Connected: N| |Connected: Y|Binding |Connected: Y|
|
||||
* |Writable: N|OnConnect |Writable: N|Completed |Writable: Y|
|
||||
* |PendingTCP:Y+---------> |PendingTCP:N+--------> |PendingTCP:N|
|
||||
* |PretendWri:N| |PretendWri:N| |PretendWri:N|
|
||||
* +------------+ +------------+ +------------+
|
||||
*
|
||||
*/
|
||||
|
||||
#include "webrtc/p2p/base/tcpport.h"
|
||||
|
||||
#include "webrtc/p2p/base/common.h"
|
||||
@ -134,7 +190,16 @@ int TCPPort::SendTo(const void* data, size_t size,
|
||||
const rtc::PacketOptions& options,
|
||||
bool payload) {
|
||||
rtc::AsyncPacketSocket * socket = NULL;
|
||||
if (TCPConnection * conn = static_cast<TCPConnection*>(GetConnection(addr))) {
|
||||
TCPConnection* conn = static_cast<TCPConnection*>(GetConnection(addr));
|
||||
|
||||
// For Connection, this is the code path used by Ping() to establish
|
||||
// WRITABLE. It has to send through the socket directly as TCPConnection::Send
|
||||
// checks writability.
|
||||
if (conn) {
|
||||
if (!conn->connected()) {
|
||||
conn->MaybeReconnect();
|
||||
return SOCKET_ERROR;
|
||||
}
|
||||
socket = conn->socket();
|
||||
} else {
|
||||
socket = GetIncoming(addr);
|
||||
@ -142,12 +207,15 @@ int TCPPort::SendTo(const void* data, size_t size,
|
||||
if (!socket) {
|
||||
LOG_J(LS_ERROR, this) << "Attempted to send to an unknown destination, "
|
||||
<< addr.ToSensitiveString();
|
||||
return -1; // TODO: Set error_
|
||||
return SOCKET_ERROR; // TODO(tbd): Set error_
|
||||
}
|
||||
|
||||
int sent = socket->Send(data, size, options);
|
||||
if (sent < 0) {
|
||||
error_ = socket->GetError();
|
||||
// Error from this code path for a Connection (instead of from a bare
|
||||
// socket) will not trigger reconnecting. In theory, this shouldn't matter
|
||||
// as OnClose should always be called and set connected to false.
|
||||
LOG_J(LS_ERROR, this) << "TCP send of " << size
|
||||
<< " bytes failed with error " << error_;
|
||||
}
|
||||
@ -222,42 +290,29 @@ void TCPPort::OnAddressReady(rtc::AsyncPacketSocket* socket,
|
||||
ICE_TYPE_PREFERENCE_HOST_TCP, 0, true);
|
||||
}
|
||||
|
||||
TCPConnection::TCPConnection(TCPPort* port, const Candidate& candidate,
|
||||
TCPConnection::TCPConnection(TCPPort* port,
|
||||
const Candidate& candidate,
|
||||
rtc::AsyncPacketSocket* socket)
|
||||
: Connection(port, 0, candidate), socket_(socket), error_(0) {
|
||||
bool outgoing = (socket_ == NULL);
|
||||
if (outgoing) {
|
||||
// TODO: Handle failures here (unlikely since TCP).
|
||||
int opts = (candidate.protocol() == SSLTCP_PROTOCOL_NAME) ?
|
||||
rtc::PacketSocketFactory::OPT_SSLTCP : 0;
|
||||
socket_ = port->socket_factory()->CreateClientTcpSocket(
|
||||
rtc::SocketAddress(port->ip(), 0),
|
||||
candidate.address(), port->proxy(), port->user_agent(), opts);
|
||||
if (socket_) {
|
||||
LOG_J(LS_VERBOSE, this) << "Connecting from "
|
||||
<< socket_->GetLocalAddress().ToSensitiveString()
|
||||
<< " to "
|
||||
<< candidate.address().ToSensitiveString();
|
||||
set_connected(false);
|
||||
socket_->SignalConnect.connect(this, &TCPConnection::OnConnect);
|
||||
} else {
|
||||
LOG_J(LS_WARNING, this) << "Failed to create connection to "
|
||||
<< candidate.address().ToSensitiveString();
|
||||
}
|
||||
: Connection(port, 0, candidate),
|
||||
socket_(socket),
|
||||
error_(0),
|
||||
outgoing_(socket == NULL),
|
||||
connection_pending_(false),
|
||||
pretending_to_be_writable_(false),
|
||||
reconnection_timeout_(cricket::CONNECTION_WRITE_CONNECT_TIMEOUT) {
|
||||
if (outgoing_) {
|
||||
CreateOutgoingTcpSocket();
|
||||
} else {
|
||||
// Incoming connections should match the network address.
|
||||
LOG_J(LS_VERBOSE, this)
|
||||
<< "socket ipaddr: " << socket_->GetLocalAddress().ToString()
|
||||
<< ",port() ip:" << port->ip().ToString();
|
||||
ASSERT(socket_->GetLocalAddress().ipaddr() == port->ip());
|
||||
}
|
||||
|
||||
if (socket_) {
|
||||
socket_->SignalReadPacket.connect(this, &TCPConnection::OnReadPacket);
|
||||
socket_->SignalReadyToSend.connect(this, &TCPConnection::OnReadyToSend);
|
||||
socket_->SignalClose.connect(this, &TCPConnection::OnClose);
|
||||
ConnectSocketSignals(socket);
|
||||
}
|
||||
}
|
||||
|
||||
TCPConnection::~TCPConnection() {
|
||||
delete socket_;
|
||||
}
|
||||
|
||||
int TCPConnection::Send(const void* data, size_t size,
|
||||
@ -267,7 +322,18 @@ int TCPConnection::Send(const void* data, size_t size,
|
||||
return SOCKET_ERROR;
|
||||
}
|
||||
|
||||
if (write_state() != STATE_WRITABLE) {
|
||||
// Sending after OnClose on active side will trigger a reconnect for a
|
||||
// outgoing connection. Note that the write state is still WRITABLE as we want
|
||||
// to spend a few seconds attempting a reconnect before saying we're
|
||||
// unwritable.
|
||||
if (!connected()) {
|
||||
MaybeReconnect();
|
||||
return SOCKET_ERROR;
|
||||
}
|
||||
|
||||
// Note that this is important to put this after the previous check to give
|
||||
// the connection a chance to reconnect.
|
||||
if (pretending_to_be_writable_ || write_state() != STATE_WRITABLE) {
|
||||
// TODO: Should STATE_WRITE_TIMEOUT return a non-blocking error?
|
||||
error_ = EWOULDBLOCK;
|
||||
return SOCKET_ERROR;
|
||||
@ -287,6 +353,15 @@ int TCPConnection::GetError() {
|
||||
return error_;
|
||||
}
|
||||
|
||||
void TCPConnection::OnConnectionRequestResponse(ConnectionRequest* req,
|
||||
StunMessage* response) {
|
||||
// Once we receive a binding response, we are really writable, and not just
|
||||
// pretending to be writable.
|
||||
pretending_to_be_writable_ = false;
|
||||
Connection::OnConnectionRequestResponse(req, response);
|
||||
ASSERT(write_state() == STATE_WRITABLE);
|
||||
}
|
||||
|
||||
void TCPConnection::OnConnect(rtc::AsyncPacketSocket* socket) {
|
||||
ASSERT(socket == socket_);
|
||||
// Do not use this connection if the socket bound to a different address than
|
||||
@ -298,6 +373,7 @@ void TCPConnection::OnConnect(rtc::AsyncPacketSocket* socket) {
|
||||
LOG_J(LS_VERBOSE, this) << "Connection established to "
|
||||
<< socket->GetRemoteAddress().ToSensitiveString();
|
||||
set_connected(true);
|
||||
connection_pending_ = false;
|
||||
} else {
|
||||
LOG_J(LS_WARNING, this) << "Dropping connection as TCP socket bound to IP "
|
||||
<< socket_ip.ToSensitiveString()
|
||||
@ -310,8 +386,48 @@ void TCPConnection::OnConnect(rtc::AsyncPacketSocket* socket) {
|
||||
void TCPConnection::OnClose(rtc::AsyncPacketSocket* socket, int error) {
|
||||
ASSERT(socket == socket_);
|
||||
LOG_J(LS_INFO, this) << "Connection closed with error " << error;
|
||||
set_connected(false);
|
||||
set_write_state(STATE_WRITE_TIMEOUT);
|
||||
|
||||
// Guard against the condition where IPC socket will call OnClose for every
|
||||
// packet it can't send.
|
||||
if (connected()) {
|
||||
set_connected(false);
|
||||
pretending_to_be_writable_ = true;
|
||||
|
||||
// We don't attempt reconnect right here. This is to avoid a case where the
|
||||
// shutdown is intentional and reconnect is not necessary. We only reconnect
|
||||
// when the connection is used to Send() or Ping().
|
||||
port()->thread()->PostDelayed(reconnection_timeout(), this,
|
||||
MSG_TCPCONNECTION_DELAYED_ONCLOSE);
|
||||
}
|
||||
}
|
||||
|
||||
void TCPConnection::OnMessage(rtc::Message* pmsg) {
|
||||
switch (pmsg->message_id) {
|
||||
case MSG_TCPCONNECTION_DELAYED_ONCLOSE:
|
||||
// If this connection can't become connected and writable again in 5
|
||||
// seconds, it's time to tear this down. This is the case for the original
|
||||
// TCP connection on passive side during a reconnect.
|
||||
if (pretending_to_be_writable_) {
|
||||
set_write_state(STATE_WRITE_TIMEOUT);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
Connection::OnMessage(pmsg);
|
||||
}
|
||||
}
|
||||
|
||||
void TCPConnection::MaybeReconnect() {
|
||||
// Only reconnect for an outgoing TCPConnection when OnClose was signaled and
|
||||
// no outstanding reconnect is pending.
|
||||
if (connected() || connection_pending_ || !outgoing_) {
|
||||
return;
|
||||
}
|
||||
|
||||
LOG_J(LS_INFO, this) << "TCP Connection with remote is closed, "
|
||||
<< "trying to reconnect";
|
||||
|
||||
CreateOutgoingTcpSocket();
|
||||
error_ = EPIPE;
|
||||
}
|
||||
|
||||
void TCPConnection::OnReadPacket(
|
||||
@ -327,4 +443,35 @@ void TCPConnection::OnReadyToSend(rtc::AsyncPacketSocket* socket) {
|
||||
Connection::OnReadyToSend();
|
||||
}
|
||||
|
||||
void TCPConnection::CreateOutgoingTcpSocket() {
|
||||
ASSERT(outgoing_);
|
||||
// TODO(guoweis): Handle failures here (unlikely since TCP).
|
||||
int opts = (remote_candidate().protocol() == SSLTCP_PROTOCOL_NAME)
|
||||
? rtc::PacketSocketFactory::OPT_SSLTCP
|
||||
: 0;
|
||||
socket_.reset(port()->socket_factory()->CreateClientTcpSocket(
|
||||
rtc::SocketAddress(port()->ip(), 0), remote_candidate().address(),
|
||||
port()->proxy(), port()->user_agent(), opts));
|
||||
if (socket_) {
|
||||
LOG_J(LS_VERBOSE, this)
|
||||
<< "Connecting from " << socket_->GetLocalAddress().ToSensitiveString()
|
||||
<< " to " << remote_candidate().address().ToSensitiveString();
|
||||
set_connected(false);
|
||||
connection_pending_ = true;
|
||||
ConnectSocketSignals(socket_.get());
|
||||
} else {
|
||||
LOG_J(LS_WARNING, this) << "Failed to create connection to "
|
||||
<< remote_candidate().address().ToSensitiveString();
|
||||
}
|
||||
}
|
||||
|
||||
void TCPConnection::ConnectSocketSignals(rtc::AsyncPacketSocket* socket) {
|
||||
if (outgoing_) {
|
||||
socket->SignalConnect.connect(this, &TCPConnection::OnConnect);
|
||||
}
|
||||
socket->SignalReadPacket.connect(this, &TCPConnection::OnReadPacket);
|
||||
socket->SignalReadyToSend.connect(this, &TCPConnection::OnReadyToSend);
|
||||
socket->SignalClose.connect(this, &TCPConnection::OnClose);
|
||||
}
|
||||
|
||||
} // namespace cricket
|
||||
|
@ -119,9 +119,35 @@ class TCPConnection : public Connection {
|
||||
const rtc::PacketOptions& options);
|
||||
virtual int GetError();
|
||||
|
||||
rtc::AsyncPacketSocket* socket() { return socket_; }
|
||||
rtc::AsyncPacketSocket* socket() { return socket_.get(); }
|
||||
|
||||
void OnMessage(rtc::Message* pmsg);
|
||||
|
||||
// Allow test cases to overwrite the default timeout period.
|
||||
int reconnection_timeout() const { return reconnection_timeout_; }
|
||||
void set_reconnection_timeout(int timeout_in_ms) {
|
||||
reconnection_timeout_ = timeout_in_ms;
|
||||
}
|
||||
|
||||
protected:
|
||||
enum {
|
||||
MSG_TCPCONNECTION_DELAYED_ONCLOSE = Connection::MSG_FIRST_AVAILABLE,
|
||||
};
|
||||
|
||||
// Set waiting_for_stun_binding_complete_ to false to allow data packets in
|
||||
// addition to what Port::OnConnectionRequestResponse does.
|
||||
virtual void OnConnectionRequestResponse(ConnectionRequest* req,
|
||||
StunMessage* response);
|
||||
|
||||
private:
|
||||
// Helper function to handle the case when Ping or Send fails with error
|
||||
// related to socket close.
|
||||
void MaybeReconnect();
|
||||
|
||||
void CreateOutgoingTcpSocket();
|
||||
|
||||
void ConnectSocketSignals(rtc::AsyncPacketSocket* socket);
|
||||
|
||||
void OnConnect(rtc::AsyncPacketSocket* socket);
|
||||
void OnClose(rtc::AsyncPacketSocket* socket, int error);
|
||||
void OnReadPacket(rtc::AsyncPacketSocket* socket,
|
||||
@ -130,8 +156,23 @@ class TCPConnection : public Connection {
|
||||
const rtc::PacketTime& packet_time);
|
||||
void OnReadyToSend(rtc::AsyncPacketSocket* socket);
|
||||
|
||||
rtc::AsyncPacketSocket* socket_;
|
||||
rtc::scoped_ptr<rtc::AsyncPacketSocket> socket_;
|
||||
int error_;
|
||||
bool outgoing_;
|
||||
|
||||
// Guard against multiple outgoing tcp connection during a reconnect.
|
||||
bool connection_pending_;
|
||||
|
||||
// Guard against data packets sent when we reconnect a TCP connection. During
|
||||
// reconnecting, when a new tcp connection has being made, we can't send data
|
||||
// packets out until the STUN binding is completed (i.e. the write state is
|
||||
// set to WRITABLE again by Connection::OnConnectionRequestResponse). IPC
|
||||
// socket, when receiving data packets before that, will trigger OnError which
|
||||
// will terminate the newly created connection.
|
||||
bool pretending_to_be_writable_;
|
||||
|
||||
// Allow test case to overwrite the default timeout period.
|
||||
int reconnection_timeout_;
|
||||
|
||||
friend class TCPPort;
|
||||
};
|
||||
|
@ -9,6 +9,7 @@
|
||||
*/
|
||||
|
||||
#include <sstream>
|
||||
#include "webrtc/p2p/base/common.h"
|
||||
#include "webrtc/p2p/base/transportchannel.h"
|
||||
|
||||
namespace cricket {
|
||||
@ -32,6 +33,8 @@ void TransportChannel::set_readable(bool readable) {
|
||||
|
||||
void TransportChannel::set_writable(bool writable) {
|
||||
if (writable_ != writable) {
|
||||
LOG_J(LS_VERBOSE, this) << "set_writable from:" << writable_ << " to "
|
||||
<< writable;
|
||||
writable_ = writable;
|
||||
if (writable_) {
|
||||
SignalReadyToSend(this);
|
||||
|
Loading…
x
Reference in New Issue
Block a user