5a695d6094
Also fixing potential out-of-order delivery of signaling messages. Review URL: http://webrtc-codereview.appspot.com/214005 git-svn-id: http://webrtc.googlecode.com/svn/trunk@716 4adac7df-926f-26a2-2b94-8c16560cd09d
483 lines
14 KiB
C++
483 lines
14 KiB
C++
/*
|
|
* Copyright (c) 2011 The WebRTC project authors. All Rights Reserved.
|
|
*
|
|
* Use of this source code is governed by a BSD-style license
|
|
* that can be found in the LICENSE file in the root of the source
|
|
* tree. An additional intellectual property rights grant can be found
|
|
* in the file PATENTS. All contributing project authors may
|
|
* be found in the AUTHORS file in the root of the source tree.
|
|
*/
|
|
|
|
#include "peerconnection/samples/client/peer_connection_client.h"
|
|
|
|
#include "peerconnection/samples/client/defaults.h"
|
|
#include "talk/base/common.h"
|
|
#include "talk/base/nethelpers.h"
|
|
#include "talk/base/logging.h"
|
|
#include "talk/base/stringutils.h"
|
|
|
|
#ifdef WIN32
|
|
#include "talk/base/win32socketserver.h"
|
|
#endif
|
|
|
|
using talk_base::sprintfn;
|
|
|
|
namespace {
|
|
|
|
// This is our magical hangup signal.
|
|
const char kByeMessage[] = "BYE";
|
|
|
|
talk_base::AsyncSocket* CreateClientSocket() {
|
|
#ifdef WIN32
|
|
return new talk_base::Win32Socket();
|
|
#elif defined(POSIX)
|
|
talk_base::Thread* thread = talk_base::Thread::Current();
|
|
ASSERT(thread != NULL);
|
|
return thread->socketserver()->CreateAsyncSocket(SOCK_STREAM);
|
|
#else
|
|
#error Platform not supported.
|
|
#endif
|
|
}
|
|
|
|
}
|
|
|
|
PeerConnectionClient::PeerConnectionClient()
|
|
: callback_(NULL),
|
|
control_socket_(CreateClientSocket()),
|
|
hanging_get_(CreateClientSocket()),
|
|
state_(NOT_CONNECTED),
|
|
my_id_(-1) {
|
|
control_socket_->SignalCloseEvent.connect(this,
|
|
&PeerConnectionClient::OnClose);
|
|
hanging_get_->SignalCloseEvent.connect(this,
|
|
&PeerConnectionClient::OnClose);
|
|
control_socket_->SignalConnectEvent.connect(this,
|
|
&PeerConnectionClient::OnConnect);
|
|
hanging_get_->SignalConnectEvent.connect(this,
|
|
&PeerConnectionClient::OnHangingGetConnect);
|
|
control_socket_->SignalReadEvent.connect(this,
|
|
&PeerConnectionClient::OnRead);
|
|
hanging_get_->SignalReadEvent.connect(this,
|
|
&PeerConnectionClient::OnHangingGetRead);
|
|
}
|
|
|
|
PeerConnectionClient::~PeerConnectionClient() {
|
|
}
|
|
|
|
int PeerConnectionClient::id() const {
|
|
return my_id_;
|
|
}
|
|
|
|
bool PeerConnectionClient::is_connected() const {
|
|
return my_id_ != -1;
|
|
}
|
|
|
|
const Peers& PeerConnectionClient::peers() const {
|
|
return peers_;
|
|
}
|
|
|
|
void PeerConnectionClient::RegisterObserver(
|
|
PeerConnectionClientObserver* callback) {
|
|
ASSERT(!callback_);
|
|
callback_ = callback;
|
|
}
|
|
|
|
bool PeerConnectionClient::Connect(const std::string& server, int port,
|
|
const std::string& client_name) {
|
|
ASSERT(!server.empty());
|
|
ASSERT(!client_name.empty());
|
|
|
|
if (state_ != NOT_CONNECTED) {
|
|
LOG(WARNING)
|
|
<< "The client must not be connected before you can call Connect()";
|
|
return false;
|
|
}
|
|
|
|
if (server.empty() || client_name.empty())
|
|
return false;
|
|
|
|
if (port <= 0)
|
|
port = kDefaultServerPort;
|
|
|
|
server_address_.SetIP(server);
|
|
server_address_.SetPort(port);
|
|
|
|
if (server_address_.IsUnresolved()) {
|
|
int errcode = 0;
|
|
hostent* h = talk_base::SafeGetHostByName(
|
|
server_address_.IPAsString().c_str(), &errcode);
|
|
if (!h) {
|
|
LOG(LS_ERROR) << "Failed to resolve host name: "
|
|
<< server_address_.IPAsString();
|
|
return false;
|
|
} else {
|
|
server_address_.SetResolvedIP(
|
|
ntohl(*reinterpret_cast<uint32*>(h->h_addr_list[0])));
|
|
talk_base::FreeHostEnt(h);
|
|
}
|
|
}
|
|
|
|
char buffer[1024];
|
|
sprintfn(buffer, sizeof(buffer),
|
|
"GET /sign_in?%s HTTP/1.0\r\n\r\n", client_name.c_str());
|
|
onconnect_data_ = buffer;
|
|
|
|
bool ret = ConnectControlSocket();
|
|
if (ret)
|
|
state_ = SIGNING_IN;
|
|
|
|
return ret;
|
|
}
|
|
|
|
bool PeerConnectionClient::SendToPeer(int peer_id, const std::string& message) {
|
|
if (state_ != CONNECTED)
|
|
return false;
|
|
|
|
ASSERT(is_connected());
|
|
ASSERT(control_socket_->GetState() == talk_base::Socket::CS_CLOSED);
|
|
if (!is_connected() || peer_id == -1)
|
|
return false;
|
|
|
|
char headers[1024];
|
|
sprintfn(headers, sizeof(headers),
|
|
"POST /message?peer_id=%i&to=%i HTTP/1.0\r\n"
|
|
"Content-Length: %i\r\n"
|
|
"Content-Type: text/plain\r\n"
|
|
"\r\n",
|
|
my_id_, peer_id, message.length());
|
|
onconnect_data_ = headers;
|
|
onconnect_data_ += message;
|
|
return ConnectControlSocket();
|
|
}
|
|
|
|
bool PeerConnectionClient::SendHangUp(int peer_id) {
|
|
return SendToPeer(peer_id, kByeMessage);
|
|
}
|
|
|
|
bool PeerConnectionClient::IsSendingMessage() {
|
|
return state_ == CONNECTED &&
|
|
control_socket_->GetState() != talk_base::Socket::CS_CLOSED;
|
|
}
|
|
|
|
bool PeerConnectionClient::SignOut() {
|
|
if (state_ == NOT_CONNECTED || state_ == SIGNING_OUT)
|
|
return true;
|
|
|
|
if (hanging_get_->GetState() != talk_base::Socket::CS_CLOSED)
|
|
hanging_get_->Close();
|
|
|
|
if (control_socket_->GetState() == talk_base::Socket::CS_CLOSED) {
|
|
state_ = SIGNING_OUT;
|
|
|
|
if (my_id_ != -1) {
|
|
char buffer[1024];
|
|
sprintfn(buffer, sizeof(buffer),
|
|
"GET /sign_out?peer_id=%i HTTP/1.0\r\n\r\n", my_id_);
|
|
onconnect_data_ = buffer;
|
|
return ConnectControlSocket();
|
|
} else {
|
|
// Can occur if the app is closed before we finish connecting.
|
|
return true;
|
|
}
|
|
} else {
|
|
state_ = SIGNING_OUT_WAITING;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
void PeerConnectionClient::Close() {
|
|
control_socket_->Close();
|
|
hanging_get_->Close();
|
|
onconnect_data_.clear();
|
|
peers_.clear();
|
|
my_id_ = -1;
|
|
state_ = NOT_CONNECTED;
|
|
}
|
|
|
|
bool PeerConnectionClient::ConnectControlSocket() {
|
|
ASSERT(control_socket_->GetState() == talk_base::Socket::CS_CLOSED);
|
|
int err = control_socket_->Connect(server_address_);
|
|
if (err == SOCKET_ERROR) {
|
|
Close();
|
|
return false;
|
|
}
|
|
return true;
|
|
}
|
|
|
|
void PeerConnectionClient::OnConnect(talk_base::AsyncSocket* socket) {
|
|
ASSERT(!onconnect_data_.empty());
|
|
size_t sent = socket->Send(onconnect_data_.c_str(), onconnect_data_.length());
|
|
ASSERT(sent == onconnect_data_.length());
|
|
UNUSED(sent);
|
|
onconnect_data_.clear();
|
|
}
|
|
|
|
void PeerConnectionClient::OnHangingGetConnect(talk_base::AsyncSocket* socket) {
|
|
char buffer[1024];
|
|
sprintfn(buffer, sizeof(buffer),
|
|
"GET /wait?peer_id=%i HTTP/1.0\r\n\r\n", my_id_);
|
|
int len = strlen(buffer);
|
|
int sent = socket->Send(buffer, len);
|
|
ASSERT(sent == len);
|
|
UNUSED2(sent, len);
|
|
}
|
|
|
|
void PeerConnectionClient::OnMessageFromPeer(int peer_id,
|
|
const std::string& message) {
|
|
if (message.length() == (sizeof(kByeMessage) - 1) &&
|
|
message.compare(kByeMessage) == 0) {
|
|
callback_->OnPeerDisconnected(peer_id);
|
|
} else {
|
|
callback_->OnMessageFromPeer(peer_id, message);
|
|
}
|
|
}
|
|
|
|
bool PeerConnectionClient::GetHeaderValue(const std::string& data,
|
|
size_t eoh,
|
|
const char* header_pattern,
|
|
size_t* value) {
|
|
ASSERT(value != NULL);
|
|
size_t found = data.find(header_pattern);
|
|
if (found != std::string::npos && found < eoh) {
|
|
*value = atoi(&data[found + strlen(header_pattern)]);
|
|
return true;
|
|
}
|
|
return false;
|
|
}
|
|
|
|
bool PeerConnectionClient::GetHeaderValue(const std::string& data, size_t eoh,
|
|
const char* header_pattern,
|
|
std::string* value) {
|
|
ASSERT(value != NULL);
|
|
size_t found = data.find(header_pattern);
|
|
if (found != std::string::npos && found < eoh) {
|
|
size_t begin = found + strlen(header_pattern);
|
|
size_t end = data.find("\r\n", begin);
|
|
if (end == std::string::npos)
|
|
end = eoh;
|
|
value->assign(data.substr(begin, end - begin));
|
|
return true;
|
|
}
|
|
return false;
|
|
}
|
|
|
|
bool PeerConnectionClient::ReadIntoBuffer(talk_base::AsyncSocket* socket,
|
|
std::string* data,
|
|
size_t* content_length) {
|
|
LOG(INFO) << __FUNCTION__;
|
|
|
|
char buffer[0xffff];
|
|
do {
|
|
int bytes = socket->Recv(buffer, sizeof(buffer));
|
|
if (bytes <= 0)
|
|
break;
|
|
data->append(buffer, bytes);
|
|
} while (true);
|
|
|
|
bool ret = false;
|
|
size_t i = data->find("\r\n\r\n");
|
|
if (i != std::string::npos) {
|
|
LOG(INFO) << "Headers received";
|
|
if (GetHeaderValue(*data, i, "\r\nContent-Length: ", content_length)) {
|
|
LOG(INFO) << "Expecting " << *content_length << " bytes.";
|
|
size_t total_response_size = (i + 4) + *content_length;
|
|
if (data->length() >= total_response_size) {
|
|
ret = true;
|
|
std::string should_close;
|
|
const char kConnection[] = "\r\nConnection: ";
|
|
if (GetHeaderValue(*data, i, kConnection, &should_close) &&
|
|
should_close.compare("close") == 0) {
|
|
socket->Close();
|
|
// Since we closed the socket, there was no notification delivered
|
|
// to us. Compensate by letting ourselves know.
|
|
OnClose(socket, NO_ERROR);
|
|
}
|
|
} else {
|
|
// We haven't received everything. Just continue to accept data.
|
|
}
|
|
} else {
|
|
LOG(LS_ERROR) << "No content length field specified by the server.";
|
|
}
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
void PeerConnectionClient::OnRead(talk_base::AsyncSocket* socket) {
|
|
LOG(INFO) << __FUNCTION__;
|
|
size_t content_length = 0;
|
|
if (ReadIntoBuffer(socket, &control_data_, &content_length)) {
|
|
size_t peer_id = 0, eoh = 0;
|
|
bool ok = ParseServerResponse(control_data_, content_length, &peer_id,
|
|
&eoh);
|
|
if (ok) {
|
|
if (my_id_ == -1) {
|
|
// First response. Let's store our server assigned ID.
|
|
ASSERT(state_ == SIGNING_IN);
|
|
my_id_ = peer_id;
|
|
ASSERT(my_id_ != -1);
|
|
|
|
// The body of the response will be a list of already connected peers.
|
|
if (content_length) {
|
|
size_t pos = eoh + 4;
|
|
while (pos < control_data_.size()) {
|
|
size_t eol = control_data_.find('\n', pos);
|
|
if (eol == std::string::npos)
|
|
break;
|
|
int id = 0;
|
|
std::string name;
|
|
bool connected;
|
|
if (ParseEntry(control_data_.substr(pos, eol - pos), &name, &id,
|
|
&connected) && id != my_id_) {
|
|
peers_[id] = name;
|
|
callback_->OnPeerConnected(id, name);
|
|
}
|
|
pos = eol + 1;
|
|
}
|
|
}
|
|
ASSERT(is_connected());
|
|
callback_->OnSignedIn();
|
|
} else if (state_ == SIGNING_OUT) {
|
|
Close();
|
|
callback_->OnDisconnected();
|
|
} else if (state_ == SIGNING_OUT_WAITING) {
|
|
SignOut();
|
|
}
|
|
}
|
|
|
|
control_data_.clear();
|
|
|
|
if (state_ == SIGNING_IN) {
|
|
ASSERT(hanging_get_->GetState() == talk_base::Socket::CS_CLOSED);
|
|
state_ = CONNECTED;
|
|
hanging_get_->Connect(server_address_);
|
|
}
|
|
}
|
|
}
|
|
|
|
void PeerConnectionClient::OnHangingGetRead(talk_base::AsyncSocket* socket) {
|
|
LOG(INFO) << __FUNCTION__;
|
|
size_t content_length = 0;
|
|
if (ReadIntoBuffer(socket, ¬ification_data_, &content_length)) {
|
|
size_t peer_id = 0, eoh = 0;
|
|
bool ok = ParseServerResponse(notification_data_, content_length,
|
|
&peer_id, &eoh);
|
|
|
|
if (ok) {
|
|
// Store the position where the body begins.
|
|
size_t pos = eoh + 4;
|
|
|
|
if (my_id_ == static_cast<int>(peer_id)) {
|
|
// A notification about a new member or a member that just
|
|
// disconnected.
|
|
int id = 0;
|
|
std::string name;
|
|
bool connected = false;
|
|
if (ParseEntry(notification_data_.substr(pos), &name, &id,
|
|
&connected)) {
|
|
if (connected) {
|
|
peers_[id] = name;
|
|
callback_->OnPeerConnected(id, name);
|
|
} else {
|
|
peers_.erase(id);
|
|
callback_->OnPeerDisconnected(id);
|
|
}
|
|
}
|
|
} else {
|
|
OnMessageFromPeer(peer_id, notification_data_.substr(pos));
|
|
}
|
|
}
|
|
|
|
notification_data_.clear();
|
|
}
|
|
|
|
if (hanging_get_->GetState() == talk_base::Socket::CS_CLOSED &&
|
|
state_ == CONNECTED) {
|
|
hanging_get_->Connect(server_address_);
|
|
}
|
|
}
|
|
|
|
bool PeerConnectionClient::ParseEntry(const std::string& entry,
|
|
std::string* name,
|
|
int* id,
|
|
bool* connected) {
|
|
ASSERT(name != NULL);
|
|
ASSERT(id != NULL);
|
|
ASSERT(connected != NULL);
|
|
ASSERT(!entry.empty());
|
|
|
|
*connected = false;
|
|
size_t separator = entry.find(',');
|
|
if (separator != std::string::npos) {
|
|
*id = atoi(&entry[separator + 1]);
|
|
name->assign(entry.substr(0, separator));
|
|
separator = entry.find(',', separator + 1);
|
|
if (separator != std::string::npos) {
|
|
*connected = atoi(&entry[separator + 1]) ? true : false;
|
|
}
|
|
}
|
|
return !name->empty();
|
|
}
|
|
|
|
int PeerConnectionClient::GetResponseStatus(const std::string& response) {
|
|
int status = -1;
|
|
size_t pos = response.find(' ');
|
|
if (pos != std::string::npos)
|
|
status = atoi(&response[pos + 1]);
|
|
return status;
|
|
}
|
|
|
|
bool PeerConnectionClient::ParseServerResponse(const std::string& response,
|
|
size_t content_length,
|
|
size_t* peer_id,
|
|
size_t* eoh) {
|
|
LOG(INFO) << response;
|
|
|
|
int status = GetResponseStatus(response.c_str());
|
|
if (status != 200) {
|
|
LOG(LS_ERROR) << "Received error from server";
|
|
Close();
|
|
callback_->OnDisconnected();
|
|
return false;
|
|
}
|
|
|
|
*eoh = response.find("\r\n\r\n");
|
|
ASSERT(*eoh != std::string::npos);
|
|
if (*eoh == std::string::npos)
|
|
return false;
|
|
|
|
*peer_id = -1;
|
|
|
|
// See comment in peer_channel.cc for why we use the Pragma header and
|
|
// not e.g. "X-Peer-Id".
|
|
GetHeaderValue(response, *eoh, "\r\nPragma: ", peer_id);
|
|
|
|
return true;
|
|
}
|
|
|
|
void PeerConnectionClient::OnClose(talk_base::AsyncSocket* socket, int err) {
|
|
LOG(INFO) << __FUNCTION__;
|
|
|
|
socket->Close();
|
|
|
|
#ifdef WIN32
|
|
if (err != WSAECONNREFUSED) {
|
|
#else
|
|
if (err != ECONNREFUSED) {
|
|
#endif
|
|
if (socket == hanging_get_.get()) {
|
|
if (state_ == CONNECTED) {
|
|
LOG(INFO) << "Issuing a new hanging get";
|
|
hanging_get_->Close();
|
|
hanging_get_->Connect(server_address_);
|
|
}
|
|
} else {
|
|
callback_->OnMessageSent(err);
|
|
}
|
|
} else {
|
|
LOG(WARNING) << "Failed to connect to the server";
|
|
Close();
|
|
callback_->OnDisconnected();
|
|
}
|
|
}
|