 28654cbc22
			
		
	
	28654cbc22
	
	
	
		
			
			TBR=mallinath@webrtc.org Review URL: https://webrtc-codereview.appspot.com/1848004 git-svn-id: http://webrtc.googlecode.com/svn/trunk@4380 4adac7df-926f-26a2-2b94-8c16560cd09d
		
			
				
	
	
		
			533 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			533 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /*
 | |
|  * libjingle
 | |
|  * Copyright 2012, Google Inc.
 | |
|  *
 | |
|  * Redistribution and use in source and binary forms, with or without
 | |
|  * modification, are permitted provided that the following conditions are met:
 | |
|  *
 | |
|  *  1. Redistributions of source code must retain the above copyright notice,
 | |
|  *     this list of conditions and the following disclaimer.
 | |
|  *  2. Redistributions in binary form must reproduce the above copyright notice,
 | |
|  *     this list of conditions and the following disclaimer in the documentation
 | |
|  *     and/or other materials provided with the distribution.
 | |
|  *  3. The name of the author may not be used to endorse or promote products
 | |
|  *     derived from this software without specific prior written permission.
 | |
|  *
 | |
|  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
 | |
|  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 | |
|  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
 | |
|  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 | |
|  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 | |
|  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
 | |
|  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
 | |
|  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
 | |
|  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 | |
|  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 | |
|  */
 | |
| 
 | |
| #include "talk/examples/peerconnection/client/peer_connection_client.h"
 | |
| 
 | |
| #include "talk/examples/peerconnection/client/defaults.h"
 | |
| #include "talk/base/common.h"
 | |
| #include "talk/base/nethelpers.h"
 | |
| #include "talk/base/logging.h"
 | |
| #include "talk/base/stringutils.h"
 | |
| 
 | |
| #ifdef WIN32
 | |
| #include "talk/base/win32socketserver.h"
 | |
| #endif
 | |
| 
 | |
| using talk_base::sprintfn;
 | |
| 
 | |
| namespace {
 | |
| 
 | |
| // This is our magical hangup signal.
 | |
| const char kByeMessage[] = "BYE";
 | |
| // Delay between server connection retries, in milliseconds
 | |
| const int kReconnectDelay = 2000;
 | |
| 
 | |
| talk_base::AsyncSocket* CreateClientSocket(int family) {
 | |
| #ifdef WIN32
 | |
|   talk_base::Win32Socket* sock = new talk_base::Win32Socket();
 | |
|   sock->CreateT(family, SOCK_STREAM);
 | |
|   return sock;
 | |
| #elif defined(POSIX)
 | |
|   talk_base::Thread* thread = talk_base::Thread::Current();
 | |
|   ASSERT(thread != NULL);
 | |
|   return thread->socketserver()->CreateAsyncSocket(family, SOCK_STREAM);
 | |
| #else
 | |
| #error Platform not supported.
 | |
| #endif
 | |
| }
 | |
| 
 | |
| }
 | |
| 
 | |
| PeerConnectionClient::PeerConnectionClient()
 | |
|   : callback_(NULL),
 | |
|     resolver_(NULL),
 | |
|     state_(NOT_CONNECTED),
 | |
|     my_id_(-1) {
 | |
| }
 | |
| 
 | |
| PeerConnectionClient::~PeerConnectionClient() {
 | |
| }
 | |
| 
 | |
| void PeerConnectionClient::InitSocketSignals() {
 | |
|   ASSERT(control_socket_.get() != NULL);
 | |
|   ASSERT(hanging_get_.get() != NULL);
 | |
|   control_socket_->SignalCloseEvent.connect(this,
 | |
|       &PeerConnectionClient::OnClose);
 | |
|   hanging_get_->SignalCloseEvent.connect(this,
 | |
|       &PeerConnectionClient::OnClose);
 | |
|   control_socket_->SignalConnectEvent.connect(this,
 | |
|       &PeerConnectionClient::OnConnect);
 | |
|   hanging_get_->SignalConnectEvent.connect(this,
 | |
|       &PeerConnectionClient::OnHangingGetConnect);
 | |
|   control_socket_->SignalReadEvent.connect(this,
 | |
|       &PeerConnectionClient::OnRead);
 | |
|   hanging_get_->SignalReadEvent.connect(this,
 | |
|       &PeerConnectionClient::OnHangingGetRead);
 | |
| }
 | |
