/* * libjingle * Copyright 2004--2006, Google Inc. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * 3. The name of the author may not be used to endorse or promote products * derived from this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #include #include "talk/base/basictypes.h" #include "talk/base/common.h" #include "talk/base/logging.h" #include "talk/base/scoped_ptr.h" #include "talk/base/stringutils.h" #include "talk/p2p/base/candidate.h" #include "talk/p2p/base/transportchannel.h" #include "pseudotcpchannel.h" using namespace talk_base; namespace cricket { extern const talk_base::ConstantLabel SESSION_STATES[]; // MSG_WK_* - worker thread messages // MSG_ST_* - stream thread messages // MSG_SI_* - signal thread messages enum { MSG_WK_CLOCK = 1, MSG_WK_PURGE, MSG_ST_EVENT, MSG_SI_DESTROYCHANNEL, MSG_SI_DESTROY, }; struct EventData : public MessageData { int event, error; EventData(int ev, int err = 0) : event(ev), error(err) { } }; /////////////////////////////////////////////////////////////////////////////// // PseudoTcpChannel::InternalStream /////////////////////////////////////////////////////////////////////////////// class PseudoTcpChannel::InternalStream : public StreamInterface { public: InternalStream(PseudoTcpChannel* parent); virtual ~InternalStream(); virtual StreamState GetState() const; virtual StreamResult Read(void* buffer, size_t buffer_len, size_t* read, int* error); virtual StreamResult Write(const void* data, size_t data_len, size_t* written, int* error); virtual void Close(); private: // parent_ is accessed and modified exclusively on the event thread, to // avoid thread contention. This means that the PseudoTcpChannel cannot go // away until after it receives a Close() from TunnelStream. PseudoTcpChannel* parent_; }; /////////////////////////////////////////////////////////////////////////////// // PseudoTcpChannel // Member object lifetime summaries: // session_ - passed in constructor, cleared when channel_ goes away. // channel_ - created in Connect, destroyed when session_ or tcp_ goes away. // tcp_ - created in Connect, destroyed when channel_ goes away, or connection // closes. // worker_thread_ - created when channel_ is created, purged when channel_ is // destroyed. // stream_ - created in GetStream, destroyed by owner at arbitrary time. // this - created in constructor, destroyed when worker_thread_ and stream_ // are both gone. /////////////////////////////////////////////////////////////////////////////// // // Signal thread methods // PseudoTcpChannel::PseudoTcpChannel(Thread* stream_thread, Session* session) : signal_thread_(session->session_manager()->signaling_thread()), worker_thread_(NULL), stream_thread_(stream_thread), session_(session), channel_(NULL), tcp_(NULL), stream_(NULL), stream_readable_(false), pending_read_event_(false), ready_to_connect_(false) { ASSERT(signal_thread_->IsCurrent()); ASSERT(NULL != session_); } PseudoTcpChannel::~PseudoTcpChannel() { ASSERT(signal_thread_->IsCurrent()); ASSERT(worker_thread_ == NULL); ASSERT(session_ == NULL); ASSERT(channel_ == NULL); ASSERT(stream_ == NULL); ASSERT(tcp_ == NULL); } bool PseudoTcpChannel::Connect(const std::string& content_name, const std::string& channel_name, int component) { ASSERT(signal_thread_->IsCurrent()); CritScope lock(&cs_); if (channel_) return false; ASSERT(session_ != NULL); worker_thread_ = session_->session_manager()->worker_thread(); content_name_ = content_name; channel_ = session_->CreateChannel( content_name, channel_name, component); channel_name_ = channel_name; channel_->SetOption(Socket::OPT_DONTFRAGMENT, 1); channel_->SignalDestroyed.connect(this, &PseudoTcpChannel::OnChannelDestroyed); channel_->SignalWritableState.connect(this, &PseudoTcpChannel::OnChannelWritableState); channel_->SignalReadPacket.connect(this, &PseudoTcpChannel::OnChannelRead); channel_->SignalRouteChange.connect(this, &PseudoTcpChannel::OnChannelConnectionChanged); ASSERT(tcp_ == NULL); tcp_ = new PseudoTcp(this, 0); if (session_->initiator()) { // Since we may try several protocols and network adapters that won't work, // waiting until we get our first writable notification before initiating // TCP negotiation. ready_to_connect_ = true; } return true; } StreamInterface* PseudoTcpChannel::GetStream() { ASSERT(signal_thread_->IsCurrent()); CritScope lock(&cs_); ASSERT(NULL != session_); if (!stream_) stream_ = new PseudoTcpChannel::InternalStream(this); //TODO("should we disallow creation of new stream at some point?"); return stream_; } void PseudoTcpChannel::OnChannelDestroyed(TransportChannel* channel) { LOG_F(LS_INFO) << "(" << channel->component() << ")"; ASSERT(signal_thread_->IsCurrent()); CritScope lock(&cs_); ASSERT(channel == channel_); signal_thread_->Clear(this, MSG_SI_DESTROYCHANNEL); // When MSG_WK_PURGE is received, we know there will be no more messages from // the worker thread. worker_thread_->Clear(this, MSG_WK_CLOCK); worker_thread_->Post(this, MSG_WK_PURGE); session_ = NULL; channel_ = NULL; if ((stream_ != NULL) && ((tcp_ == NULL) || (tcp_->State() != PseudoTcp::TCP_CLOSED))) stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, 0)); if (tcp_) { tcp_->Close(true); AdjustClock(); } SignalChannelClosed(this); } void PseudoTcpChannel::OnSessionTerminate(Session* session) { // When the session terminates before we even connected CritScope lock(&cs_); if (session_ != NULL && channel_ == NULL) { ASSERT(session == session_); ASSERT(worker_thread_ == NULL); ASSERT(tcp_ == NULL); LOG(LS_INFO) << "Destroying unconnected PseudoTcpChannel"; session_ = NULL; if (stream_ != NULL) stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, -1)); } // Even though session_ is being destroyed, we mustn't clear the pointer, // since we'll need it to tear down channel_. // // TODO: Is it always the case that if channel_ != NULL then we'll get // a channel-destroyed notification? } void PseudoTcpChannel::GetOption(PseudoTcp::Option opt, int* value) { ASSERT(signal_thread_->IsCurrent()); CritScope lock(&cs_); ASSERT(tcp_ != NULL); tcp_->GetOption(opt, value); } void PseudoTcpChannel::SetOption(PseudoTcp::Option opt, int value) { ASSERT(signal_thread_->IsCurrent()); CritScope lock(&cs_); ASSERT(tcp_ != NULL); tcp_->SetOption(opt, value); } // // Stream thread methods // StreamState PseudoTcpChannel::GetState() const { ASSERT(stream_ != NULL && stream_thread_->IsCurrent()); CritScope lock(&cs_); if (!session_) return SS_CLOSED; if (!tcp_) return SS_OPENING; switch (tcp_->State()) { case PseudoTcp::TCP_LISTEN: case PseudoTcp::TCP_SYN_SENT: case PseudoTcp::TCP_SYN_RECEIVED: return SS_OPENING; case PseudoTcp::TCP_ESTABLISHED: return SS_OPEN; case PseudoTcp::TCP_CLOSED: default: return SS_CLOSED; } } StreamResult PseudoTcpChannel::Read(void* buffer, size_t buffer_len, size_t* read, int* error) { ASSERT(stream_ != NULL && stream_thread_->IsCurrent()); CritScope lock(&cs_); if (!tcp_) return SR_BLOCK; stream_readable_ = false; int result = tcp_->Recv(static_cast(buffer), buffer_len); //LOG_F(LS_VERBOSE) << "Recv returned: " << result; if (result > 0) { if (read) *read = result; // PseudoTcp doesn't currently support repeated Readable signals. Simulate // them here. stream_readable_ = true; if (!pending_read_event_) { pending_read_event_ = true; stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_READ), true); } return SR_SUCCESS; } else if (IsBlockingError(tcp_->GetError())) { return SR_BLOCK; } else { if (error) *error = tcp_->GetError(); return SR_ERROR; } // This spot is never reached. } StreamResult PseudoTcpChannel::Write(const void* data, size_t data_len, size_t* written, int* error) { ASSERT(stream_ != NULL && stream_thread_->IsCurrent()); CritScope lock(&cs_); if (!tcp_) return SR_BLOCK; int result = tcp_->Send(static_cast(data), data_len); //LOG_F(LS_VERBOSE) << "Send returned: " << result; if (result > 0) { if (written) *written = result; return SR_SUCCESS; } else if (IsBlockingError(tcp_->GetError())) { return SR_BLOCK; } else { if (error) *error = tcp_->GetError(); return SR_ERROR; } // This spot is never reached. } void PseudoTcpChannel::Close() { ASSERT(stream_ != NULL && stream_thread_->IsCurrent()); CritScope lock(&cs_); stream_ = NULL; // Clear out any pending event notifications stream_thread_->Clear(this, MSG_ST_EVENT); if (tcp_) { tcp_->Close(false); AdjustClock(); } else { CheckDestroy(); } } // // Worker thread methods // void PseudoTcpChannel::OnChannelWritableState(TransportChannel* channel) { LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]"; ASSERT(worker_thread_->IsCurrent()); CritScope lock(&cs_); if (!channel_) { LOG_F(LS_WARNING) << "NULL channel"; return; } ASSERT(channel == channel_); if (!tcp_) { LOG_F(LS_WARNING) << "NULL tcp"; return; } if (!ready_to_connect_ || !channel->writable()) return; ready_to_connect_ = false; tcp_->Connect(); AdjustClock(); } void PseudoTcpChannel::OnChannelRead(TransportChannel* channel, const char* data, size_t size, int flags) { //LOG_F(LS_VERBOSE) << "(" << size << ")"; ASSERT(worker_thread_->IsCurrent()); CritScope lock(&cs_); if (!channel_) { LOG_F(LS_WARNING) << "NULL channel"; return; } ASSERT(channel == channel_); if (!tcp_) { LOG_F(LS_WARNING) << "NULL tcp"; return; } tcp_->NotifyPacket(data, size); AdjustClock(); } void PseudoTcpChannel::OnChannelConnectionChanged(TransportChannel* channel, const Candidate& candidate) { LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]"; ASSERT(worker_thread_->IsCurrent()); CritScope lock(&cs_); if (!channel_) { LOG_F(LS_WARNING) << "NULL channel"; return; } ASSERT(channel == channel_); if (!tcp_) { LOG_F(LS_WARNING) << "NULL tcp"; return; } uint16 mtu = 1280; // safe default int family = candidate.address().family(); Socket* socket = worker_thread_->socketserver()->CreateAsyncSocket(family, SOCK_DGRAM); talk_base::scoped_ptr mtu_socket(socket); if (socket == NULL) { LOG_F(LS_WARNING) << "Couldn't create socket while estimating MTU."; } else { if (mtu_socket->Connect(candidate.address()) < 0 || mtu_socket->EstimateMTU(&mtu) < 0) { LOG_F(LS_WARNING) << "Failed to estimate MTU, error=" << mtu_socket->GetError(); } } LOG_F(LS_VERBOSE) << "Using MTU of " << mtu << " bytes"; tcp_->NotifyMTU(mtu); AdjustClock(); } void PseudoTcpChannel::OnTcpOpen(PseudoTcp* tcp) { LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]"; ASSERT(cs_.CurrentThreadIsOwner()); ASSERT(worker_thread_->IsCurrent()); ASSERT(tcp == tcp_); if (stream_) { stream_readable_ = true; pending_read_event_ = true; stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_OPEN | SE_READ | SE_WRITE)); } } void PseudoTcpChannel::OnTcpReadable(PseudoTcp* tcp) { //LOG_F(LS_VERBOSE); ASSERT(cs_.CurrentThreadIsOwner()); ASSERT(worker_thread_->IsCurrent()); ASSERT(tcp == tcp_); if (stream_) { stream_readable_ = true; if (!pending_read_event_) { pending_read_event_ = true; stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_READ)); } } } void PseudoTcpChannel::OnTcpWriteable(PseudoTcp* tcp) { //LOG_F(LS_VERBOSE); ASSERT(cs_.CurrentThreadIsOwner()); ASSERT(worker_thread_->IsCurrent()); ASSERT(tcp == tcp_); if (stream_) stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_WRITE)); } void PseudoTcpChannel::OnTcpClosed(PseudoTcp* tcp, uint32 nError) { LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]"; ASSERT(cs_.CurrentThreadIsOwner()); ASSERT(worker_thread_->IsCurrent()); ASSERT(tcp == tcp_); if (stream_) stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, nError)); } // // Multi-thread methods // void PseudoTcpChannel::OnMessage(Message* pmsg) { if (pmsg->message_id == MSG_WK_CLOCK) { ASSERT(worker_thread_->IsCurrent()); //LOG(LS_INFO) << "PseudoTcpChannel::OnMessage(MSG_WK_CLOCK)"; CritScope lock(&cs_); if (tcp_) { tcp_->NotifyClock(PseudoTcp::Now()); AdjustClock(false); } } else if (pmsg->message_id == MSG_WK_PURGE) { ASSERT(worker_thread_->IsCurrent()); LOG_F(LS_INFO) << "(MSG_WK_PURGE)"; // At this point, we know there are no additional worker thread messages. CritScope lock(&cs_); ASSERT(NULL == session_); ASSERT(NULL == channel_); worker_thread_ = NULL; CheckDestroy(); } else if (pmsg->message_id == MSG_ST_EVENT) { ASSERT(stream_thread_->IsCurrent()); //LOG(LS_INFO) << "PseudoTcpChannel::OnMessage(MSG_ST_EVENT, " // << data->event << ", " << data->error << ")"; ASSERT(stream_ != NULL); EventData* data = static_cast(pmsg->pdata); if (data->event & SE_READ) { CritScope lock(&cs_); pending_read_event_ = false; } stream_->SignalEvent(stream_, data->event, data->error); delete data; } else if (pmsg->message_id == MSG_SI_DESTROYCHANNEL) { ASSERT(signal_thread_->IsCurrent()); LOG_F(LS_INFO) << "(MSG_SI_DESTROYCHANNEL)"; ASSERT(session_ != NULL); ASSERT(channel_ != NULL); session_->DestroyChannel(content_name_, channel_->component()); } else if (pmsg->message_id == MSG_SI_DESTROY) { ASSERT(signal_thread_->IsCurrent()); LOG_F(LS_INFO) << "(MSG_SI_DESTROY)"; // The message queue is empty, so it is safe to destroy ourselves. delete this; } else { ASSERT(false); } } IPseudoTcpNotify::WriteResult PseudoTcpChannel::TcpWritePacket( PseudoTcp* tcp, const char* buffer, size_t len) { ASSERT(cs_.CurrentThreadIsOwner()); ASSERT(tcp == tcp_); ASSERT(NULL != channel_); int sent = channel_->SendPacket(buffer, len, talk_base::DSCP_NO_CHANGE); if (sent > 0) { //LOG_F(LS_VERBOSE) << "(" << sent << ") Sent"; return IPseudoTcpNotify::WR_SUCCESS; } else if (IsBlockingError(channel_->GetError())) { LOG_F(LS_VERBOSE) << "Blocking"; return IPseudoTcpNotify::WR_SUCCESS; } else if (channel_->GetError() == EMSGSIZE) { LOG_F(LS_ERROR) << "EMSGSIZE"; return IPseudoTcpNotify::WR_TOO_LARGE; } else { PLOG(LS_ERROR, channel_->GetError()) << "PseudoTcpChannel::TcpWritePacket"; ASSERT(false); return IPseudoTcpNotify::WR_FAIL; } } void PseudoTcpChannel::AdjustClock(bool clear) { ASSERT(cs_.CurrentThreadIsOwner()); ASSERT(NULL != tcp_); long timeout = 0; if (tcp_->GetNextClock(PseudoTcp::Now(), timeout)) { ASSERT(NULL != channel_); // Reset the next clock, by clearing the old and setting a new one. if (clear) worker_thread_->Clear(this, MSG_WK_CLOCK); worker_thread_->PostDelayed(_max(timeout, 0L), this, MSG_WK_CLOCK); return; } delete tcp_; tcp_ = NULL; ready_to_connect_ = false; if (channel_) { // If TCP has failed, no need for channel_ anymore signal_thread_->Post(this, MSG_SI_DESTROYCHANNEL); } } void PseudoTcpChannel::CheckDestroy() { ASSERT(cs_.CurrentThreadIsOwner()); if ((worker_thread_ != NULL) || (stream_ != NULL)) return; signal_thread_->Post(this, MSG_SI_DESTROY); } /////////////////////////////////////////////////////////////////////////////// // PseudoTcpChannel::InternalStream /////////////////////////////////////////////////////////////////////////////// PseudoTcpChannel::InternalStream::InternalStream(PseudoTcpChannel* parent) : parent_(parent) { } PseudoTcpChannel::InternalStream::~InternalStream() { Close(); } StreamState PseudoTcpChannel::InternalStream::GetState() const { if (!parent_) return SS_CLOSED; return parent_->GetState(); } StreamResult PseudoTcpChannel::InternalStream::Read( void* buffer, size_t buffer_len, size_t* read, int* error) { if (!parent_) { if (error) *error = ENOTCONN; return SR_ERROR; } return parent_->Read(buffer, buffer_len, read, error); } StreamResult PseudoTcpChannel::InternalStream::Write( const void* data, size_t data_len, size_t* written, int* error) { if (!parent_) { if (error) *error = ENOTCONN; return SR_ERROR; } return parent_->Write(data, data_len, written, error); } void PseudoTcpChannel::InternalStream::Close() { if (!parent_) return; parent_->Close(); parent_ = NULL; } /////////////////////////////////////////////////////////////////////////////// } // namespace cricket