webrtc/talk/base/macasyncsocket.cc

473 lines
13 KiB
C++
Raw Normal View History

// Copyright 2010 Google Inc. All Rights Reserved.
// thaloun@google.com (Tim Haloun)
//
// MacAsyncSocket is a kind of AsyncSocket. It does not support the SOCK_DGRAM
// type (yet). It works asynchronously, which means that users of this socket
// should connect to the various events declared in asyncsocket.h to receive
// notifications about this socket. It uses CFSockets for signals, but prefers
// the basic bsd socket operations rather than their CFSocket wrappers when
// possible.
#include <CoreFoundation/CoreFoundation.h>
#include <fcntl.h>
#include "talk/base/macasyncsocket.h"
#include "talk/base/logging.h"
#include "talk/base/macsocketserver.h"
namespace talk_base {
static const int kCallbackFlags = kCFSocketReadCallBack |
kCFSocketConnectCallBack |
kCFSocketWriteCallBack;
MacAsyncSocket::MacAsyncSocket(MacBaseSocketServer* ss, int family)
: ss_(ss),
socket_(NULL),
native_socket_(INVALID_SOCKET),
source_(NULL),
current_callbacks_(0),
disabled_(false),
error_(0),
state_(CS_CLOSED),
resolver_(NULL) {
Initialize(family);
}
MacAsyncSocket::~MacAsyncSocket() {
Close();
}
// Returns the address to which the socket is bound. If the socket is not
// bound, then the any-address is returned.
SocketAddress MacAsyncSocket::GetLocalAddress() const {
SocketAddress address;
// The CFSocket doesn't pick up on implicit binds from the connect call.
// Calling bind in before connect explicitly causes errors, so just query
// the underlying bsd socket.
sockaddr_storage addr;
socklen_t addrlen = sizeof(addr);
int result = ::getsockname(native_socket_,
reinterpret_cast<sockaddr*>(&addr), &addrlen);
if (result >= 0) {
SocketAddressFromSockAddrStorage(addr, &address);
}
return address;
}
// Returns the address to which the socket is connected. If the socket is not
// connected, then the any-address is returned.
SocketAddress MacAsyncSocket::GetRemoteAddress() const {
SocketAddress address;
// Use native_socket for consistency with GetLocalAddress.
sockaddr_storage addr;
socklen_t addrlen = sizeof(addr);
int result = ::getpeername(native_socket_,
reinterpret_cast<sockaddr*>(&addr), &addrlen);
if (result >= 0) {
SocketAddressFromSockAddrStorage(addr, &address);
}
return address;
}
// Bind the socket to a local address.
int MacAsyncSocket::Bind(const SocketAddress& address) {
sockaddr_storage saddr = {0};
size_t len = address.ToSockAddrStorage(&saddr);
int err = ::bind(native_socket_, reinterpret_cast<sockaddr*>(&saddr), len);
if (err == SOCKET_ERROR) error_ = errno;
return err;
}
void MacAsyncSocket::OnResolveResult(SignalThread* thread) {
if (thread != resolver_) {
return;
}
int error = resolver_->error();
if (error == 0) {
error = DoConnect(resolver_->address());
} else {
Close();
}
if (error) {
error_ = error;
SignalCloseEvent(this, error_);
}
}
// Connect to a remote address.
int MacAsyncSocket::Connect(const SocketAddress& addr) {
// TODO(djw): Consolidate all the connect->resolve->doconnect implementations.
if (state_ != CS_CLOSED) {
SetError(EALREADY);
return SOCKET_ERROR;
}
if (addr.IsUnresolved()) {
LOG(LS_VERBOSE) << "Resolving addr in MacAsyncSocket::Connect";
resolver_ = new AsyncResolver();
resolver_->set_address(addr);
resolver_->SignalWorkDone.connect(this,
&MacAsyncSocket::OnResolveResult);
resolver_->Start();
state_ = CS_CONNECTING;
return 0;
}
return DoConnect(addr);
}
int MacAsyncSocket::DoConnect(const SocketAddress& addr) {
if (!valid()) {
Initialize(addr.family());
if (!valid())
return SOCKET_ERROR;
}
sockaddr_storage saddr;
size_t len = addr.ToSockAddrStorage(&saddr);
int result = ::connect(native_socket_, reinterpret_cast<sockaddr*>(&saddr),
len);
if (result != SOCKET_ERROR) {
state_ = CS_CONNECTED;
} else {
error_ = errno;
if (error_ == EINPROGRESS) {
state_ = CS_CONNECTING;
result = 0;
}
}
return result;
}
// Send to the remote end we're connected to.
int MacAsyncSocket::Send(const void* buffer, size_t length) {
if (!valid()) {
return SOCKET_ERROR;
}
int sent = ::send(native_socket_, buffer, length, 0);
if (sent == SOCKET_ERROR) {
error_ = errno;
if (IsBlocking()) {
// Reenable the writable callback (once), since we are flow controlled.
CFSocketEnableCallBacks(socket_, kCallbackFlags);
current_callbacks_ = kCallbackFlags;
}
}
return sent;
}
// Send to the given address. We may or may not be connected to anyone.
int MacAsyncSocket::SendTo(const void* buffer, size_t length,
const SocketAddress& address) {
if (!valid()) {
return SOCKET_ERROR;
}
sockaddr_storage saddr;
size_t len = address.ToSockAddrStorage(&saddr);
int sent = ::sendto(native_socket_, buffer, length, 0,
reinterpret_cast<sockaddr*>(&saddr), len);
if (sent == SOCKET_ERROR) {
error_ = errno;
}
return sent;
}
// Read data received from the remote end we're connected to.
int MacAsyncSocket::Recv(void* buffer, size_t length) {
int received = ::recv(native_socket_, reinterpret_cast<char*>(buffer),
length, 0);
if (received == SOCKET_ERROR) error_ = errno;
// Recv should only be called when there is data to read
ASSERT((received != 0) || (length == 0));
return received;
}
// Read data received from any remote party
int MacAsyncSocket::RecvFrom(void* buffer, size_t length,
SocketAddress* out_addr) {
sockaddr_storage saddr;
socklen_t addr_len = sizeof(saddr);
int received = ::recvfrom(native_socket_, reinterpret_cast<char*>(buffer),
length, 0, reinterpret_cast<sockaddr*>(&saddr),
&addr_len);
if (received >= 0 && out_addr != NULL) {
SocketAddressFromSockAddrStorage(saddr, out_addr);
} else if (received == SOCKET_ERROR) {
error_ = errno;
}
return received;
}
int MacAsyncSocket::Listen(int backlog) {
if (!valid()) {
return SOCKET_ERROR;
}
int res = ::listen(native_socket_, backlog);
if (res != SOCKET_ERROR)
state_ = CS_CONNECTING;
else
error_ = errno;
return res;
}
MacAsyncSocket* MacAsyncSocket::Accept(SocketAddress* out_addr) {
sockaddr_storage saddr;
socklen_t addr_len = sizeof(saddr);
int socket_fd = ::accept(native_socket_, reinterpret_cast<sockaddr*>(&saddr),
&addr_len);
if (socket_fd == INVALID_SOCKET) {
error_ = errno;
return NULL;
}
MacAsyncSocket* s = new MacAsyncSocket(ss_, saddr.ss_family, socket_fd);
if (s && s->valid()) {
s->state_ = CS_CONNECTED;
if (out_addr)
SocketAddressFromSockAddrStorage(saddr, out_addr);
} else {
delete s;
s = NULL;
}
return s;
}
int MacAsyncSocket::Close() {
if (source_ != NULL) {
CFRunLoopSourceInvalidate(source_);
CFRelease(source_);
if (ss_) ss_->UnregisterSocket(this);
source_ = NULL;
}
if (socket_ != NULL) {
CFSocketInvalidate(socket_);
CFRelease(socket_);
socket_ = NULL;
}
if (resolver_) {
resolver_->Destroy(false);
resolver_ = NULL;
}
native_socket_ = INVALID_SOCKET; // invalidates the socket
error_ = 0;
state_ = CS_CLOSED;
return 0;
}
int MacAsyncSocket::EstimateMTU(uint16* mtu) {
ASSERT(false && "NYI");
return -1;
}
int MacAsyncSocket::GetError() const {
return error_;
}
void MacAsyncSocket::SetError(int error) {
error_ = error;
}
Socket::ConnState MacAsyncSocket::GetState() const {
return state_;
}
int MacAsyncSocket::GetOption(Option opt, int* value) {
ASSERT(false && "NYI");
return -1;
}
int MacAsyncSocket::SetOption(Option opt, int value) {
ASSERT(false && "NYI");
return -1;
}
void MacAsyncSocket::EnableCallbacks() {
if (valid()) {
disabled_ = false;
CFSocketEnableCallBacks(socket_, current_callbacks_);
}
}
void MacAsyncSocket::DisableCallbacks() {
if (valid()) {
disabled_ = true;
CFSocketDisableCallBacks(socket_, kCallbackFlags);
}
}
MacAsyncSocket::MacAsyncSocket(MacBaseSocketServer* ss, int family,
int native_socket)
: ss_(ss),
socket_(NULL),
native_socket_(native_socket),
source_(NULL),
current_callbacks_(0),
disabled_(false),
error_(0),
state_(CS_CLOSED),
resolver_(NULL) {
Initialize(family);
}
// Create a new socket, wrapping the native socket if provided or creating one
// otherwise. In case of any failure, consume the native socket. We assume the
// wrapped socket is in the closed state. If this is not the case you must
// update the state_ field for this socket yourself.
void MacAsyncSocket::Initialize(int family) {
CFSocketContext ctx = { 0 };
ctx.info = this;
// First create the CFSocket
CFSocketRef cf_socket = NULL;
bool res = false;
if (native_socket_ == INVALID_SOCKET) {
cf_socket = CFSocketCreate(kCFAllocatorDefault,
family, SOCK_STREAM, IPPROTO_TCP,
kCallbackFlags, MacAsyncSocketCallBack, &ctx);
} else {
cf_socket = CFSocketCreateWithNative(kCFAllocatorDefault,
native_socket_, kCallbackFlags,
MacAsyncSocketCallBack, &ctx);
}
if (cf_socket) {
res = true;
socket_ = cf_socket;
native_socket_ = CFSocketGetNative(cf_socket);
current_callbacks_ = kCallbackFlags;
}
if (res) {
// Make the underlying socket asynchronous
res = (-1 != ::fcntl(native_socket_, F_SETFL,
::fcntl(native_socket_, F_GETFL, 0) | O_NONBLOCK));
}
if (res) {
// Add this socket to the run loop, at priority 1 so that it will be
// queued behind any pending signals.
source_ = CFSocketCreateRunLoopSource(kCFAllocatorDefault, socket_, 1);
res = (source_ != NULL);
if (!res) errno = EINVAL;
}
if (res) {
if (ss_) ss_->RegisterSocket(this);
CFRunLoopAddSource(CFRunLoopGetCurrent(), source_, kCFRunLoopCommonModes);
}
if (!res) {
int error = errno;
Close(); // Clears error_.
error_ = error;
}
}
// Call CFRelease on the result when done using it
CFDataRef MacAsyncSocket::CopyCFAddress(const SocketAddress& address) {
sockaddr_storage saddr;
size_t len = address.ToSockAddrStorage(&saddr);
const UInt8* bytes = reinterpret_cast<UInt8*>(&saddr);
CFDataRef cf_address = CFDataCreate(kCFAllocatorDefault,
bytes, len);
ASSERT(cf_address != NULL);
return cf_address;
}
void MacAsyncSocket::MacAsyncSocketCallBack(CFSocketRef s,
CFSocketCallBackType callbackType,
CFDataRef address,
const void* data,
void* info) {
MacAsyncSocket* this_socket =
reinterpret_cast<MacAsyncSocket*>(info);
ASSERT(this_socket != NULL && this_socket->socket_ == s);
// Don't signal any socket messages if the socketserver is not listening on
// them. When we are reenabled they will be requeued and will fire again.
if (this_socket->disabled_)
return;
switch (callbackType) {
case kCFSocketReadCallBack:
// This callback is invoked in one of 3 situations:
// 1. A new connection is waiting to be accepted.
// 2. The remote end closed the connection (a recv will return 0).
// 3. Data is available to read.
// 4. The connection closed unhappily (recv will return -1).
if (this_socket->state_ == CS_CONNECTING) {
// Case 1.
this_socket->SignalReadEvent(this_socket);
} else {
char ch, amt;
amt = ::recv(this_socket->native_socket_, &ch, 1, MSG_PEEK);
if (amt == 0) {
// Case 2.
this_socket->state_ = CS_CLOSED;
// Disable additional callbacks or we will signal close twice.
CFSocketDisableCallBacks(this_socket->socket_, kCFSocketReadCallBack);
this_socket->current_callbacks_ &= ~kCFSocketReadCallBack;
this_socket->SignalCloseEvent(this_socket, 0);
} else if (amt > 0) {
// Case 3.
this_socket->SignalReadEvent(this_socket);
} else {
// Case 4.
int error = errno;
if (error == EAGAIN) {
// Observed in practice. Let's hope it's a spurious or out of date
// signal, since we just eat it.
} else {
this_socket->error_ = error;
this_socket->SignalCloseEvent(this_socket, error);
}
}
}
break;
case kCFSocketConnectCallBack:
if (data != NULL) {
// An error occured in the background while connecting
this_socket->error_ = errno;
this_socket->state_ = CS_CLOSED;
this_socket->SignalCloseEvent(this_socket, this_socket->error_);
} else {
this_socket->state_ = CS_CONNECTED;
this_socket->SignalConnectEvent(this_socket);
}
break;
case kCFSocketWriteCallBack:
// Update our callback tracking. Write doesn't reenable, so it's off now.
this_socket->current_callbacks_ &= ~kCFSocketWriteCallBack;
this_socket->SignalWriteEvent(this_socket);
break;
default:
ASSERT(false && "Invalid callback type for socket");
}
}
} // namespace talk_base