| 
 | |
| int PeerConnectionClient::id() const {
 | |
|   return my_id_;
 | |
| }
 | |
| 
 | |
| bool PeerConnectionClient::is_connected() const {
 | |
|   return my_id_ != -1;
 | |
| }
 | |
| 
 | |
| const Peers& PeerConnectionClient::peers() const {
 | |
|   return peers_;
 | |
| }
 | |
| 
 | |
| void PeerConnectionClient::RegisterObserver(
 | |
|     PeerConnectionClientObserver* callback) {
 | |
|   ASSERT(!callback_);
 | |
|   callback_ = callback;
 | |
| }
 | |
| 
 | |
| void PeerConnectionClient::Connect(const std::string& server, int port,
 | |
|                                    const std::string& client_name) {
 | |
|   ASSERT(!server.empty());
 | |
|   ASSERT(!client_name.empty());
 | |
| 
 | |
|   if (state_ != NOT_CONNECTED) {
 | |
|     LOG(WARNING)
 | |
|         << "The client must not be connected before you can call Connect()";
 | |
|     callback_->OnServerConnectionFailure();
 | |
|     return;
 | |
|   }
 | |
| 
 | |
|   if (server.empty() || client_name.empty()) {
 | |
|     callback_->OnServerConnectionFailure();
 | |
|     return;
 | |
|   }
 | |
| 
 | |
|   if (port <= 0)
 | |
|     port = kDefaultServerPort;
 | |
| 
 | |
|   server_address_.SetIP(server);
 | |
|   server_address_.SetPort(port);
 | |
|   client_name_ = client_name;
 | |
| 
 | |
|   if (server_address_.IsUnresolved()) {
 | |
|     state_ = RESOLVING;
 | |
|     resolver_ = new talk_base::AsyncResolver();
 | |
|     resolver_->SignalWorkDone.connect(this,
 | |
|                                       &PeerConnectionClient::OnResolveResult);
 | |
|     resolver_->set_address(server_address_);
 | |
|     resolver_->Start();
 | |
|   } else {
 | |
|     DoConnect();
 | |
|   }
 | |
| }
 | |
| 
 | |
| void PeerConnectionClient::OnResolveResult(talk_base::SignalThread *t) {
 | |
|   if (resolver_->error() != 0) {
 | |
|     callback_->OnServerConnectionFailure();
 | |
|     resolver_->Destroy(false);
 | |
|     resolver_ = NULL;
 | |
|     state_ = NOT_CONNECTED;
 | |
|   } else {
 | |
|     server_address_ = resolver_->address();
 | |
|     DoConnect();
 | |
|   }
 | |
| }
 | |
| 
 | |
| void PeerConnectionClient::DoConnect() {
 | |
|   control_socket_.reset(CreateClientSocket(server_address_.ipaddr().family()));
 | |
|   hanging_get_.reset(CreateClientSocket(server_address_.ipaddr().family()));
 | |
|   InitSocketSignals();
 | |
|   char buffer[1024];
 | |
|   sprintfn(buffer, sizeof(buffer),
 | |
|            "GET /sign_in?%s HTTP/1.0\r\n\r\n", client_name_.c_str());
 | |
|   onconnect_data_ = buffer;
 | |
| 
 | |
|   bool ret = ConnectControlSocket();
 | |
|   if (ret)
 | |
|     state_ = SIGNING_IN;
 | |
|   if (!ret) {
 | |
|     callback_->OnServerConnectionFailure();
 | |
|   }
 | |
| }
 | |
| 
 | |
