28e2075280
trunk/talk git-svn-id: http://webrtc.googlecode.com/svn/trunk@4318 4adac7df-926f-26a2-2b94-8c16560cd09d
298 lines
9.7 KiB
C++
298 lines
9.7 KiB
C++
/*
|
|
* libjingle
|
|
* Copyright 2004--2005, Google Inc.
|
|
*
|
|
* Redistribution and use in source and binary forms, with or without
|
|
* modification, are permitted provided that the following conditions are met:
|
|
*
|
|
* 1. Redistributions of source code must retain the above copyright notice,
|
|
* this list of conditions and the following disclaimer.
|
|
* 2. Redistributions in binary form must reproduce the above copyright notice,
|
|
* this list of conditions and the following disclaimer in the documentation
|
|
* and/or other materials provided with the distribution.
|
|
* 3. The name of the author may not be used to endorse or promote products
|
|
* derived from this software without specific prior written permission.
|
|
*
|
|
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
|
|
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
|
|
* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
|
|
* EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
|
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
|
|
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
|
|
* OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
|
|
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
|
|
* OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
|
|
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
|
*/
|
|
|
|
#include <iomanip>
|
|
|
|
#include "talk/base/asyncsocket.h"
|
|
#include "talk/base/logging.h"
|
|
#include "talk/base/socketfactory.h"
|
|
#include "talk/base/socketpool.h"
|
|
#include "talk/base/socketstream.h"
|
|
#include "talk/base/thread.h"
|
|
|
|
namespace talk_base {
|
|
|
|
///////////////////////////////////////////////////////////////////////////////
|
|
// StreamCache - Caches a set of open streams, defers creation to a separate
|
|
// StreamPool.
|
|
///////////////////////////////////////////////////////////////////////////////
|
|
|
|
StreamCache::StreamCache(StreamPool* pool) : pool_(pool) {
|
|
}
|
|
|
|
StreamCache::~StreamCache() {
|
|
for (ConnectedList::iterator it = active_.begin(); it != active_.end();
|
|
++it) {
|
|
delete it->second;
|
|
}
|
|
for (ConnectedList::iterator it = cached_.begin(); it != cached_.end();
|
|
++it) {
|
|
delete it->second;
|
|
}
|
|
}
|
|
|
|
StreamInterface* StreamCache::RequestConnectedStream(
|
|
const SocketAddress& remote, int* err) {
|
|
LOG_F(LS_VERBOSE) << "(" << remote << ")";
|
|
for (ConnectedList::iterator it = cached_.begin(); it != cached_.end();
|
|
++it) {
|
|
if (remote == it->first) {
|
|
it->second->SignalEvent.disconnect(this);
|
|
// Move from cached_ to active_
|
|
active_.push_front(*it);
|
|
cached_.erase(it);
|
|
if (err)
|
|
*err = 0;
|
|
LOG_F(LS_VERBOSE) << "Providing cached stream";
|
|
return active_.front().second;
|
|
}
|
|
}
|
|
if (StreamInterface* stream = pool_->RequestConnectedStream(remote, err)) {
|
|
// We track active streams so that we can remember their address
|
|
active_.push_front(ConnectedStream(remote, stream));
|
|
LOG_F(LS_VERBOSE) << "Providing new stream";
|
|
return active_.front().second;
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
void StreamCache::ReturnConnectedStream(StreamInterface* stream) {
|
|
for (ConnectedList::iterator it = active_.begin(); it != active_.end();
|
|
++it) {
|
|
if (stream == it->second) {
|
|
LOG_F(LS_VERBOSE) << "(" << it->first << ")";
|
|
if (stream->GetState() == SS_CLOSED) {
|
|
// Return closed streams
|
|
LOG_F(LS_VERBOSE) << "Returning closed stream";
|
|
pool_->ReturnConnectedStream(it->second);
|
|
} else {
|
|
// Monitor open streams
|
|
stream->SignalEvent.connect(this, &StreamCache::OnStreamEvent);
|
|
LOG_F(LS_VERBOSE) << "Caching stream";
|
|
cached_.push_front(*it);
|
|
}
|
|
active_.erase(it);
|
|
return;
|
|
}
|
|
}
|
|
ASSERT(false);
|
|
}
|
|
|
|
void StreamCache::OnStreamEvent(StreamInterface* stream, int events, int err) {
|
|
if ((events & SE_CLOSE) == 0) {
|
|
LOG_F(LS_WARNING) << "(" << events << ", " << err
|
|
<< ") received non-close event";
|
|
return;
|
|
}
|
|
for (ConnectedList::iterator it = cached_.begin(); it != cached_.end();
|
|
++it) {
|
|
if (stream == it->second) {
|
|
LOG_F(LS_VERBOSE) << "(" << it->first << ")";
|
|
// We don't cache closed streams, so return it.
|
|
it->second->SignalEvent.disconnect(this);
|
|
LOG_F(LS_VERBOSE) << "Returning closed stream";
|
|
pool_->ReturnConnectedStream(it->second);
|
|
cached_.erase(it);
|
|
return;
|
|
}
|
|
}
|
|
ASSERT(false);
|
|
}
|
|
|
|
//////////////////////////////////////////////////////////////////////
|
|
// NewSocketPool
|
|
//////////////////////////////////////////////////////////////////////
|
|
|
|
NewSocketPool::NewSocketPool(SocketFactory* factory) : factory_(factory) {
|
|
}
|
|
|
|
NewSocketPool::~NewSocketPool() {
|
|
}
|
|
|
|
StreamInterface*
|
|
NewSocketPool::RequestConnectedStream(const SocketAddress& remote, int* err) {
|
|
AsyncSocket* socket =
|
|
factory_->CreateAsyncSocket(remote.family(), SOCK_STREAM);
|
|
if (!socket) {
|
|
if (err)
|
|
*err = -1;
|
|
return NULL;
|
|
}
|
|
if ((socket->Connect(remote) != 0) && !socket->IsBlocking()) {
|
|
if (err)
|
|
*err = socket->GetError();
|
|
delete socket;
|
|
return NULL;
|
|
}
|
|
if (err)
|
|
*err = 0;
|
|
return new SocketStream(socket);
|
|
}
|
|
|
|
void
|
|
NewSocketPool::ReturnConnectedStream(StreamInterface* stream) {
|
|
Thread::Current()->Dispose(stream);
|
|
}
|
|
|
|
//////////////////////////////////////////////////////////////////////
|
|
// ReuseSocketPool
|
|
//////////////////////////////////////////////////////////////////////
|
|
|
|
ReuseSocketPool::ReuseSocketPool(SocketFactory* factory)
|
|
: factory_(factory), stream_(NULL), checked_out_(false) {
|
|
}
|
|
|
|
ReuseSocketPool::~ReuseSocketPool() {
|
|
ASSERT(!checked_out_);
|
|
delete stream_;
|
|
}
|
|
|
|
StreamInterface*
|
|
ReuseSocketPool::RequestConnectedStream(const SocketAddress& remote, int* err) {
|
|
// Only one socket can be used from this "pool" at a time
|
|
ASSERT(!checked_out_);
|
|
if (!stream_) {
|
|
LOG_F(LS_VERBOSE) << "Creating new socket";
|
|
int family = remote.family();
|
|
// TODO: Deal with this when we/I clean up DNS resolution.
|
|
if (remote.IsUnresolvedIP()) {
|
|
family = AF_INET;
|
|
}
|
|
AsyncSocket* socket =
|
|
factory_->CreateAsyncSocket(family, SOCK_STREAM);
|
|
if (!socket) {
|
|
if (err)
|
|
*err = -1;
|
|
return NULL;
|
|
}
|
|
stream_ = new SocketStream(socket);
|
|
}
|
|
if ((stream_->GetState() == SS_OPEN) && (remote == remote_)) {
|
|
LOG_F(LS_VERBOSE) << "Reusing connection to: " << remote_;
|
|
} else {
|
|
remote_ = remote;
|
|
stream_->Close();
|
|
if ((stream_->GetSocket()->Connect(remote_) != 0)
|
|
&& !stream_->GetSocket()->IsBlocking()) {
|
|
if (err)
|
|
*err = stream_->GetSocket()->GetError();
|
|
return NULL;
|
|
} else {
|
|
LOG_F(LS_VERBOSE) << "Opening connection to: " << remote_;
|
|
}
|
|
}
|
|
stream_->SignalEvent.disconnect(this);
|
|
checked_out_ = true;
|
|
if (err)
|
|
*err = 0;
|
|
return stream_;
|
|
}
|
|
|
|
void
|
|
ReuseSocketPool::ReturnConnectedStream(StreamInterface* stream) {
|
|
ASSERT(stream == stream_);
|
|
ASSERT(checked_out_);
|
|
checked_out_ = false;
|
|
// Until the socket is reused, monitor it to determine if it closes.
|
|
stream_->SignalEvent.connect(this, &ReuseSocketPool::OnStreamEvent);
|
|
}
|
|
|
|
void
|
|
ReuseSocketPool::OnStreamEvent(StreamInterface* stream, int events, int err) {
|
|
ASSERT(stream == stream_);
|
|
ASSERT(!checked_out_);
|
|
|
|
// If the stream was written to and then immediately returned to us then
|
|
// we may get a writable notification for it, which we should ignore.
|
|
if (events == SE_WRITE) {
|
|
LOG_F(LS_VERBOSE) << "Pooled Socket unexpectedly writable: ignoring";
|
|
return;
|
|
}
|
|
|
|
// If the peer sent data, we can't process it, so drop the connection.
|
|
// If the socket has closed, clean it up.
|
|
// In either case, we'll reconnect it the next time it is used.
|
|
ASSERT(0 != (events & (SE_READ|SE_CLOSE)));
|
|
if (0 != (events & SE_CLOSE)) {
|
|
LOG_F(LS_VERBOSE) << "Connection closed with error: " << err;
|
|
} else {
|
|
LOG_F(LS_VERBOSE) << "Pooled Socket unexpectedly readable: closing";
|
|
}
|
|
stream_->Close();
|
|
}
|
|
|
|
///////////////////////////////////////////////////////////////////////////////
|
|
// LoggingPoolAdapter - Adapts a StreamPool to supply streams with attached
|
|
// LoggingAdapters.
|
|
///////////////////////////////////////////////////////////////////////////////
|
|
|
|
LoggingPoolAdapter::LoggingPoolAdapter(
|
|
StreamPool* pool, LoggingSeverity level, const std::string& label,
|
|
bool binary_mode)
|
|
: pool_(pool), level_(level), label_(label), binary_mode_(binary_mode) {
|
|
}
|
|
|
|
LoggingPoolAdapter::~LoggingPoolAdapter() {
|
|
for (StreamList::iterator it = recycle_bin_.begin();
|
|
it != recycle_bin_.end(); ++it) {
|
|
delete *it;
|
|
}
|
|
}
|
|
|
|
StreamInterface* LoggingPoolAdapter::RequestConnectedStream(
|
|
const SocketAddress& remote, int* err) {
|
|
if (StreamInterface* stream = pool_->RequestConnectedStream(remote, err)) {
|
|
ASSERT(SS_CLOSED != stream->GetState());
|
|
std::stringstream ss;
|
|
ss << label_ << "(0x" << std::setfill('0') << std::hex << std::setw(8)
|
|
<< stream << ")";
|
|
LOG_V(level_) << ss.str()
|
|
<< ((SS_OPEN == stream->GetState()) ? " Connected"
|
|
: " Connecting")
|
|
<< " to " << remote;
|
|
if (recycle_bin_.empty()) {
|
|
return new LoggingAdapter(stream, level_, ss.str(), binary_mode_);
|
|
}
|
|
LoggingAdapter* logging = recycle_bin_.front();
|
|
recycle_bin_.pop_front();
|
|
logging->set_label(ss.str());
|
|
logging->Attach(stream);
|
|
return logging;
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
void LoggingPoolAdapter::ReturnConnectedStream(StreamInterface* stream) {
|
|
LoggingAdapter* logging = static_cast<LoggingAdapter*>(stream);
|
|
pool_->ReturnConnectedStream(logging->Detach());
|
|
recycle_bin_.push_back(logging);
|
|
}
|
|
|
|
///////////////////////////////////////////////////////////////////////////////
|
|
|
|
} // namespace talk_base
|