From 0eb6eec5cb8f8157301d858908b6956a631f20be Mon Sep 17 00:00:00 2001 From: "guoweis@webrtc.org" Date: Wed, 17 Dec 2014 22:03:33 +0000 Subject: [PATCH] Move VirtualSocket into the .h file to allow unit tests more control over behavior. BUG=3927 R=pthatcher@webrtc.org Review URL: https://webrtc-codereview.appspot.com/31289004 git-svn-id: http://webrtc.googlecode.com/svn/trunk@7935 4adac7df-926f-26a2-2b94-8c16560cd09d --- webrtc/base/virtualsocketserver.cc | 746 ++++++++++++++--------------- webrtc/base/virtualsocketserver.h | 99 ++++ 2 files changed, 449 insertions(+), 396 deletions(-) diff --git a/webrtc/base/virtualsocketserver.cc b/webrtc/base/virtualsocketserver.cc index 90d19e4aa..c2937bad2 100644 --- a/webrtc/base/virtualsocketserver.cc +++ b/webrtc/base/virtualsocketserver.cc @@ -91,136 +91,145 @@ struct MessageAddress : public MessageData { SocketAddress addr; }; -// Implements the socket interface using the virtual network. Packets are -// passed as messages using the message queue of the socket server. -class VirtualSocket : public AsyncSocket, public MessageHandler { - public: - VirtualSocket(VirtualSocketServer* server, int family, int type, bool async) - : server_(server), family_(family), type_(type), async_(async), - state_(CS_CLOSED), error_(0), listen_queue_(NULL), - write_enabled_(false), - network_size_(0), recv_buffer_size_(0), bound_(false), was_any_(false) { - ASSERT((type_ == SOCK_DGRAM) || (type_ == SOCK_STREAM)); - ASSERT(async_ || (type_ != SOCK_STREAM)); // We only support async streams +VirtualSocket::VirtualSocket(VirtualSocketServer* server, + int family, + int type, + bool async) + : server_(server), + family_(family), + type_(type), + async_(async), + state_(CS_CLOSED), + error_(0), + listen_queue_(NULL), + write_enabled_(false), + network_size_(0), + recv_buffer_size_(0), + bound_(false), + was_any_(false) { + ASSERT((type_ == SOCK_DGRAM) || (type_ == SOCK_STREAM)); + ASSERT(async_ || (type_ != SOCK_STREAM)); // We only support async streams +} + +VirtualSocket::~VirtualSocket() { + Close(); + + for (RecvBuffer::iterator it = recv_buffer_.begin(); it != recv_buffer_.end(); + ++it) { + delete *it; } +} - virtual ~VirtualSocket() { - Close(); +SocketAddress VirtualSocket::GetLocalAddress() const { + if (!alternative_local_addr_.IsNil()) + return alternative_local_addr_; + return local_addr_; +} - for (RecvBuffer::iterator it = recv_buffer_.begin(); - it != recv_buffer_.end(); ++it) { - delete *it; - } +SocketAddress VirtualSocket::GetRemoteAddress() const { + return remote_addr_; +} + +// Used by server sockets to set the local address without binding. +void VirtualSocket::SetLocalAddress(const SocketAddress& addr) { + local_addr_ = addr; +} + +int VirtualSocket::Bind(const SocketAddress& addr) { + if (!local_addr_.IsNil()) { + error_ = EINVAL; + return -1; } - - virtual SocketAddress GetLocalAddress() const { - return local_addr_; - } - - virtual SocketAddress GetRemoteAddress() const { - return remote_addr_; - } - - // Used by server sockets to set the local address without binding. - void SetLocalAddress(const SocketAddress& addr) { - local_addr_ = addr; - } - - virtual int Bind(const SocketAddress& addr) { - if (!local_addr_.IsNil()) { - error_ = EINVAL; - return -1; - } - local_addr_ = addr; - int result = server_->Bind(this, &local_addr_); - if (result != 0) { - local_addr_.Clear(); - error_ = EADDRINUSE; - } else { - bound_ = true; - was_any_ = addr.IsAnyIP(); - } - return result; - } - - virtual int Connect(const SocketAddress& addr) { - return InitiateConnect(addr, true); - } - - virtual int Close() { - if (!local_addr_.IsNil() && bound_) { - // Remove from the binding table. - server_->Unbind(local_addr_, this); - bound_ = false; - } - - if (SOCK_STREAM == type_) { - // Cancel pending sockets - if (listen_queue_) { - while (!listen_queue_->empty()) { - SocketAddress addr = listen_queue_->front(); - - // Disconnect listening socket. - server_->Disconnect(server_->LookupBinding(addr)); - listen_queue_->pop_front(); - } - delete listen_queue_; - listen_queue_ = NULL; - } - // Disconnect stream sockets - if (CS_CONNECTED == state_) { - // Disconnect remote socket, check if it is a child of a server socket. - VirtualSocket* socket = - server_->LookupConnection(local_addr_, remote_addr_); - if (!socket) { - // Not a server socket child, then see if it is bound. - // TODO: If this is indeed a server socket that has no - // children this will cause the server socket to be - // closed. This might lead to unexpected results, how to fix this? - socket = server_->LookupBinding(remote_addr_); - } - server_->Disconnect(socket); - - // Remove mapping for both directions. - server_->RemoveConnection(remote_addr_, local_addr_); - server_->RemoveConnection(local_addr_, remote_addr_); - } - // Cancel potential connects - MessageList msgs; - if (server_->msg_queue_) { - server_->msg_queue_->Clear(this, MSG_ID_CONNECT, &msgs); - } - for (MessageList::iterator it = msgs.begin(); it != msgs.end(); ++it) { - ASSERT(NULL != it->pdata); - MessageAddress* data = static_cast(it->pdata); - - // Lookup remote side. - VirtualSocket* socket = server_->LookupConnection(local_addr_, - data->addr); - if (socket) { - // Server socket, remote side is a socket retreived by - // accept. Accepted sockets are not bound so we will not - // find it by looking in the bindings table. - server_->Disconnect(socket); - server_->RemoveConnection(local_addr_, data->addr); - } else { - server_->Disconnect(server_->LookupBinding(data->addr)); - } - delete data; - } - // Clear incoming packets and disconnect messages - if (server_->msg_queue_) { - server_->msg_queue_->Clear(this); - } - } - - state_ = CS_CLOSED; + local_addr_ = addr; + int result = server_->Bind(this, &local_addr_); + if (result != 0) { local_addr_.Clear(); - remote_addr_.Clear(); - return 0; + error_ = EADDRINUSE; + } else { + bound_ = true; + was_any_ = addr.IsAnyIP(); + } + return result; +} + +int VirtualSocket::Connect(const SocketAddress& addr) { + return InitiateConnect(addr, true); +} + +int VirtualSocket::Close() { + if (!local_addr_.IsNil() && bound_) { + // Remove from the binding table. + server_->Unbind(local_addr_, this); + bound_ = false; } - virtual int Send(const void *pv, size_t cb) { + if (SOCK_STREAM == type_) { + // Cancel pending sockets + if (listen_queue_) { + while (!listen_queue_->empty()) { + SocketAddress addr = listen_queue_->front(); + + // Disconnect listening socket. + server_->Disconnect(server_->LookupBinding(addr)); + listen_queue_->pop_front(); + } + delete listen_queue_; + listen_queue_ = NULL; + } + // Disconnect stream sockets + if (CS_CONNECTED == state_) { + // Disconnect remote socket, check if it is a child of a server socket. + VirtualSocket* socket = + server_->LookupConnection(local_addr_, remote_addr_); + if (!socket) { + // Not a server socket child, then see if it is bound. + // TODO(tbd): If this is indeed a server socket that has no + // children this will cause the server socket to be + // closed. This might lead to unexpected results, how to fix this? + socket = server_->LookupBinding(remote_addr_); + } + server_->Disconnect(socket); + + // Remove mapping for both directions. + server_->RemoveConnection(remote_addr_, local_addr_); + server_->RemoveConnection(local_addr_, remote_addr_); + } + // Cancel potential connects + MessageList msgs; + if (server_->msg_queue_) { + server_->msg_queue_->Clear(this, MSG_ID_CONNECT, &msgs); + } + for (MessageList::iterator it = msgs.begin(); it != msgs.end(); ++it) { + ASSERT(NULL != it->pdata); + MessageAddress* data = static_cast(it->pdata); + + // Lookup remote side. + VirtualSocket* socket = + server_->LookupConnection(local_addr_, data->addr); + if (socket) { + // Server socket, remote side is a socket retreived by + // accept. Accepted sockets are not bound so we will not + // find it by looking in the bindings table. + server_->Disconnect(socket); + server_->RemoveConnection(local_addr_, data->addr); + } else { + server_->Disconnect(server_->LookupBinding(data->addr)); + } + delete data; + } + // Clear incoming packets and disconnect messages + if (server_->msg_queue_) { + server_->msg_queue_->Clear(this); + } + } + + state_ = CS_CLOSED; + local_addr_.Clear(); + remote_addr_.Clear(); + return 0; +} + +int VirtualSocket::Send(const void* pv, size_t cb) { if (CS_CONNECTED != state_) { error_ = ENOTCONN; return -1; @@ -230,310 +239,255 @@ class VirtualSocket : public AsyncSocket, public MessageHandler { } else { return SendTcp(pv, cb); } - } +} - virtual int SendTo(const void *pv, size_t cb, const SocketAddress& addr) { - if (SOCK_DGRAM == type_) { - return SendUdp(pv, cb, addr); - } else { - if (CS_CONNECTED != state_) { - error_ = ENOTCONN; - return -1; - } - return SendTcp(pv, cb); - } - } - - virtual int Recv(void *pv, size_t cb) { - SocketAddress addr; - return RecvFrom(pv, cb, &addr); - } - - virtual int RecvFrom(void *pv, size_t cb, SocketAddress *paddr) { - // If we don't have a packet, then either error or wait for one to arrive. - if (recv_buffer_.empty()) { - if (async_) { - error_ = EAGAIN; - return -1; - } - while (recv_buffer_.empty()) { - Message msg; - server_->msg_queue_->Get(&msg); - server_->msg_queue_->Dispatch(&msg); - } - } - - // Return the packet at the front of the queue. - Packet* packet = recv_buffer_.front(); - size_t data_read = _min(cb, packet->size()); - memcpy(pv, packet->data(), data_read); - *paddr = packet->from(); - - if (data_read < packet->size()) { - packet->Consume(data_read); - } else { - recv_buffer_.pop_front(); - delete packet; - } - - if (SOCK_STREAM == type_) { - bool was_full = (recv_buffer_size_ == server_->recv_buffer_capacity_); - recv_buffer_size_ -= data_read; - if (was_full) { - VirtualSocket* sender = server_->LookupBinding(remote_addr_); - ASSERT(NULL != sender); - server_->SendTcp(sender); - } - } - - return static_cast(data_read); - } - - virtual int Listen(int backlog) { - ASSERT(SOCK_STREAM == type_); - ASSERT(CS_CLOSED == state_); - if (local_addr_.IsNil()) { - error_ = EINVAL; +int VirtualSocket::SendTo(const void* pv, + size_t cb, + const SocketAddress& addr) { + if (SOCK_DGRAM == type_) { + return SendUdp(pv, cb, addr); + } else { + if (CS_CONNECTED != state_) { + error_ = ENOTCONN; return -1; } - ASSERT(NULL == listen_queue_); - listen_queue_ = new ListenQueue; - state_ = CS_CONNECTING; - return 0; + return SendTcp(pv, cb); + } +} + +int VirtualSocket::Recv(void* pv, size_t cb) { + SocketAddress addr; + return RecvFrom(pv, cb, &addr); +} + +int VirtualSocket::RecvFrom(void* pv, size_t cb, SocketAddress* paddr) { + // If we don't have a packet, then either error or wait for one to arrive. + if (recv_buffer_.empty()) { + if (async_) { + error_ = EAGAIN; + return -1; + } + while (recv_buffer_.empty()) { + Message msg; + server_->msg_queue_->Get(&msg); + server_->msg_queue_->Dispatch(&msg); + } } - virtual VirtualSocket* Accept(SocketAddress *paddr) { - if (NULL == listen_queue_) { - error_ = EINVAL; - return NULL; - } - while (!listen_queue_->empty()) { - VirtualSocket* socket = new VirtualSocket(server_, AF_INET, type_, - async_); + // Return the packet at the front of the queue. + Packet* packet = recv_buffer_.front(); + size_t data_read = _min(cb, packet->size()); + memcpy(pv, packet->data(), data_read); + *paddr = packet->from(); - // Set the new local address to the same as this server socket. - socket->SetLocalAddress(local_addr_); - // Sockets made from a socket that 'was Any' need to inherit that. - socket->set_was_any(was_any_); - SocketAddress remote_addr(listen_queue_->front()); - int result = socket->InitiateConnect(remote_addr, false); - listen_queue_->pop_front(); - if (result != 0) { - delete socket; - continue; - } - socket->CompleteConnect(remote_addr, false); - if (paddr) { - *paddr = remote_addr; - } - return socket; + if (data_read < packet->size()) { + packet->Consume(data_read); + } else { + recv_buffer_.pop_front(); + delete packet; + } + + if (SOCK_STREAM == type_) { + bool was_full = (recv_buffer_size_ == server_->recv_buffer_capacity_); + recv_buffer_size_ -= data_read; + if (was_full) { + VirtualSocket* sender = server_->LookupBinding(remote_addr_); + ASSERT(NULL != sender); + server_->SendTcp(sender); } - error_ = EWOULDBLOCK; + } + + return static_cast(data_read); +} + +int VirtualSocket::Listen(int backlog) { + ASSERT(SOCK_STREAM == type_); + ASSERT(CS_CLOSED == state_); + if (local_addr_.IsNil()) { + error_ = EINVAL; + return -1; + } + ASSERT(NULL == listen_queue_); + listen_queue_ = new ListenQueue; + state_ = CS_CONNECTING; + return 0; +} + +VirtualSocket* VirtualSocket::Accept(SocketAddress* paddr) { + if (NULL == listen_queue_) { + error_ = EINVAL; return NULL; } + while (!listen_queue_->empty()) { + VirtualSocket* socket = new VirtualSocket(server_, AF_INET, type_, async_); - virtual int GetError() const { - return error_; - } - - virtual void SetError(int error) { - error_ = error; - } - - virtual ConnState GetState() const { - return state_; - } - - virtual int GetOption(Option opt, int* value) { - OptionsMap::const_iterator it = options_map_.find(opt); - if (it == options_map_.end()) { - return -1; + // Set the new local address to the same as this server socket. + socket->SetLocalAddress(local_addr_); + // Sockets made from a socket that 'was Any' need to inherit that. + socket->set_was_any(was_any_); + SocketAddress remote_addr(listen_queue_->front()); + int result = socket->InitiateConnect(remote_addr, false); + listen_queue_->pop_front(); + if (result != 0) { + delete socket; + continue; } - *value = it->second; - return 0; // 0 is success to emulate getsockopt() + socket->CompleteConnect(remote_addr, false); + if (paddr) { + *paddr = remote_addr; + } + return socket; } + error_ = EWOULDBLOCK; + return NULL; +} - virtual int SetOption(Option opt, int value) { - options_map_[opt] = value; - return 0; // 0 is success to emulate setsockopt() +int VirtualSocket::GetError() const { + return error_; +} + +void VirtualSocket::SetError(int error) { + error_ = error; +} + +Socket::ConnState VirtualSocket::GetState() const { + return state_; +} + +int VirtualSocket::GetOption(Option opt, int* value) { + OptionsMap::const_iterator it = options_map_.find(opt); + if (it == options_map_.end()) { + return -1; } + *value = it->second; + return 0; // 0 is success to emulate getsockopt() +} - virtual int EstimateMTU(uint16* mtu) { - if (CS_CONNECTED != state_) - return ENOTCONN; - else - return 65536; - } +int VirtualSocket::SetOption(Option opt, int value) { + options_map_[opt] = value; + return 0; // 0 is success to emulate setsockopt() +} - void OnMessage(Message *pmsg) { - if (pmsg->message_id == MSG_ID_PACKET) { - //ASSERT(!local_addr_.IsAny()); - ASSERT(NULL != pmsg->pdata); - Packet* packet = static_cast(pmsg->pdata); +int VirtualSocket::EstimateMTU(uint16* mtu) { + if (CS_CONNECTED != state_) + return ENOTCONN; + else + return 65536; +} - recv_buffer_.push_back(packet); +void VirtualSocket::OnMessage(Message* pmsg) { + if (pmsg->message_id == MSG_ID_PACKET) { + // ASSERT(!local_addr_.IsAny()); + ASSERT(NULL != pmsg->pdata); + Packet* packet = static_cast(pmsg->pdata); + recv_buffer_.push_back(packet); + + if (async_) { + SignalReadEvent(this); + } + } else if (pmsg->message_id == MSG_ID_CONNECT) { + ASSERT(NULL != pmsg->pdata); + MessageAddress* data = static_cast(pmsg->pdata); + if (listen_queue_ != NULL) { + listen_queue_->push_back(data->addr); if (async_) { SignalReadEvent(this); } - } else if (pmsg->message_id == MSG_ID_CONNECT) { - ASSERT(NULL != pmsg->pdata); - MessageAddress* data = static_cast(pmsg->pdata); - if (listen_queue_ != NULL) { - listen_queue_->push_back(data->addr); - if (async_) { - SignalReadEvent(this); - } - } else if ((SOCK_STREAM == type_) && (CS_CONNECTING == state_)) { - CompleteConnect(data->addr, true); - } else { - LOG(LS_VERBOSE) << "Socket at " << local_addr_ << " is not listening"; - server_->Disconnect(server_->LookupBinding(data->addr)); - } - delete data; - } else if (pmsg->message_id == MSG_ID_DISCONNECT) { - ASSERT(SOCK_STREAM == type_); - if (CS_CLOSED != state_) { - int error = (CS_CONNECTING == state_) ? ECONNREFUSED : 0; - state_ = CS_CLOSED; - remote_addr_.Clear(); - if (async_) { - SignalCloseEvent(this, error); - } - } + } else if ((SOCK_STREAM == type_) && (CS_CONNECTING == state_)) { + CompleteConnect(data->addr, true); } else { - ASSERT(false); + LOG(LS_VERBOSE) << "Socket at " << local_addr_ << " is not listening"; + server_->Disconnect(server_->LookupBinding(data->addr)); + } + delete data; + } else if (pmsg->message_id == MSG_ID_DISCONNECT) { + ASSERT(SOCK_STREAM == type_); + if (CS_CLOSED != state_) { + int error = (CS_CONNECTING == state_) ? ECONNREFUSED : 0; + state_ = CS_CLOSED; + remote_addr_.Clear(); + if (async_) { + SignalCloseEvent(this, error); + } + } + } else { + ASSERT(false); + } +} + +int VirtualSocket::InitiateConnect(const SocketAddress& addr, bool use_delay) { + if (!remote_addr_.IsNil()) { + error_ = (CS_CONNECTED == state_) ? EISCONN : EINPROGRESS; + return -1; + } + if (local_addr_.IsNil()) { + // If there's no local address set, grab a random one in the correct AF. + int result = 0; + if (addr.ipaddr().family() == AF_INET) { + result = Bind(SocketAddress("0.0.0.0", 0)); + } else if (addr.ipaddr().family() == AF_INET6) { + result = Bind(SocketAddress("::", 0)); + } + if (result != 0) { + return result; } } - - bool was_any() { return was_any_; } - void set_was_any(bool was_any) { was_any_ = was_any; } - - private: - struct NetworkEntry { - size_t size; - uint32 done_time; - }; - - typedef std::deque ListenQueue; - typedef std::deque NetworkQueue; - typedef std::vector SendBuffer; - typedef std::list RecvBuffer; - typedef std::map OptionsMap; - - int InitiateConnect(const SocketAddress& addr, bool use_delay) { - if (!remote_addr_.IsNil()) { - error_ = (CS_CONNECTED == state_) ? EISCONN : EINPROGRESS; - return -1; - } - if (local_addr_.IsNil()) { - // If there's no local address set, grab a random one in the correct AF. - int result = 0; - if (addr.ipaddr().family() == AF_INET) { - result = Bind(SocketAddress("0.0.0.0", 0)); - } else if (addr.ipaddr().family() == AF_INET6) { - result = Bind(SocketAddress("::", 0)); - } - if (result != 0) { - return result; - } - } - if (type_ == SOCK_DGRAM) { - remote_addr_ = addr; - state_ = CS_CONNECTED; - } else { - int result = server_->Connect(this, addr, use_delay); - if (result != 0) { - error_ = EHOSTUNREACH; - return -1; - } - state_ = CS_CONNECTING; - } - return 0; - } - - void CompleteConnect(const SocketAddress& addr, bool notify) { - ASSERT(CS_CONNECTING == state_); + if (type_ == SOCK_DGRAM) { remote_addr_ = addr; state_ = CS_CONNECTED; - server_->AddConnection(remote_addr_, local_addr_, this); - if (async_ && notify) { - SignalConnectEvent(this); - } - } - - int SendUdp(const void* pv, size_t cb, const SocketAddress& addr) { - // If we have not been assigned a local port, then get one. - if (local_addr_.IsNil()) { - local_addr_ = EmptySocketAddressWithFamily(addr.ipaddr().family()); - int result = server_->Bind(this, &local_addr_); - if (result != 0) { - local_addr_.Clear(); - error_ = EADDRINUSE; - return result; - } - } - - // Send the data in a message to the appropriate socket. - return server_->SendUdp(this, static_cast(pv), cb, addr); - } - - int SendTcp(const void* pv, size_t cb) { - size_t capacity = server_->send_buffer_capacity_ - send_buffer_.size(); - if (0 == capacity) { - write_enabled_ = true; - error_ = EWOULDBLOCK; + } else { + int result = server_->Connect(this, addr, use_delay); + if (result != 0) { + error_ = EHOSTUNREACH; return -1; } - size_t consumed = _min(cb, capacity); - const char* cpv = static_cast(pv); - send_buffer_.insert(send_buffer_.end(), cpv, cpv + consumed); - server_->SendTcp(this); - return static_cast(consumed); + state_ = CS_CONNECTING; + } + return 0; +} + +void VirtualSocket::CompleteConnect(const SocketAddress& addr, bool notify) { + ASSERT(CS_CONNECTING == state_); + remote_addr_ = addr; + state_ = CS_CONNECTED; + server_->AddConnection(remote_addr_, local_addr_, this); + if (async_ && notify) { + SignalConnectEvent(this); + } +} + +int VirtualSocket::SendUdp(const void* pv, + size_t cb, + const SocketAddress& addr) { + // If we have not been assigned a local port, then get one. + if (local_addr_.IsNil()) { + local_addr_ = EmptySocketAddressWithFamily(addr.ipaddr().family()); + int result = server_->Bind(this, &local_addr_); + if (result != 0) { + local_addr_.Clear(); + error_ = EADDRINUSE; + return result; + } } - VirtualSocketServer* server_; - int family_; - int type_; - bool async_; - ConnState state_; - int error_; - SocketAddress local_addr_; - SocketAddress remote_addr_; + // Send the data in a message to the appropriate socket. + return server_->SendUdp(this, static_cast(pv), cb, addr); +} - // Pending sockets which can be Accepted - ListenQueue* listen_queue_; - - // Data which tcp has buffered for sending - SendBuffer send_buffer_; - bool write_enabled_; - - // Critical section to protect the recv_buffer and queue_ - CriticalSection crit_; - - // Network model that enforces bandwidth and capacity constraints - NetworkQueue network_; - size_t network_size_; - - // Data which has been received from the network - RecvBuffer recv_buffer_; - // The amount of data which is in flight or in recv_buffer_ - size_t recv_buffer_size_; - - // Is this socket bound? - bool bound_; - - // When we bind a socket to Any, VSS's Bind gives it another address. For - // dual-stack sockets, we want to distinguish between sockets that were - // explicitly given a particular address and sockets that had one picked - // for them by VSS. - bool was_any_; - - // Store the options that are set - OptionsMap options_map_; - - friend class VirtualSocketServer; -}; +int VirtualSocket::SendTcp(const void* pv, size_t cb) { + size_t capacity = server_->send_buffer_capacity_ - send_buffer_.size(); + if (0 == capacity) { + write_enabled_ = true; + error_ = EWOULDBLOCK; + return -1; + } + size_t consumed = _min(cb, capacity); + const char* cpv = static_cast(pv); + send_buffer_.insert(send_buffer_.end(), cpv, cpv + consumed); + server_->SendTcp(this); + return static_cast(consumed); +} VirtualSocketServer::VirtualSocketServer(SocketServer* ss) : server_(ss), server_owned_(false), msg_queue_(NULL), stop_on_idle_(false), diff --git a/webrtc/base/virtualsocketserver.h b/webrtc/base/virtualsocketserver.h index 0ea51ab5d..cecce924c 100644 --- a/webrtc/base/virtualsocketserver.h +++ b/webrtc/base/virtualsocketserver.h @@ -21,6 +21,7 @@ namespace rtc { +class Packet; class VirtualSocket; class SocketAddressPair; @@ -232,6 +233,104 @@ class VirtualSocketServer : public SocketServer, public sigslot::has_slots<> { DISALLOW_EVIL_CONSTRUCTORS(VirtualSocketServer); }; +// Implements the socket interface using the virtual network. Packets are +// passed as messages using the message queue of the socket server. +class VirtualSocket : public AsyncSocket, public MessageHandler { + public: + VirtualSocket(VirtualSocketServer* server, int family, int type, bool async); + virtual ~VirtualSocket(); + + virtual SocketAddress GetLocalAddress() const; + virtual SocketAddress GetRemoteAddress() const; + + // Used by server sockets to set the local address without binding. + void SetLocalAddress(const SocketAddress& addr); + + virtual int Bind(const SocketAddress& addr); + virtual int Connect(const SocketAddress& addr); + virtual int Close(); + virtual int Send(const void* pv, size_t cb); + virtual int SendTo(const void* pv, size_t cb, const SocketAddress& addr); + virtual int Recv(void* pv, size_t cb); + virtual int RecvFrom(void* pv, size_t cb, SocketAddress* paddr); + virtual int Listen(int backlog); + virtual VirtualSocket* Accept(SocketAddress* paddr); + + virtual int GetError() const; + virtual void SetError(int error); + virtual ConnState GetState() const; + virtual int GetOption(Option opt, int* value); + virtual int SetOption(Option opt, int value); + virtual int EstimateMTU(uint16* mtu); + void OnMessage(Message* pmsg); + + bool was_any() { return was_any_; } + void set_was_any(bool was_any) { was_any_ = was_any; } + + // For testing purpose only. Fired when client socket is bound to an address. + sigslot::signal2 SignalAddressReady; + + private: + struct NetworkEntry { + size_t size; + uint32 done_time; + }; + + typedef std::deque ListenQueue; + typedef std::deque NetworkQueue; + typedef std::vector SendBuffer; + typedef std::list RecvBuffer; + typedef std::map OptionsMap; + + int InitiateConnect(const SocketAddress& addr, bool use_delay); + void CompleteConnect(const SocketAddress& addr, bool notify); + int SendUdp(const void* pv, size_t cb, const SocketAddress& addr); + int SendTcp(const void* pv, size_t cb); + + VirtualSocketServer* server_; + int family_; + int type_; + bool async_; + ConnState state_; + int error_; + SocketAddress local_addr_; + SocketAddress alternative_local_addr_; + SocketAddress remote_addr_; + + // Pending sockets which can be Accepted + ListenQueue* listen_queue_; + + // Data which tcp has buffered for sending + SendBuffer send_buffer_; + bool write_enabled_; + + // Critical section to protect the recv_buffer and queue_ + CriticalSection crit_; + + // Network model that enforces bandwidth and capacity constraints + NetworkQueue network_; + size_t network_size_; + + // Data which has been received from the network + RecvBuffer recv_buffer_; + // The amount of data which is in flight or in recv_buffer_ + size_t recv_buffer_size_; + + // Is this socket bound? + bool bound_; + + // When we bind a socket to Any, VSS's Bind gives it another address. For + // dual-stack sockets, we want to distinguish between sockets that were + // explicitly given a particular address and sockets that had one picked + // for them by VSS. + bool was_any_; + + // Store the options that are set + OptionsMap options_map_; + + friend class VirtualSocketServer; +}; + } // namespace rtc #endif // WEBRTC_BASE_VIRTUALSOCKETSERVER_H_