| bool PeerConnectionClient::SendToPeer(int peer_id, const std::string& message) {
 | |
|   if (state_ != CONNECTED)
 | |
|     return false;
 | |
| 
 | |
|   ASSERT(is_connected());
 | |
|   ASSERT(control_socket_->GetState() == talk_base::Socket::CS_CLOSED);
 | |
|   if (!is_connected() || peer_id == -1)
 | |
|     return false;
 | |
| 
 | |
|   char headers[1024];
 | |
|   sprintfn(headers, sizeof(headers),
 | |
|       "POST /message?peer_id=%i&to=%i HTTP/1.0\r\n"
 | |
|       "Content-Length: %i\r\n"
 | |
|       "Content-Type: text/plain\r\n"
 | |
|       "\r\n",
 | |
|       my_id_, peer_id, message.length());
 | |
|   onconnect_data_ = headers;
 | |
|   onconnect_data_ += message;
 | |
|   return ConnectControlSocket();
 | |
| }
 | |
| 
 | |
| bool PeerConnectionClient::SendHangUp(int peer_id) {
 | |
|   return SendToPeer(peer_id, kByeMessage);
 | |
| }
 | |
| 
 | |
| bool PeerConnectionClient::IsSendingMessage() {
 | |
|   return state_ == CONNECTED &&
 | |
|          control_socket_->GetState() != talk_base::Socket::CS_CLOSED;
 | |
| }
 | |
| 
 | |
| bool PeerConnectionClient::SignOut() {
 | |
|   if (state_ == NOT_CONNECTED || state_ == SIGNING_OUT)
 | |
|     return true;
 | |
| 
 | |
|   if (hanging_get_->GetState() != talk_base::Socket::CS_CLOSED)
 | |
|     hanging_get_->Close();
 | |
| 
 | |
|   if (control_socket_->GetState() == talk_base::Socket::CS_CLOSED) {
 | |
|     state_ = SIGNING_OUT;
 | |
| 
 | |
|     if (my_id_ != -1) {
 | |
|       char buffer[1024];
 | |
|       sprintfn(buffer, sizeof(buffer),
 | |
|           "GET /sign_out?peer_id=%i HTTP/1.0\r\n\r\n", my_id_);
 | |
|       onconnect_data_ = buffer;
 | |
|       return ConnectControlSocket();
 | |
|     } else {
 | |
|       // Can occur if the app is closed before we finish connecting.
 | |
|       return true;
 | |
|     }
 | |
|   } else {
 | |
|     state_ = SIGNING_OUT_WAITING;
 | |
|   }
 | |
| 
 | |
|   return true;
 | |
| }
 | |
| 
 | |
| void PeerConnectionClient::Close() {
 | |
|   control_socket_->Close();
 | |
|   hanging_get_->Close();
 | |
|   onconnect_data_.clear();
 | |
|   peers_.clear();
 | |
|   if (resolver_ != NULL) {
 | |
|     resolver_->Destroy(false);
 | |
|     resolver_ = NULL;
 | |
|   }
 | |
|   my_id_ = -1;
 | |
|   state_ = NOT_CONNECTED;
 | |
| }
 | |
| 
 | |
| bool PeerConnectionClient::ConnectControlSocket() {
 | |
|   ASSERT(control_socket_->GetState() == talk_base::Socket::CS_CLOSED);
 | |
|   int err = control_socket_->Connect(server_address_);
 | |
|   if (err == SOCKET_ERROR) {
 | |
|     Close();
 | |
|     return false;
 | |
|   }
 | |
|   return true;
 | |
| }
 | |
| 
 | |
| void PeerConnectionClient::OnConnect(talk_base::AsyncSocket* socket) {
 | |
|   ASSERT(!onconnect_data_.empty());
 | |
|   size_t sent = socket->Send(onconnect_data_.c_str(), onconnect_data_.length());
 | |
|   ASSERT(sent == onconnect_data_.length());
 | |
|   UNUSED(sent);
 | |
|   onconnect_data_.clear();
 | |
| }
 | |
| 
 | |
| void PeerConnectionClient::OnHangingGetConnect(talk_base::AsyncSocket* socket) {
 | |
|   char buffer[1024];
 | |
|   sprintfn(buffer, sizeof(buffer),
 | |
|            "GET /wait?peer_id=%i HTTP/1.0\r\n\r\n", my_id_);
 | |
|   int len = static_cast<int>(strlen(buffer));
 | |
|   int sent = socket->Send(buffer, len);
 | |
|   ASSERT(sent == len);
 | |
|   UNUSED2(sent, len);
 | |
| }
 | |
| 
 | |
| void PeerConnectionClient::OnMessageFromPeer(int peer_id,
 | |
|                                              const std::string& message) {
 | |
|   if (message.length() == (sizeof(kByeMessage) - 1) &&
 | |
|       message.compare(kByeMessage) == 0) {
 | |
|     callback_->OnPeerDisconnected(peer_id);
 | |
|   } else {
 | |
|     callback_->OnMessageFromPeer(peer_id, message);
 | |
|   }
 | |
| }
 | |
| 
 | |
| bool PeerConnectionClient::GetHeaderValue(const std::string& data,
 | |
|                                           size_t eoh,
 | |
|                                           const char* header_pattern,
 | |
|                                           size_t* value) {
 | |
|   ASSERT(value != NULL);
 | |
|   size_t found = data.find(header_pattern);
 | |
|   if (found != std::string::npos && found < eoh) {
 | |
|     *value = atoi(&data[found + strlen(header_pattern)]);
 | |
|     return true;
 | |
|   }
 | |
|   return false;
 | |
| }
 | |
| 
 | |
| bool PeerConnectionClient::GetHeaderValue(const std::string& data, size_t eoh,
 | |
|                                           const char* header_pattern,
 | |
|                                           std::string* value) {
 | |
|   ASSERT(value != NULL);
 | |
|   size_t found = data.find(header_pattern);
 | |
|   if (found != std::string::npos && found < eoh) {
 | |
|     size_t begin = found + strlen(header_pattern);
 | |
|     size_t end = data.find("\r\n", begin);
 | |
|     if (end == std::string::npos)
 | |
|       end = eoh;
 | |
|     value->assign(data.substr(begin, end - begin));
 | |
|     return true;
 | |
|   }
 | |
|   return false;
 | |
| }
 | |
| 
 | |
| bool PeerConnectionClient::ReadIntoBuffer(talk_base::AsyncSocket* socket,
 | |
|                                           std::string* data,
 | |
|                                           size_t* content_length) {
 | |
|   char buffer[0xffff];
 | |
|   do {
 | |
|     int bytes = socket->Recv(buffer, sizeof(buffer));
 | |
|     if (bytes <= 0)
 | |
|       break;
 | |
|     data->append(buffer, bytes);
 | |
|   } while (true);
 | |
| 
 | |
|   bool ret = false;
 | |
|   size_t i = data->find("\r\n\r\n");
 | |
|   if (i != std::string::npos) {
 | |
|     LOG(INFO) << "Headers received";
 | |
|     if (GetHeaderValue(*data, i, "\r\nContent-Length: ", content_length)) {
 | |
|       size_t total_response_size = (i + 4) + *content_length;
 | |
|       if (data->length() >= total_response_size) {
 | |
|         ret = true;
 | |
|         std::string should_close;
 | |
|         const char kConnection[] = "\r\nConnection: ";
 | |
|         if (GetHeaderValue(*data, i, kConnection, &should_close) &&
 | |
|             should_close.compare("close") == 0) {
 | |
|           socket->Close();
 | |
|           // Since we closed the socket, there was no notification delivered
 | |
|           // to us.  Compensate by letting ourselves know.
 | |
|           OnClose(socket, 0);
 | |
|         }
 | |
|       } else {
 | |
|         // We haven't received everything.  Just continue to accept data.
 | |
|       }
 | |
|     } else {
 | |
|       LOG(LS_ERROR) << "No content length field specified by the server.";
 | |
|     }
 | |
|   }
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| void PeerConnectionClient::OnRead(talk_base::AsyncSocket* socket) {
 | |
|   size_t content_length = 0;
 | |
|   if (ReadIntoBuffer(socket, &control_data_, &content_length)) {
 | |
|     size_t peer_id = 0, eoh = 0;
 | |
|     bool ok = ParseServerResponse(control_data_, content_length, &peer_id,
 | |
|                                   &eoh);
 | |
|     if (ok) {
 | |
|       if (my_id_ == -1) {
 | |
|         // First response.  Let's store our server assigned ID.
 | |
|         ASSERT(state_ == SIGNING_IN);
 | |
|         my_id_ = static_cast<int>(peer_id);
 | |
|         ASSERT(my_id_ != -1);
 | |
| 
 | |
|         // The body of the response will be a list of already connected peers.
 | |
|         if (content_length) {
 | |
|           size_t pos = eoh + 4;
 | |
|           while (pos < control_data_.size()) {
 | |
|             size_t eol = control_data_.find('\n', pos);
 | |
|             if (eol == std::string::npos)
 | |
|               break;
 | |
|             int id = 0;
 | |
|             std::string name;
 | |
|             bool connected;
 | |
|             if (ParseEntry(control_data_.substr(pos, eol - pos), &name, &id,
 | |
|                            &connected) && id != my_id_) {
 | |
|               peers_[id] = name;
 | |
|               callback_->OnPeerConnected(id, name);
 | |
|             }
 | |
|             pos = eol + 1;
 | |
|           }
 | |
|         }
 | |
|         ASSERT(is_connected());
 | |
|         callback_->OnSignedIn();
 | |
|       } else if (state_ == SIGNING_OUT) {
 | |
|         Close();
 | |
|         callback_->OnDisconnected();
 | |
|       } else if (state_ == SIGNING_OUT_WAITING) {
 | |
|         SignOut();
 | |
|       }
 | |
|     }
 | |
| 
 | |
|     control_data_.clear();
 | |
| 
 | |
|     if (state_ == SIGNING_IN) {
 | |
|       ASSERT(hanging_get_->GetState() == talk_base::Socket::CS_CLOSED);
 | |
|       state_ = CONNECTED;
 | |
|       hanging_get_->Connect(server_address_);
 | |
|     }
 | |
|   }
 | |
| }
 | |
| 
 | |
| void PeerConnectionClient::OnHangingGetRead(talk_base::AsyncSocket* socket) {
 | |
|   LOG(INFO) << __FUNCTION__;
 | |
|   size_t content_length = 0;
 | |
|   if (ReadIntoBuffer(socket, ¬ification_data_, &content_length)) {
 | |
|     size_t peer_id = 0, eoh = 0;
 | |
|     bool ok = ParseServerResponse(notification_data_, content_length,
 | |
|                                   &peer_id, &eoh);
 | |
| 
 | |
|     if (ok) {
 | |
|       // Store the position where the body begins.
 | |
|       size_t pos = eoh + 4;
 | |
| 
 | |
|       if (my_id_ == static_cast<int>(peer_id)) {
 | |
|         // A notification about a new member or a member that just
 | |
|         // disconnected.
 | |
|         int id = 0;
 | |
|         std::string name;
 | |
|         bool connected = false;
 | |
|         if (ParseEntry(notification_data_.substr(pos), &name, &id,
 | |
|                        &connected)) {
 | |
|           if (connected) {
 | |
|             peers_[id] = name;
 | |
|             callback_->OnPeerConnected(id, name);
 | |
|           } else {
 | |
|             peers_.erase(id);
 | |
|             callback_->OnPeerDisconnected(id);
 | |
|           }
 | |
|         }
 | |
|       } else {
 | |
|         OnMessageFromPeer(static_cast<int>(peer_id),
 | |
|                           notification_data_.substr(pos));
 | |
|       }
 | |
|     }
 | |
| 
 | |
|     notification_data_.clear();
 | |
|   }
 | |
| 
 | |
|   if (hanging_get_->GetState() == talk_base::Socket::CS_CLOSED &&
 | |
|       state_ == CONNECTED) {
 | |
|     hanging_get_->Connect(server_address_);
 | |
|   }
 | |
| }
 | |
| 
 | |
| bool PeerConnectionClient::ParseEntry(const std::string& entry,
 | |
|                                       std::string* name,
 | |
|                                       int* id,
 | |
|                                       bool* connected) {
 | |
|   ASSERT(name != NULL);
 | |
|   ASSERT(id != NULL);
 | |
|   ASSERT(connected != NULL);
 | |
|   ASSERT(!entry.empty());
 | |
| 
 | |
|   *connected = false;
 | |
|   size_t separator = entry.find(',');
 | |
|   if (separator != std::string::npos) {
 | |
|     *id = atoi(&entry[separator + 1]);
 | |
|     name->assign(entry.substr(0, separator));
 | |
|     separator = entry.find(',', separator + 1);
 | |
|     if (separator != std::string::npos) {
 | |
|       *connected = atoi(&entry[separator + 1]) ? true : false;
 | |
|     }
 | |
|   }
 | |
|   return !name->empty();
 | |
| }
 | |
| 
 | |
| int PeerConnectionClient::GetResponseStatus(const std::string& response) {
 | |
|   int status = -1;
 | |
|   size_t pos = response.find(' ');
 | |
|   if (pos != std::string::npos)
 | |
|     status = atoi(&response[pos + 1]);
 | |
|   return status;
 | |
| }
 | |
| 
 | |
| bool PeerConnectionClient::ParseServerResponse(const std::string& response,
 | |
|                                                size_t content_length,
 | |
|                                                size_t* peer_id,
 | |
|                                                size_t* eoh) {
 | |
|   int status = GetResponseStatus(response.c_str());
 | |
|   if (status != 200) {
 | |
|     LOG(LS_ERROR) << "Received error from server";
 | |
|     Close();
 | |
|     callback_->OnDisconnected();
 | |
|     return false;
 | |
|   }
 | |
| 
 | |
|   *eoh = response.find("\r\n\r\n");
 | |
|   ASSERT(*eoh != std::string::npos);
 | |
|   if (*eoh == std::string::npos)
 | |
|     return false;
 | |
| 
 | |
|   *peer_id = -1;
 | |
| 
 | |
|   // See comment in peer_channel.cc for why we use the Pragma header and
 | |
|   // not e.g. "X-Peer-Id".
 | |
|   GetHeaderValue(response, *eoh, "\r\nPragma: ", peer_id);
 | |
| 
 | |
|   return true;
 | |
| }
 | |
| 
 | |
| void PeerConnectionClient::OnClose(talk_base::AsyncSocket* socket, int err) {
 | |
|   LOG(INFO) << __FUNCTION__;
 | |
| 
 | |
|   socket->Close();
 | |
| 
 | |
| #ifdef WIN32
 | |
|   if (err != WSAECONNREFUSED) {
 | |
| #else
 | |
|   if (err != ECONNREFUSED) {
 | |
| #endif
 | |
|     if (socket == hanging_get_.get()) {
 | |
|       if (state_ == CONNECTED) {
 | |
|         hanging_get_->Close();
 | |
|         hanging_get_->Connect(server_address_);
 | |
|       }
 | |
|     } else {
 | |
|       callback_->OnMessageSent(err);
 | |
|     }
 | |
|   } else {
 | |
|     if (socket == control_socket_.get()) {
 | |
|       LOG(WARNING) << "Connection refused; retrying in 2 seconds";
 | |
|       talk_base::Thread::Current()->PostDelayed(kReconnectDelay, this, 0);
 | |
|     } else {
 | |
|       Close();
 | |
|       callback_->OnDisconnected();
 | |
|     }
 | |
|   }
 | |
| }
 | |
| 
 | |
| void PeerConnectionClient::OnMessage(talk_base::Message* msg) {
 | |
|   // ignore msg; there is currently only one supported message ("retry")
 | |
|   DoConnect();
 | |
| }
 |