diff --git a/CMakeLists.txt b/CMakeLists.txt index 464fb36e..b7f4deb7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -741,6 +741,7 @@ set(cxx-sources random.cpp raw_encoder.cpp raw_decoder.cpp + raw_engine.cpp reaper.cpp rep.cpp req.cpp @@ -753,7 +754,7 @@ set(cxx-sources socks.cpp socks_connecter.cpp stream.cpp - stream_engine.cpp + stream_engine_base.cpp sub.cpp tcp.cpp tcp_address.cpp @@ -787,6 +788,7 @@ set(cxx-sources ws_encoder.cpp ws_engine.cpp ws_listener.cpp + zmtp_engine.cpp # at least for VS, the header files must also be listed address.hpp array.hpp @@ -873,6 +875,7 @@ set(cxx-sources random.hpp raw_decoder.hpp raw_encoder.hpp + raw_engine.hpp reaper.hpp rep.hpp req.hpp @@ -889,7 +892,7 @@ set(cxx-sources socks_connecter.hpp stdint.hpp stream.hpp - stream_engine.hpp + stream_engine_base.hpp stream_connecter_base.hpp stream_connecter_base.cpp stream_listener_base.hpp @@ -931,6 +934,7 @@ set(cxx-sources ypipe_conflate.hpp yqueue.hpp zap_client.hpp + zmtp_engine.hpp ) if(MINGW) diff --git a/Makefile.am b/Makefile.am index af320445..4c4abc44 100644 --- a/Makefile.am +++ b/Makefile.am @@ -169,6 +169,8 @@ src_libzmq_la_SOURCES = \ src/raw_decoder.hpp \ src/raw_encoder.cpp \ src/raw_encoder.hpp \ + src/raw_engine.cpp \ + src/raw_engine.hpp \ src/reaper.cpp \ src/reaper.hpp \ src/rep.cpp \ @@ -201,8 +203,8 @@ src_libzmq_la_SOURCES = \ src/stream_connecter_base.hpp \ src/stream_listener_base.cpp \ src/stream_listener_base.hpp \ - src/stream_engine.cpp \ - src/stream_engine.hpp \ + src/stream_engine_base.cpp \ + src/stream_engine_base.hpp \ src/sub.cpp \ src/sub.hpp \ src/tcp.cpp \ @@ -275,6 +277,8 @@ src_libzmq_la_SOURCES = \ src/socket_poller.hpp \ src/zap_client.cpp \ src/zap_client.hpp \ + src/zmtp_engine.cpp \ + src/zmtp_engine.hpp \ src/zmq_draft.h \ external/sha1/sha1.c \ external/sha1/sha1.h diff --git a/src/i_engine.hpp b/src/i_engine.hpp index de4f689c..d1feee8d 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -40,6 +40,13 @@ class io_thread_t; struct i_engine { + enum error_reason_t + { + protocol_error, + connection_error, + timeout_error + }; + virtual ~i_engine () {} // Plug the engine to the session. diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp index af8429be..89ead5fa 100644 --- a/src/ipc_connecter.cpp +++ b/src/ipc_connecter.cpp @@ -36,7 +36,6 @@ #include #include -#include "stream_engine.hpp" #include "io_thread.hpp" #include "random.hpp" #include "err.hpp" diff --git a/src/raw_engine.cpp b/src/raw_engine.cpp new file mode 100644 index 00000000..336805f4 --- /dev/null +++ b/src/raw_engine.cpp @@ -0,0 +1,138 @@ +/* + Copyright (c) 2007-2019 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "precompiled.hpp" +#include "macros.hpp" + +#include +#include + +#ifndef ZMQ_HAVE_WINDOWS +#include +#endif + +#include +#include + +#include "raw_engine.hpp" +#include "io_thread.hpp" +#include "session_base.hpp" +#include "v1_encoder.hpp" +#include "v1_decoder.hpp" +#include "v2_encoder.hpp" +#include "v2_decoder.hpp" +#include "null_mechanism.hpp" +#include "plain_client.hpp" +#include "plain_server.hpp" +#include "gssapi_client.hpp" +#include "gssapi_server.hpp" +#include "curve_client.hpp" +#include "curve_server.hpp" +#include "raw_decoder.hpp" +#include "raw_encoder.hpp" +#include "config.hpp" +#include "err.hpp" +#include "ip.hpp" +#include "tcp.hpp" +#include "likely.hpp" +#include "wire.hpp" + +zmq::raw_engine_t::raw_engine_t ( + fd_t fd_, + const options_t &options_, + const endpoint_uri_pair_t &endpoint_uri_pair_) : + stream_engine_base_t (fd_, options_, endpoint_uri_pair_) +{ +} + +zmq::raw_engine_t::~raw_engine_t () +{ +} + +void zmq::raw_engine_t::plug_internal () +{ + // no handshaking for raw sock, instantiate raw encoder and decoders + _encoder = new (std::nothrow) raw_encoder_t (_options.out_batch_size); + alloc_assert (_encoder); + + _decoder = new (std::nothrow) raw_decoder_t (_options.in_batch_size); + alloc_assert (_decoder); + + _next_msg = &raw_engine_t::pull_msg_from_session; + _process_msg = static_cast ( + &raw_engine_t::push_raw_msg_to_session); + + properties_t properties; + if (init_properties (properties)) { + // Compile metadata. + zmq_assert (_metadata == NULL); + _metadata = new (std::nothrow) metadata_t (properties); + alloc_assert (_metadata); + } + + if (_options.raw_notify) { + // For raw sockets, send an initial 0-length message to the + // application so that it knows a peer has connected. + msg_t connector; + connector.init (); + push_raw_msg_to_session (&connector); + connector.close (); + session ()->flush (); + } + + set_pollin (); + set_pollout (); + // Flush all the data that may have been already received downstream. + in_event (); +} + +bool zmq::raw_engine_t::handshake () +{ + return true; +} + +void zmq::raw_engine_t::error (error_reason_t reason_) +{ + if (_options.raw_socket && _options.raw_notify) { + // For raw sockets, send a final 0-length message to the application + // so that it knows the peer has been disconnected. + msg_t terminator; + terminator.init (); + (this->*_process_msg) (&terminator); + terminator.close (); + } + stream_engine_base_t::error (reason_); +} + +int zmq::raw_engine_t::push_raw_msg_to_session (msg_t *msg_) +{ + if (_metadata && _metadata != msg_->metadata ()) + msg_->set_metadata (_metadata); + return push_msg_to_session (msg_); +} diff --git a/src/raw_engine.hpp b/src/raw_engine.hpp new file mode 100644 index 00000000..e3845aaf --- /dev/null +++ b/src/raw_engine.hpp @@ -0,0 +1,78 @@ +/* + Copyright (c) 2007-2019 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_RAW_ENGINE_HPP_INCLUDED__ +#define __ZMQ_RAW_ENGINE_HPP_INCLUDED__ + +#include + +#include "fd.hpp" +#include "i_engine.hpp" +#include "io_object.hpp" +#include "i_encoder.hpp" +#include "i_decoder.hpp" +#include "options.hpp" +#include "socket_base.hpp" +#include "metadata.hpp" +#include "msg.hpp" +#include "stream_engine_base.hpp" + +namespace zmq +{ +// Protocol revisions + +class io_thread_t; +class session_base_t; +class mechanism_t; + +// This engine handles any socket with SOCK_STREAM semantics, +// e.g. TCP socket or an UNIX domain socket. + +class raw_engine_t : public stream_engine_base_t +{ + public: + raw_engine_t (fd_t fd_, + const options_t &options_, + const endpoint_uri_pair_t &endpoint_uri_pair_); + ~raw_engine_t (); + + protected: + void error (error_reason_t reason_); + void plug_internal (); + bool handshake (); + + private: + int push_raw_msg_to_session (msg_t *msg_); + + raw_engine_t (const raw_engine_t &); + const raw_engine_t &operator= (const raw_engine_t &); +}; +} + +#endif diff --git a/src/session_base.cpp b/src/session_base.cpp index a947599a..712fb105 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -425,8 +425,7 @@ void zmq::session_base_t::process_attach (i_engine *engine_) _engine->plug (_io_thread, this); } -void zmq::session_base_t::engine_error ( - zmq::stream_engine_t::error_reason_t reason_) +void zmq::session_base_t::engine_error (zmq::i_engine::error_reason_t reason_) { // Engine is dead. Let's forget about it. _engine = NULL; @@ -435,20 +434,20 @@ void zmq::session_base_t::engine_error ( if (_pipe) clean_pipes (); - zmq_assert (reason_ == stream_engine_t::connection_error - || reason_ == stream_engine_t::timeout_error - || reason_ == stream_engine_t::protocol_error); + zmq_assert (reason_ == i_engine::connection_error + || reason_ == i_engine::timeout_error + || reason_ == i_engine::protocol_error); switch (reason_) { - case stream_engine_t::timeout_error: + case i_engine::timeout_error: /* FALLTHROUGH */ - case stream_engine_t::connection_error: + case i_engine::connection_error: if (_active) { reconnect (); break; } /* FALLTHROUGH */ - case stream_engine_t::protocol_error: + case i_engine::protocol_error: if (_pending) { if (_pipe) _pipe->terminate (false); diff --git a/src/session_base.hpp b/src/session_base.hpp index 6d3abb63..2dd6a458 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -36,7 +36,8 @@ #include "io_object.hpp" #include "pipe.hpp" #include "socket_base.hpp" -#include "stream_engine.hpp" +#include "i_engine.hpp" +#include "msg.hpp" namespace zmq { @@ -61,7 +62,7 @@ class session_base_t : public own_t, public io_object_t, public i_pipe_events virtual void reset (); void flush (); void rollback (); - void engine_error (zmq::stream_engine_t::error_reason_t reason_); + void engine_error (zmq::i_engine::error_reason_t reason_); // i_pipe_events interface implementation. void read_activated (zmq::pipe_t *pipe_); diff --git a/src/socks_connecter.cpp b/src/socks_connecter.cpp index 80bc0e83..29766c06 100644 --- a/src/socks_connecter.cpp +++ b/src/socks_connecter.cpp @@ -33,7 +33,6 @@ #include "macros.hpp" #include "socks_connecter.hpp" -#include "stream_engine.hpp" #include "random.hpp" #include "err.hpp" #include "ip.hpp" diff --git a/src/stream_connecter_base.cpp b/src/stream_connecter_base.cpp index a9b2aa42..427063da 100644 --- a/src/stream_connecter_base.cpp +++ b/src/stream_connecter_base.cpp @@ -32,6 +32,8 @@ #include "session_base.hpp" #include "address.hpp" #include "random.hpp" +#include "zmtp_engine.hpp" +#include "raw_engine.hpp" #ifndef ZMQ_HAVE_WINDOWS #include @@ -173,8 +175,11 @@ void zmq::stream_connecter_base_t::create_engine ( endpoint_type_connect); // Create the engine object for this connection. - stream_engine_t *engine = - new (std::nothrow) stream_engine_t (fd, options, endpoint_pair); + i_engine *engine; + if (options.raw_socket) + engine = new (std::nothrow) raw_engine_t (fd, options, endpoint_pair); + else + engine = new (std::nothrow) zmtp_engine_t (fd, options, endpoint_pair); alloc_assert (engine); // Attach the engine to the corresponding session object. diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp deleted file mode 100644 index 63a278f1..00000000 --- a/src/stream_engine.cpp +++ /dev/null @@ -1,1268 +0,0 @@ -/* - Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file - - This file is part of libzmq, the ZeroMQ core engine in C++. - - libzmq is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License (LGPL) as published - by the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - As a special exception, the Contributors give you permission to link - this library with independent modules to produce an executable, - regardless of the license terms of these independent modules, and to - copy and distribute the resulting executable under terms of your choice, - provided that you also meet, for each linked independent module, the - terms and conditions of the license of that module. An independent - module is a module which is not derived from or based on this library. - If you modify this library, you must extend this exception to your - version of the library. - - libzmq is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public - License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include "precompiled.hpp" -#include "macros.hpp" - -#include -#include - -#ifndef ZMQ_HAVE_WINDOWS -#include -#endif - -#include -#include - -#include "stream_engine.hpp" -#include "io_thread.hpp" -#include "session_base.hpp" -#include "v1_encoder.hpp" -#include "v1_decoder.hpp" -#include "v2_encoder.hpp" -#include "v2_decoder.hpp" -#include "null_mechanism.hpp" -#include "plain_client.hpp" -#include "plain_server.hpp" -#include "gssapi_client.hpp" -#include "gssapi_server.hpp" -#include "curve_client.hpp" -#include "curve_server.hpp" -#include "raw_decoder.hpp" -#include "raw_encoder.hpp" -#include "config.hpp" -#include "err.hpp" -#include "ip.hpp" -#include "tcp.hpp" -#include "likely.hpp" -#include "wire.hpp" - -static std::string get_peer_address (zmq::fd_t s_) -{ - std::string peer_address; - - const int family = zmq::get_peer_ip_address (s_, peer_address); - if (family == 0) - peer_address.clear (); -#if defined ZMQ_HAVE_SO_PEERCRED - else if (family == PF_UNIX) { - struct ucred cred; - socklen_t size = sizeof (cred); - if (!getsockopt (s_, SOL_SOCKET, SO_PEERCRED, &cred, &size)) { - std::ostringstream buf; - buf << ":" << cred.uid << ":" << cred.gid << ":" << cred.pid; - peer_address += buf.str (); - } - } -#elif defined ZMQ_HAVE_LOCAL_PEERCRED - else if (family == PF_UNIX) { - struct xucred cred; - socklen_t size = sizeof (cred); - if (!getsockopt (_s, 0, LOCAL_PEERCRED, &cred, &size) - && cred.cr_version == XUCRED_VERSION) { - std::ostringstream buf; - buf << ":" << cred.cr_uid << ":"; - if (cred.cr_ngroups > 0) - buf << cred.cr_groups[0]; - buf << ":"; - _peer_address += buf.str (); - } - } -#endif - - return peer_address; -} - - -zmq::stream_engine_t::stream_engine_t ( - fd_t fd_, - const options_t &options_, - const endpoint_uri_pair_t &endpoint_uri_pair_) : - _s (fd_), - _handle (static_cast (NULL)), - _inpos (NULL), - _insize (0), - _decoder (NULL), - _outpos (NULL), - _outsize (0), - _encoder (NULL), - _metadata (NULL), - _handshaking (true), - _greeting_size (v2_greeting_size), - _greeting_bytes_read (0), - _session (NULL), - _options (options_), - _endpoint_uri_pair (endpoint_uri_pair_), - _plugged (false), - _next_msg (&stream_engine_t::routing_id_msg), - _process_msg (&stream_engine_t::process_routing_id_msg), - _io_error (false), - _subscription_required (false), - _mechanism (NULL), - _input_stopped (false), - _output_stopped (false), - _has_handshake_timer (false), - _has_ttl_timer (false), - _has_timeout_timer (false), - _has_heartbeat_timer (false), - _heartbeat_timeout (0), - _socket (NULL), - _peer_address (get_peer_address (_s)) -{ - int rc = _tx_msg.init (); - errno_assert (rc == 0); - rc = _pong_msg.init (); - errno_assert (rc == 0); - - // Put the socket into non-blocking mode. - unblock_socket (_s); - - - if (_options.heartbeat_interval > 0) { - _heartbeat_timeout = _options.heartbeat_timeout; - if (_heartbeat_timeout == -1) - _heartbeat_timeout = _options.heartbeat_interval; - } -} - -zmq::stream_engine_t::~stream_engine_t () -{ - zmq_assert (!_plugged); - - if (_s != retired_fd) { -#ifdef ZMQ_HAVE_WINDOWS - int rc = closesocket (_s); - wsa_assert (rc != SOCKET_ERROR); -#else - int rc = close (_s); -#if defined(__FreeBSD_kernel__) || defined(__FreeBSD__) - // FreeBSD may return ECONNRESET on close() under load but this is not - // an error. - if (rc == -1 && errno == ECONNRESET) - rc = 0; -#endif - errno_assert (rc == 0); -#endif - _s = retired_fd; - } - - int rc = _tx_msg.close (); - errno_assert (rc == 0); - - // Drop reference to metadata and destroy it if we are - // the only user. - if (_metadata != NULL) { - if (_metadata->drop_ref ()) { - LIBZMQ_DELETE (_metadata); - } - } - - LIBZMQ_DELETE (_encoder); - LIBZMQ_DELETE (_decoder); - LIBZMQ_DELETE (_mechanism); -} - -void zmq::stream_engine_t::plug (io_thread_t *io_thread_, - session_base_t *session_) -{ - zmq_assert (!_plugged); - _plugged = true; - - // Connect to session object. - zmq_assert (!_session); - zmq_assert (session_); - _session = session_; - _socket = _session->get_socket (); - - // Connect to I/O threads poller object. - io_object_t::plug (io_thread_); - _handle = add_fd (_s); - _io_error = false; - - if (_options.raw_socket) { - // no handshaking for raw sock, instantiate raw encoder and decoders - _encoder = new (std::nothrow) raw_encoder_t (_options.out_batch_size); - alloc_assert (_encoder); - - _decoder = new (std::nothrow) raw_decoder_t (_options.in_batch_size); - alloc_assert (_decoder); - - // disable handshaking for raw socket - _handshaking = false; - - _next_msg = &stream_engine_t::pull_msg_from_session; - _process_msg = &stream_engine_t::push_raw_msg_to_session; - - properties_t properties; - if (init_properties (properties)) { - // Compile metadata. - zmq_assert (_metadata == NULL); - _metadata = new (std::nothrow) metadata_t (properties); - alloc_assert (_metadata); - } - - if (_options.raw_notify) { - // For raw sockets, send an initial 0-length message to the - // application so that it knows a peer has connected. - msg_t connector; - connector.init (); - push_raw_msg_to_session (&connector); - connector.close (); - _session->flush (); - } - } else { - // start optional timer, to prevent handshake hanging on no input - set_handshake_timer (); - - // Send the 'length' and 'flags' fields of the routing id message. - // The 'length' field is encoded in the long format. - _outpos = _greeting_send; - _outpos[_outsize++] = UCHAR_MAX; - put_uint64 (&_outpos[_outsize], _options.routing_id_size + 1); - _outsize += 8; - _outpos[_outsize++] = 0x7f; - } - - set_pollin (_handle); - set_pollout (_handle); - // Flush all the data that may have been already received downstream. - in_event (); -} - -void zmq::stream_engine_t::unplug () -{ - zmq_assert (_plugged); - _plugged = false; - - // Cancel all timers. - if (_has_handshake_timer) { - cancel_timer (handshake_timer_id); - _has_handshake_timer = false; - } - - if (_has_ttl_timer) { - cancel_timer (heartbeat_ttl_timer_id); - _has_ttl_timer = false; - } - - if (_has_timeout_timer) { - cancel_timer (heartbeat_timeout_timer_id); - _has_timeout_timer = false; - } - - if (_has_heartbeat_timer) { - cancel_timer (heartbeat_ivl_timer_id); - _has_heartbeat_timer = false; - } - // Cancel all fd subscriptions. - if (!_io_error) - rm_fd (_handle); - - // Disconnect from I/O threads poller object. - io_object_t::unplug (); - - _session = NULL; -} - -void zmq::stream_engine_t::terminate () -{ - unplug (); - delete this; -} - -void zmq::stream_engine_t::in_event () -{ - // ignore errors - const bool res = in_event_internal (); - LIBZMQ_UNUSED (res); -} - -bool zmq::stream_engine_t::in_event_internal () -{ - zmq_assert (!_io_error); - - // If still handshaking, receive and process the greeting message. - if (unlikely (_handshaking)) - if (!handshake ()) - return false; - - zmq_assert (_decoder); - - // If there has been an I/O error, stop polling. - if (_input_stopped) { - rm_fd (_handle); - _io_error = true; - return true; // TODO or return false in this case too? - } - - // If there's no data to process in the buffer... - if (!_insize) { - // Retrieve the buffer and read as much data as possible. - // Note that buffer can be arbitrarily large. However, we assume - // the underlying TCP layer has fixed buffer size and thus the - // number of bytes read will be always limited. - size_t bufsize = 0; - _decoder->get_buffer (&_inpos, &bufsize); - - const int rc = tcp_read (_s, _inpos, bufsize); - - if (rc == 0) { - // connection closed by peer - errno = EPIPE; - error (connection_error); - return false; - } - if (rc == -1) { - if (errno != EAGAIN) { - error (connection_error); - return false; - } - return true; - } - - // Adjust input size - _insize = static_cast (rc); - // Adjust buffer size to received bytes - _decoder->resize_buffer (_insize); - } - - int rc = 0; - size_t processed = 0; - - while (_insize > 0) { - rc = _decoder->decode (_inpos, _insize, processed); - zmq_assert (processed <= _insize); - _inpos += processed; - _insize -= processed; - if (rc == 0 || rc == -1) - break; - rc = (this->*_process_msg) (_decoder->msg ()); - if (rc == -1) - break; - } - - // Tear down the connection if we have failed to decode input data - // or the session has rejected the message. - if (rc == -1) { - if (errno != EAGAIN) { - error (protocol_error); - return false; - } - _input_stopped = true; - reset_pollin (_handle); - } - - _session->flush (); - return true; -} - -void zmq::stream_engine_t::out_event () -{ - zmq_assert (!_io_error); - - // If write buffer is empty, try to read new data from the encoder. - if (!_outsize) { - // Even when we stop polling as soon as there is no - // data to send, the poller may invoke out_event one - // more time due to 'speculative write' optimisation. - if (unlikely (_encoder == NULL)) { - zmq_assert (_handshaking); - return; - } - - _outpos = NULL; - _outsize = _encoder->encode (&_outpos, 0); - - while (_outsize < static_cast (_options.out_batch_size)) { - if ((this->*_next_msg) (&_tx_msg) == -1) - break; - _encoder->load_msg (&_tx_msg); - unsigned char *bufptr = _outpos + _outsize; - size_t n = - _encoder->encode (&bufptr, _options.out_batch_size - _outsize); - zmq_assert (n > 0); - if (_outpos == NULL) - _outpos = bufptr; - _outsize += n; - } - - // If there is no data to send, stop polling for output. - if (_outsize == 0) { - _output_stopped = true; - reset_pollout (_handle); - return; - } - } - - // If there are any data to write in write buffer, write as much as - // possible to the socket. Note that amount of data to write can be - // arbitrarily large. However, we assume that underlying TCP layer has - // limited transmission buffer and thus the actual number of bytes - // written should be reasonably modest. - const int nbytes = tcp_write (_s, _outpos, _outsize); - - // IO error has occurred. We stop waiting for output events. - // The engine is not terminated until we detect input error; - // this is necessary to prevent losing incoming messages. - if (nbytes == -1) { - reset_pollout (_handle); - return; - } - - _outpos += nbytes; - _outsize -= nbytes; - - // If we are still handshaking and there are no data - // to send, stop polling for output. - if (unlikely (_handshaking)) - if (_outsize == 0) - reset_pollout (_handle); -} - -void zmq::stream_engine_t::restart_output () -{ - if (unlikely (_io_error)) - return; - - if (likely (_output_stopped)) { - set_pollout (_handle); - _output_stopped = false; - } - - // Speculative write: The assumption is that at the moment new message - // was sent by the user the socket is probably available for writing. - // Thus we try to write the data to socket avoiding polling for POLLOUT. - // Consequently, the latency should be better in request/reply scenarios. - out_event (); -} - -bool zmq::stream_engine_t::restart_input () -{ - zmq_assert (_input_stopped); - zmq_assert (_session != NULL); - zmq_assert (_decoder != NULL); - - int rc = (this->*_process_msg) (_decoder->msg ()); - if (rc == -1) { - if (errno == EAGAIN) - _session->flush (); - else { - error (protocol_error); - return false; - } - return true; - } - - while (_insize > 0) { - size_t processed = 0; - rc = _decoder->decode (_inpos, _insize, processed); - zmq_assert (processed <= _insize); - _inpos += processed; - _insize -= processed; - if (rc == 0 || rc == -1) - break; - rc = (this->*_process_msg) (_decoder->msg ()); - if (rc == -1) - break; - } - - if (rc == -1 && errno == EAGAIN) - _session->flush (); - else if (_io_error) { - error (connection_error); - return false; - } else if (rc == -1) { - error (protocol_error); - return false; - } - - else { - _input_stopped = false; - set_pollin (_handle); - _session->flush (); - - // Speculative read. - if (!in_event_internal ()) - return false; - } - - return true; -} - -// Position of the revision field in the greeting. -const size_t revision_pos = 10; - -bool zmq::stream_engine_t::handshake () -{ - zmq_assert (_handshaking); - zmq_assert (_greeting_bytes_read < _greeting_size); - // Receive the greeting. - const int rc = receive_greeting (); - if (rc == -1) - return false; - const bool unversioned = rc != 0; - - if (!(this - ->*select_handshake_fun (unversioned, - _greeting_recv[revision_pos])) ()) - return false; - - // Start polling for output if necessary. - if (_outsize == 0) - set_pollout (_handle); - - // Handshaking was successful. - // Switch into the normal message flow. - _handshaking = false; - - if (_has_handshake_timer) { - cancel_timer (handshake_timer_id); - _has_handshake_timer = false; - } - - return true; -} - -int zmq::stream_engine_t::receive_greeting () -{ - bool unversioned = false; - while (_greeting_bytes_read < _greeting_size) { - const int n = tcp_read (_s, _greeting_recv + _greeting_bytes_read, - _greeting_size - _greeting_bytes_read); - if (n == 0) { - errno = EPIPE; - error (connection_error); - return -1; - } - if (n == -1) { - if (errno != EAGAIN) - error (connection_error); - return -1; - } - - _greeting_bytes_read += n; - - // We have received at least one byte from the peer. - // If the first byte is not 0xff, we know that the - // peer is using unversioned protocol. - if (_greeting_recv[0] != 0xff) { - unversioned = true; - break; - } - - if (_greeting_bytes_read < signature_size) - continue; - - // Inspect the right-most bit of the 10th byte (which coincides - // with the 'flags' field if a regular message was sent). - // Zero indicates this is a header of a routing id message - // (i.e. the peer is using the unversioned protocol). - if (!(_greeting_recv[9] & 0x01)) { - unversioned = true; - break; - } - - // The peer is using versioned protocol. - receive_greeting_versioned (); - } - return unversioned ? 1 : 0; -} - -void zmq::stream_engine_t::receive_greeting_versioned () -{ - // Send the major version number. - if (_outpos + _outsize == _greeting_send + signature_size) { - if (_outsize == 0) - set_pollout (_handle); - _outpos[_outsize++] = 3; // Major version number - } - - if (_greeting_bytes_read > signature_size) { - if (_outpos + _outsize == _greeting_send + signature_size + 1) { - if (_outsize == 0) - set_pollout (_handle); - - // Use ZMTP/2.0 to talk to older peers. - if (_greeting_recv[revision_pos] == ZMTP_1_0 - || _greeting_recv[revision_pos] == ZMTP_2_0) - _outpos[_outsize++] = _options.type; - else { - _outpos[_outsize++] = 0; // Minor version number - memset (_outpos + _outsize, 0, 20); - - zmq_assert (_options.mechanism == ZMQ_NULL - || _options.mechanism == ZMQ_PLAIN - || _options.mechanism == ZMQ_CURVE - || _options.mechanism == ZMQ_GSSAPI); - - if (_options.mechanism == ZMQ_NULL) - memcpy (_outpos + _outsize, "NULL", 4); - else if (_options.mechanism == ZMQ_PLAIN) - memcpy (_outpos + _outsize, "PLAIN", 5); - else if (_options.mechanism == ZMQ_GSSAPI) - memcpy (_outpos + _outsize, "GSSAPI", 6); - else if (_options.mechanism == ZMQ_CURVE) - memcpy (_outpos + _outsize, "CURVE", 5); - _outsize += 20; - memset (_outpos + _outsize, 0, 32); - _outsize += 32; - _greeting_size = v3_greeting_size; - } - } - } -} - -zmq::stream_engine_t::handshake_fun_t -zmq::stream_engine_t::select_handshake_fun (bool unversioned, - unsigned char revision) -{ - // Is the peer using ZMTP/1.0 with no revision number? - if (unversioned) { - return &stream_engine_t::handshake_v1_0_unversioned; - } - switch (revision) { - case ZMTP_1_0: - return &stream_engine_t::handshake_v1_0; - case ZMTP_2_0: - return &stream_engine_t::handshake_v2_0; - default: - return &stream_engine_t::handshake_v3_0; - } -} - -bool zmq::stream_engine_t::handshake_v1_0_unversioned () -{ - // We send and receive rest of routing id message - if (_session->zap_enabled ()) { - // reject ZMTP 1.0 connections if ZAP is enabled - error (protocol_error); - return false; - } - - _encoder = new (std::nothrow) v1_encoder_t (_options.out_batch_size); - alloc_assert (_encoder); - - _decoder = new (std::nothrow) - v1_decoder_t (_options.in_batch_size, _options.maxmsgsize); - alloc_assert (_decoder); - - // We have already sent the message header. - // Since there is no way to tell the encoder to - // skip the message header, we simply throw that - // header data away. - const size_t header_size = - _options.routing_id_size + 1 >= UCHAR_MAX ? 10 : 2; - unsigned char tmp[10], *bufferp = tmp; - - // Prepare the routing id message and load it into encoder. - // Then consume bytes we have already sent to the peer. - const int rc = _tx_msg.init_size (_options.routing_id_size); - zmq_assert (rc == 0); - memcpy (_tx_msg.data (), _options.routing_id, _options.routing_id_size); - _encoder->load_msg (&_tx_msg); - const size_t buffer_size = _encoder->encode (&bufferp, header_size); - zmq_assert (buffer_size == header_size); - - // Make sure the decoder sees the data we have already received. - _inpos = _greeting_recv; - _insize = _greeting_bytes_read; - - // To allow for interoperability with peers that do not forward - // their subscriptions, we inject a phantom subscription message - // message into the incoming message stream. - if (_options.type == ZMQ_PUB || _options.type == ZMQ_XPUB) - _subscription_required = true; - - // We are sending our routing id now and the next message - // will come from the socket. - _next_msg = &stream_engine_t::pull_msg_from_session; - - // We are expecting routing id message. - _process_msg = &stream_engine_t::process_routing_id_msg; - - return true; -} - -bool zmq::stream_engine_t::handshake_v1_0 () -{ - if (_session->zap_enabled ()) { - // reject ZMTP 1.0 connections if ZAP is enabled - error (protocol_error); - return false; - } - - _encoder = new (std::nothrow) v1_encoder_t (_options.out_batch_size); - alloc_assert (_encoder); - - _decoder = new (std::nothrow) - v1_decoder_t (_options.in_batch_size, _options.maxmsgsize); - alloc_assert (_decoder); - - return true; -} - -bool zmq::stream_engine_t::handshake_v2_0 () -{ - if (_session->zap_enabled ()) { - // reject ZMTP 2.0 connections if ZAP is enabled - error (protocol_error); - return false; - } - - _encoder = new (std::nothrow) v2_encoder_t (_options.out_batch_size); - alloc_assert (_encoder); - - _decoder = new (std::nothrow) v2_decoder_t ( - _options.in_batch_size, _options.maxmsgsize, _options.zero_copy); - alloc_assert (_decoder); - - return true; -} - -bool zmq::stream_engine_t::handshake_v3_0 () -{ - _encoder = new (std::nothrow) v2_encoder_t (_options.out_batch_size); - alloc_assert (_encoder); - - _decoder = new (std::nothrow) v2_decoder_t ( - _options.in_batch_size, _options.maxmsgsize, _options.zero_copy); - alloc_assert (_decoder); - - if (_options.mechanism == ZMQ_NULL - && memcmp (_greeting_recv + 12, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", - 20) - == 0) { - _mechanism = new (std::nothrow) - null_mechanism_t (_session, _peer_address, _options); - alloc_assert (_mechanism); - } else if (_options.mechanism == ZMQ_PLAIN - && memcmp (_greeting_recv + 12, - "PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) - == 0) { - if (_options.as_server) - _mechanism = new (std::nothrow) - plain_server_t (_session, _peer_address, _options); - else - _mechanism = new (std::nothrow) plain_client_t (_session, _options); - alloc_assert (_mechanism); - } -#ifdef ZMQ_HAVE_CURVE - else if (_options.mechanism == ZMQ_CURVE - && memcmp (_greeting_recv + 12, - "CURVE\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) - == 0) { - if (_options.as_server) - _mechanism = new (std::nothrow) - curve_server_t (_session, _peer_address, _options); - else - _mechanism = new (std::nothrow) curve_client_t (_session, _options); - alloc_assert (_mechanism); - } -#endif -#ifdef HAVE_LIBGSSAPI_KRB5 - else if (_options.mechanism == ZMQ_GSSAPI - && memcmp (_greeting_recv + 12, - "GSSAPI\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) - == 0) { - if (_options.as_server) - _mechanism = new (std::nothrow) - gssapi_server_t (_session, _peer_address, _options); - else - _mechanism = - new (std::nothrow) gssapi_client_t (_session, _options); - alloc_assert (_mechanism); - } -#endif - else { - _session->get_socket ()->event_handshake_failed_protocol ( - _session->get_endpoint (), - ZMQ_PROTOCOL_ERROR_ZMTP_MECHANISM_MISMATCH); - error (protocol_error); - return false; - } - _next_msg = &stream_engine_t::next_handshake_command; - _process_msg = &stream_engine_t::process_handshake_command; - - return true; -} - -int zmq::stream_engine_t::routing_id_msg (msg_t *msg_) -{ - int rc = msg_->init_size (_options.routing_id_size); - errno_assert (rc == 0); - if (_options.routing_id_size > 0) - memcpy (msg_->data (), _options.routing_id, _options.routing_id_size); - _next_msg = &stream_engine_t::pull_msg_from_session; - return 0; -} - -int zmq::stream_engine_t::process_routing_id_msg (msg_t *msg_) -{ - if (_options.recv_routing_id) { - msg_->set_flags (msg_t::routing_id); - int rc = _session->push_msg (msg_); - errno_assert (rc == 0); - } else { - int rc = msg_->close (); - errno_assert (rc == 0); - rc = msg_->init (); - errno_assert (rc == 0); - } - - if (_subscription_required) { - msg_t subscription; - - // Inject the subscription message, so that also - // ZMQ 2.x peers receive published messages. - int rc = subscription.init_size (1); - errno_assert (rc == 0); - *static_cast (subscription.data ()) = 1; - rc = _session->push_msg (&subscription); - errno_assert (rc == 0); - } - - _process_msg = &stream_engine_t::push_msg_to_session; - - return 0; -} - -int zmq::stream_engine_t::next_handshake_command (msg_t *msg_) -{ - zmq_assert (_mechanism != NULL); - - if (_mechanism->status () == mechanism_t::ready) { - mechanism_ready (); - return pull_and_encode (msg_); - } - if (_mechanism->status () == mechanism_t::error) { - errno = EPROTO; - return -1; - } else { - const int rc = _mechanism->next_handshake_command (msg_); - - if (rc == 0) - msg_->set_flags (msg_t::command); - - return rc; - } -} - -int zmq::stream_engine_t::process_handshake_command (msg_t *msg_) -{ - zmq_assert (_mechanism != NULL); - const int rc = _mechanism->process_handshake_command (msg_); - if (rc == 0) { - if (_mechanism->status () == mechanism_t::ready) - mechanism_ready (); - else if (_mechanism->status () == mechanism_t::error) { - errno = EPROTO; - return -1; - } - if (_output_stopped) - restart_output (); - } - - return rc; -} - -void zmq::stream_engine_t::zap_msg_available () -{ - zmq_assert (_mechanism != NULL); - - const int rc = _mechanism->zap_msg_available (); - if (rc == -1) { - error (protocol_error); - return; - } - if (_input_stopped) - if (!restart_input ()) - return; - if (_output_stopped) - restart_output (); -} - -const zmq::endpoint_uri_pair_t &zmq::stream_engine_t::get_endpoint () const -{ - return _endpoint_uri_pair; -} - -void zmq::stream_engine_t::mechanism_ready () -{ - if (_options.heartbeat_interval > 0) { - add_timer (_options.heartbeat_interval, heartbeat_ivl_timer_id); - _has_heartbeat_timer = true; - } - - bool flush_session = false; - - if (_options.recv_routing_id) { - msg_t routing_id; - _mechanism->peer_routing_id (&routing_id); - const int rc = _session->push_msg (&routing_id); - if (rc == -1 && errno == EAGAIN) { - // If the write is failing at this stage with - // an EAGAIN the pipe must be being shut down, - // so we can just bail out of the routing id set. - return; - } - errno_assert (rc == 0); - flush_session = true; - } - - if (_options.router_notify & ZMQ_NOTIFY_CONNECT) { - msg_t connect_notification; - connect_notification.init (); - const int rc = _session->push_msg (&connect_notification); - if (rc == -1 && errno == EAGAIN) { - // If the write is failing at this stage with - // an EAGAIN the pipe must be being shut down, - // so we can just bail out of the notification. - return; - } - errno_assert (rc == 0); - flush_session = true; - } - - if (flush_session) - _session->flush (); - - _next_msg = &stream_engine_t::pull_and_encode; - _process_msg = &stream_engine_t::write_credential; - - // Compile metadata. - properties_t properties; - init_properties (properties); - - // Add ZAP properties. - const properties_t &zap_properties = _mechanism->get_zap_properties (); - properties.insert (zap_properties.begin (), zap_properties.end ()); - - // Add ZMTP properties. - const properties_t &zmtp_properties = _mechanism->get_zmtp_properties (); - properties.insert (zmtp_properties.begin (), zmtp_properties.end ()); - - zmq_assert (_metadata == NULL); - if (!properties.empty ()) { - _metadata = new (std::nothrow) metadata_t (properties); - alloc_assert (_metadata); - } - - _socket->event_handshake_succeeded (_endpoint_uri_pair, 0); -} - -int zmq::stream_engine_t::pull_msg_from_session (msg_t *msg_) -{ - return _session->pull_msg (msg_); -} - -int zmq::stream_engine_t::push_msg_to_session (msg_t *msg_) -{ - return _session->push_msg (msg_); -} - -int zmq::stream_engine_t::push_raw_msg_to_session (msg_t *msg_) -{ - if (_metadata && _metadata != msg_->metadata ()) - msg_->set_metadata (_metadata); - return push_msg_to_session (msg_); -} - -int zmq::stream_engine_t::write_credential (msg_t *msg_) -{ - zmq_assert (_mechanism != NULL); - zmq_assert (_session != NULL); - - const blob_t &credential = _mechanism->get_user_id (); - if (credential.size () > 0) { - msg_t msg; - int rc = msg.init_size (credential.size ()); - zmq_assert (rc == 0); - memcpy (msg.data (), credential.data (), credential.size ()); - msg.set_flags (msg_t::credential); - rc = _session->push_msg (&msg); - if (rc == -1) { - rc = msg.close (); - errno_assert (rc == 0); - return -1; - } - } - _process_msg = &stream_engine_t::decode_and_push; - return decode_and_push (msg_); -} - -int zmq::stream_engine_t::pull_and_encode (msg_t *msg_) -{ - zmq_assert (_mechanism != NULL); - - if (_session->pull_msg (msg_) == -1) - return -1; - if (_mechanism->encode (msg_) == -1) - return -1; - return 0; -} - -int zmq::stream_engine_t::decode_and_push (msg_t *msg_) -{ - zmq_assert (_mechanism != NULL); - - if (_mechanism->decode (msg_) == -1) - return -1; - - if (_has_timeout_timer) { - _has_timeout_timer = false; - cancel_timer (heartbeat_timeout_timer_id); - } - - if (_has_ttl_timer) { - _has_ttl_timer = false; - cancel_timer (heartbeat_ttl_timer_id); - } - - if (msg_->flags () & msg_t::command) { - process_command_message (msg_); - } - - if (_metadata) - msg_->set_metadata (_metadata); - if (_session->push_msg (msg_) == -1) { - if (errno == EAGAIN) - _process_msg = &stream_engine_t::push_one_then_decode_and_push; - return -1; - } - return 0; -} - -int zmq::stream_engine_t::push_one_then_decode_and_push (msg_t *msg_) -{ - const int rc = _session->push_msg (msg_); - if (rc == 0) - _process_msg = &stream_engine_t::decode_and_push; - return rc; -} - -void zmq::stream_engine_t::error (error_reason_t reason_) -{ - if (_options.raw_socket && _options.raw_notify) { - // For raw sockets, send a final 0-length message to the application - // so that it knows the peer has been disconnected. - msg_t terminator; - terminator.init (); - (this->*_process_msg) (&terminator); - terminator.close (); - } - zmq_assert (_session); - - if ((_options.router_notify & ZMQ_NOTIFY_DISCONNECT) && !_handshaking) { - // For router sockets with disconnect notification, rollback - // any incomplete message in the pipe, and push the disconnect - // notification message. - _session->rollback (); - - msg_t disconnect_notification; - disconnect_notification.init (); - _session->push_msg (&disconnect_notification); - } - - // protocol errors have been signaled already at the point where they occurred - if (reason_ != protocol_error - && (_mechanism == NULL - || _mechanism->status () == mechanism_t::handshaking)) { - int err = errno; - _socket->event_handshake_failed_no_detail (_endpoint_uri_pair, err); - } - - _socket->event_disconnected (_endpoint_uri_pair, _s); - _session->flush (); - _session->engine_error (reason_); - unplug (); - delete this; -} - -void zmq::stream_engine_t::set_handshake_timer () -{ - zmq_assert (!_has_handshake_timer); - - if (!_options.raw_socket && _options.handshake_ivl > 0) { - add_timer (_options.handshake_ivl, handshake_timer_id); - _has_handshake_timer = true; - } -} - -bool zmq::stream_engine_t::init_properties (properties_t &properties_) -{ - if (_peer_address.empty ()) - return false; - properties_.ZMQ_MAP_INSERT_OR_EMPLACE ( - std::string (ZMQ_MSG_PROPERTY_PEER_ADDRESS), _peer_address); - - // Private property to support deprecated SRCFD - std::ostringstream stream; - stream << static_cast (_s); - std::string fd_string = stream.str (); - properties_.ZMQ_MAP_INSERT_OR_EMPLACE (std::string ("__fd"), - ZMQ_MOVE (fd_string)); - return true; -} - -void zmq::stream_engine_t::timer_event (int id_) -{ - if (id_ == handshake_timer_id) { - _has_handshake_timer = false; - // handshake timer expired before handshake completed, so engine fail - error (timeout_error); - } else if (id_ == heartbeat_ivl_timer_id) { - _next_msg = &stream_engine_t::produce_ping_message; - out_event (); - add_timer (_options.heartbeat_interval, heartbeat_ivl_timer_id); - } else if (id_ == heartbeat_ttl_timer_id) { - _has_ttl_timer = false; - error (timeout_error); - } else if (id_ == heartbeat_timeout_timer_id) { - _has_timeout_timer = false; - error (timeout_error); - } else - // There are no other valid timer ids! - assert (false); -} - -int zmq::stream_engine_t::produce_ping_message (msg_t *msg_) -{ - // 16-bit TTL + \4PING == 7 - const size_t ping_ttl_len = msg_t::ping_cmd_name_size + 2; - zmq_assert (_mechanism != NULL); - - int rc = msg_->init_size (ping_ttl_len); - errno_assert (rc == 0); - msg_->set_flags (msg_t::command); - // Copy in the command message - memcpy (msg_->data (), "\4PING", msg_t::ping_cmd_name_size); - - uint16_t ttl_val = htons (_options.heartbeat_ttl); - memcpy (static_cast (msg_->data ()) + msg_t::ping_cmd_name_size, - &ttl_val, sizeof (ttl_val)); - - rc = _mechanism->encode (msg_); - _next_msg = &stream_engine_t::pull_and_encode; - if (!_has_timeout_timer && _heartbeat_timeout > 0) { - add_timer (_heartbeat_timeout, heartbeat_timeout_timer_id); - _has_timeout_timer = true; - } - return rc; -} - -int zmq::stream_engine_t::produce_pong_message (msg_t *msg_) -{ - zmq_assert (_mechanism != NULL); - - int rc = msg_->move (_pong_msg); - errno_assert (rc == 0); - - rc = _mechanism->encode (msg_); - _next_msg = &stream_engine_t::pull_and_encode; - return rc; -} - -int zmq::stream_engine_t::process_heartbeat_message (msg_t *msg_) -{ - if (msg_->is_ping ()) { - // 16-bit TTL + \4PING == 7 - const size_t ping_ttl_len = msg_t::ping_cmd_name_size + 2; - const size_t ping_max_ctx_len = 16; - uint16_t remote_heartbeat_ttl; - - // Get the remote heartbeat TTL to setup the timer - memcpy (&remote_heartbeat_ttl, - static_cast (msg_->data ()) - + msg_t::ping_cmd_name_size, - ping_ttl_len - msg_t::ping_cmd_name_size); - remote_heartbeat_ttl = ntohs (remote_heartbeat_ttl); - // The remote heartbeat is in 10ths of a second - // so we multiply it by 100 to get the timer interval in ms. - remote_heartbeat_ttl *= 100; - - if (!_has_ttl_timer && remote_heartbeat_ttl > 0) { - add_timer (remote_heartbeat_ttl, heartbeat_ttl_timer_id); - _has_ttl_timer = true; - } - - // As per ZMTP 3.1 the PING command might contain an up to 16 bytes - // context which needs to be PONGed back, so build the pong message - // here and store it. Truncate it if it's too long. - // Given the engine goes straight to out_event, sequential PINGs will - // not be a problem. - const size_t context_len = - std::min (msg_->size () - ping_ttl_len, ping_max_ctx_len); - const int rc = - _pong_msg.init_size (msg_t::ping_cmd_name_size + context_len); - errno_assert (rc == 0); - _pong_msg.set_flags (msg_t::command); - memcpy (_pong_msg.data (), "\4PONG", msg_t::ping_cmd_name_size); - if (context_len > 0) - memcpy (static_cast (_pong_msg.data ()) - + msg_t::ping_cmd_name_size, - static_cast (msg_->data ()) + ping_ttl_len, - context_len); - - _next_msg = &stream_engine_t::produce_pong_message; - out_event (); - } - - return 0; -} - -int zmq::stream_engine_t::process_command_message (msg_t *msg_) -{ - const uint8_t cmd_name_size = - *(static_cast (msg_->data ())); - const size_t ping_name_size = msg_t::ping_cmd_name_size - 1; - const size_t sub_name_size = msg_t::sub_cmd_name_size - 1; - const size_t cancel_name_size = msg_t::cancel_cmd_name_size - 1; - // Malformed command - if (unlikely (msg_->size () < cmd_name_size + sizeof (cmd_name_size))) - return -1; - - uint8_t *cmd_name = (static_cast (msg_->data ())) + 1; - if (cmd_name_size == ping_name_size - && memcmp (cmd_name, "PING", cmd_name_size) == 0) - msg_->set_flags (zmq::msg_t::ping); - if (cmd_name_size == ping_name_size - && memcmp (cmd_name, "PONG", cmd_name_size) == 0) - msg_->set_flags (zmq::msg_t::pong); - if (cmd_name_size == sub_name_size - && memcmp (cmd_name, "SUBSCRIBE", cmd_name_size) == 0) - msg_->set_flags (zmq::msg_t::subscribe); - if (cmd_name_size == cancel_name_size - && memcmp (cmd_name, "CANCEL", cmd_name_size) == 0) - msg_->set_flags (zmq::msg_t::cancel); - - if (msg_->is_ping () || msg_->is_pong ()) - return process_heartbeat_message (msg_); - - return 0; -} diff --git a/src/stream_engine_base.cpp b/src/stream_engine_base.cpp new file mode 100644 index 00000000..c63072a1 --- /dev/null +++ b/src/stream_engine_base.cpp @@ -0,0 +1,750 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "precompiled.hpp" +#include "macros.hpp" + +#include +#include + +#ifndef ZMQ_HAVE_WINDOWS +#include +#endif + +#include +#include + +#include "stream_engine_base.hpp" +#include "io_thread.hpp" +#include "session_base.hpp" +#include "v1_encoder.hpp" +#include "v1_decoder.hpp" +#include "v2_encoder.hpp" +#include "v2_decoder.hpp" +#include "null_mechanism.hpp" +#include "plain_client.hpp" +#include "plain_server.hpp" +#include "gssapi_client.hpp" +#include "gssapi_server.hpp" +#include "curve_client.hpp" +#include "curve_server.hpp" +#include "raw_decoder.hpp" +#include "raw_encoder.hpp" +#include "config.hpp" +#include "err.hpp" +#include "ip.hpp" +#include "tcp.hpp" +#include "likely.hpp" +#include "wire.hpp" + +static std::string get_peer_address (zmq::fd_t s_) +{ + std::string peer_address; + + const int family = zmq::get_peer_ip_address (s_, peer_address); + if (family == 0) + peer_address.clear (); +#if defined ZMQ_HAVE_SO_PEERCRED + else if (family == PF_UNIX) { + struct ucred cred; + socklen_t size = sizeof (cred); + if (!getsockopt (s_, SOL_SOCKET, SO_PEERCRED, &cred, &size)) { + std::ostringstream buf; + buf << ":" << cred.uid << ":" << cred.gid << ":" << cred.pid; + peer_address += buf.str (); + } + } +#elif defined ZMQ_HAVE_LOCAL_PEERCRED + else if (family == PF_UNIX) { + struct xucred cred; + socklen_t size = sizeof (cred); + if (!getsockopt (_s, 0, LOCAL_PEERCRED, &cred, &size) + && cred.cr_version == XUCRED_VERSION) { + std::ostringstream buf; + buf << ":" << cred.cr_uid << ":"; + if (cred.cr_ngroups > 0) + buf << cred.cr_groups[0]; + buf << ":"; + _peer_address += buf.str (); + } + } +#endif + + return peer_address; +} + +zmq::stream_engine_base_t::stream_engine_base_t ( + fd_t fd_, + const options_t &options_, + const endpoint_uri_pair_t &endpoint_uri_pair_) : + _options (options_), + _inpos (NULL), + _insize (0), + _decoder (NULL), + _outpos (NULL), + _outsize (0), + _encoder (NULL), + _mechanism (NULL), + _next_msg (NULL), + _process_msg (NULL), + _metadata (NULL), + _input_stopped (false), + _output_stopped (false), + _endpoint_uri_pair (endpoint_uri_pair_), + _has_handshake_timer (false), + _has_ttl_timer (false), + _has_timeout_timer (false), + _has_heartbeat_timer (false), + _peer_address (get_peer_address (fd_)), + _s (fd_), + _handle (static_cast (NULL)), + _plugged (false), + _handshaking (true), + _io_error (false), + _session (NULL), + _socket (NULL) +{ + int rc = _tx_msg.init (); + errno_assert (rc == 0); + + // Put the socket into non-blocking mode. + unblock_socket (_s); +} + +zmq::stream_engine_base_t::~stream_engine_base_t () +{ + zmq_assert (!_plugged); + + if (_s != retired_fd) { +#ifdef ZMQ_HAVE_WINDOWS + int rc = closesocket (_s); + wsa_assert (rc != SOCKET_ERROR); +#else + int rc = close (_s); +#if defined(__FreeBSD_kernel__) || defined(__FreeBSD__) + // FreeBSD may return ECONNRESET on close() under load but this is not + // an error. + if (rc == -1 && errno == ECONNRESET) + rc = 0; +#endif + errno_assert (rc == 0); +#endif + _s = retired_fd; + } + + int rc = _tx_msg.close (); + errno_assert (rc == 0); + + // Drop reference to metadata and destroy it if we are + // the only user. + if (_metadata != NULL) { + if (_metadata->drop_ref ()) { + LIBZMQ_DELETE (_metadata); + } + } + + LIBZMQ_DELETE (_encoder); + LIBZMQ_DELETE (_decoder); + LIBZMQ_DELETE (_mechanism); +} + +void zmq::stream_engine_base_t::plug (io_thread_t *io_thread_, + session_base_t *session_) +{ + zmq_assert (!_plugged); + _plugged = true; + + // Connect to session object. + zmq_assert (!_session); + zmq_assert (session_); + _session = session_; + _socket = _session->get_socket (); + + // Connect to I/O threads poller object. + io_object_t::plug (io_thread_); + _handle = add_fd (_s); + _io_error = false; + + plug_internal (); +} + +void zmq::stream_engine_base_t::unplug () +{ + zmq_assert (_plugged); + _plugged = false; + + // Cancel all timers. + if (_has_handshake_timer) { + cancel_timer (handshake_timer_id); + _has_handshake_timer = false; + } + + if (_has_ttl_timer) { + cancel_timer (heartbeat_ttl_timer_id); + _has_ttl_timer = false; + } + + if (_has_timeout_timer) { + cancel_timer (heartbeat_timeout_timer_id); + _has_timeout_timer = false; + } + + if (_has_heartbeat_timer) { + cancel_timer (heartbeat_ivl_timer_id); + _has_heartbeat_timer = false; + } + // Cancel all fd subscriptions. + if (!_io_error) + rm_fd (_handle); + + // Disconnect from I/O threads poller object. + io_object_t::unplug (); + + _session = NULL; +} + +void zmq::stream_engine_base_t::terminate () +{ + unplug (); + delete this; +} + +void zmq::stream_engine_base_t::in_event () +{ + // ignore errors + const bool res = in_event_internal (); + LIBZMQ_UNUSED (res); +} + +bool zmq::stream_engine_base_t::in_event_internal () +{ + zmq_assert (!_io_error); + + // If still handshaking, receive and process the greeting message. + if (unlikely (_handshaking)) { + if (handshake ()) { + // Handshaking was successful. + // Switch into the normal message flow. + _handshaking = false; + } else + return false; + } + + + zmq_assert (_decoder); + + // If there has been an I/O error, stop polling. + if (_input_stopped) { + rm_fd (_handle); + _io_error = true; + return true; // TODO or return false in this case too? + } + + // If there's no data to process in the buffer... + if (!_insize) { + // Retrieve the buffer and read as much data as possible. + // Note that buffer can be arbitrarily large. However, we assume + // the underlying TCP layer has fixed buffer size and thus the + // number of bytes read will be always limited. + size_t bufsize = 0; + _decoder->get_buffer (&_inpos, &bufsize); + + const int rc = tcp_read (_inpos, bufsize); + + if (rc == 0) { + // connection closed by peer + errno = EPIPE; + error (connection_error); + return false; + } + if (rc == -1) { + if (errno != EAGAIN) { + error (connection_error); + return false; + } + return true; + } + + // Adjust input size + _insize = static_cast (rc); + // Adjust buffer size to received bytes + _decoder->resize_buffer (_insize); + } + + int rc = 0; + size_t processed = 0; + + while (_insize > 0) { + rc = _decoder->decode (_inpos, _insize, processed); + zmq_assert (processed <= _insize); + _inpos += processed; + _insize -= processed; + if (rc == 0 || rc == -1) + break; + rc = (this->*_process_msg) (_decoder->msg ()); + if (rc == -1) + break; + } + + // Tear down the connection if we have failed to decode input data + // or the session has rejected the message. + if (rc == -1) { + if (errno != EAGAIN) { + error (protocol_error); + return false; + } + _input_stopped = true; + reset_pollin (_handle); + } + + _session->flush (); + return true; +} + +void zmq::stream_engine_base_t::out_event () +{ + zmq_assert (!_io_error); + + // If write buffer is empty, try to read new data from the encoder. + if (!_outsize) { + // Even when we stop polling as soon as there is no + // data to send, the poller may invoke out_event one + // more time due to 'speculative write' optimisation. + if (unlikely (_encoder == NULL)) { + zmq_assert (_handshaking); + return; + } + + _outpos = NULL; + _outsize = _encoder->encode (&_outpos, 0); + + while (_outsize < static_cast (_options.out_batch_size)) { + if ((this->*_next_msg) (&_tx_msg) == -1) + break; + _encoder->load_msg (&_tx_msg); + unsigned char *bufptr = _outpos + _outsize; + size_t n = + _encoder->encode (&bufptr, _options.out_batch_size - _outsize); + zmq_assert (n > 0); + if (_outpos == NULL) + _outpos = bufptr; + _outsize += n; + } + + // If there is no data to send, stop polling for output. + if (_outsize == 0) { + _output_stopped = true; + reset_pollout (_handle); + return; + } + } + + // If there are any data to write in write buffer, write as much as + // possible to the socket. Note that amount of data to write can be + // arbitrarily large. However, we assume that underlying TCP layer has + // limited transmission buffer and thus the actual number of bytes + // written should be reasonably modest. + const int nbytes = tcp_write (_s, _outpos, _outsize); + + // IO error has occurred. We stop waiting for output events. + // The engine is not terminated until we detect input error; + // this is necessary to prevent losing incoming messages. + if (nbytes == -1) { + reset_pollout (_handle); + return; + } + + _outpos += nbytes; + _outsize -= nbytes; + + // If we are still handshaking and there are no data + // to send, stop polling for output. + if (unlikely (_handshaking)) + if (_outsize == 0) + reset_pollout (_handle); +} + +void zmq::stream_engine_base_t::restart_output () +{ + if (unlikely (_io_error)) + return; + + if (likely (_output_stopped)) { + set_pollout (); + _output_stopped = false; + } + + // Speculative write: The assumption is that at the moment new message + // was sent by the user the socket is probably available for writing. + // Thus we try to write the data to socket avoiding polling for POLLOUT. + // Consequently, the latency should be better in request/reply scenarios. + out_event (); +} + +bool zmq::stream_engine_base_t::restart_input () +{ + zmq_assert (_input_stopped); + zmq_assert (_session != NULL); + zmq_assert (_decoder != NULL); + + int rc = (this->*_process_msg) (_decoder->msg ()); + if (rc == -1) { + if (errno == EAGAIN) + _session->flush (); + else { + error (protocol_error); + return false; + } + return true; + } + + while (_insize > 0) { + size_t processed = 0; + rc = _decoder->decode (_inpos, _insize, processed); + zmq_assert (processed <= _insize); + _inpos += processed; + _insize -= processed; + if (rc == 0 || rc == -1) + break; + rc = (this->*_process_msg) (_decoder->msg ()); + if (rc == -1) + break; + } + + if (rc == -1 && errno == EAGAIN) + _session->flush (); + else if (_io_error) { + error (connection_error); + return false; + } else if (rc == -1) { + error (protocol_error); + return false; + } + + else { + _input_stopped = false; + set_pollin (); + _session->flush (); + + // Speculative read. + if (!in_event_internal ()) + return false; + } + + return true; +} + +int zmq::stream_engine_base_t::next_handshake_command (msg_t *msg_) +{ + zmq_assert (_mechanism != NULL); + + if (_mechanism->status () == mechanism_t::ready) { + mechanism_ready (); + return pull_and_encode (msg_); + } + if (_mechanism->status () == mechanism_t::error) { + errno = EPROTO; + return -1; + } else { + const int rc = _mechanism->next_handshake_command (msg_); + + if (rc == 0) + msg_->set_flags (msg_t::command); + + return rc; + } +} + +int zmq::stream_engine_base_t::process_handshake_command (msg_t *msg_) +{ + zmq_assert (_mechanism != NULL); + const int rc = _mechanism->process_handshake_command (msg_); + if (rc == 0) { + if (_mechanism->status () == mechanism_t::ready) + mechanism_ready (); + else if (_mechanism->status () == mechanism_t::error) { + errno = EPROTO; + return -1; + } + if (_output_stopped) + restart_output (); + } + + return rc; +} + +void zmq::stream_engine_base_t::zap_msg_available () +{ + zmq_assert (_mechanism != NULL); + + const int rc = _mechanism->zap_msg_available (); + if (rc == -1) { + error (protocol_error); + return; + } + if (_input_stopped) + if (!restart_input ()) + return; + if (_output_stopped) + restart_output (); +} + +const zmq::endpoint_uri_pair_t &zmq::stream_engine_base_t::get_endpoint () const +{ + return _endpoint_uri_pair; +} + +void zmq::stream_engine_base_t::mechanism_ready () +{ + if (_options.heartbeat_interval > 0) { + add_timer (_options.heartbeat_interval, heartbeat_ivl_timer_id); + _has_heartbeat_timer = true; + } + + bool flush_session = false; + + if (_options.recv_routing_id) { + msg_t routing_id; + _mechanism->peer_routing_id (&routing_id); + const int rc = _session->push_msg (&routing_id); + if (rc == -1 && errno == EAGAIN) { + // If the write is failing at this stage with + // an EAGAIN the pipe must be being shut down, + // so we can just bail out of the routing id set. + return; + } + errno_assert (rc == 0); + flush_session = true; + } + + if (_options.router_notify & ZMQ_NOTIFY_CONNECT) { + msg_t connect_notification; + connect_notification.init (); + const int rc = _session->push_msg (&connect_notification); + if (rc == -1 && errno == EAGAIN) { + // If the write is failing at this stage with + // an EAGAIN the pipe must be being shut down, + // so we can just bail out of the notification. + return; + } + errno_assert (rc == 0); + flush_session = true; + } + + if (flush_session) + _session->flush (); + + _next_msg = &stream_engine_base_t::pull_and_encode; + _process_msg = &stream_engine_base_t::write_credential; + + // Compile metadata. + properties_t properties; + init_properties (properties); + + // Add ZAP properties. + const properties_t &zap_properties = _mechanism->get_zap_properties (); + properties.insert (zap_properties.begin (), zap_properties.end ()); + + // Add ZMTP properties. + const properties_t &zmtp_properties = _mechanism->get_zmtp_properties (); + properties.insert (zmtp_properties.begin (), zmtp_properties.end ()); + + zmq_assert (_metadata == NULL); + if (!properties.empty ()) { + _metadata = new (std::nothrow) metadata_t (properties); + alloc_assert (_metadata); + } + + _socket->event_handshake_succeeded (_endpoint_uri_pair, 0); +} + +int zmq::stream_engine_base_t::write_credential (msg_t *msg_) +{ + zmq_assert (_mechanism != NULL); + zmq_assert (_session != NULL); + + const blob_t &credential = _mechanism->get_user_id (); + if (credential.size () > 0) { + msg_t msg; + int rc = msg.init_size (credential.size ()); + zmq_assert (rc == 0); + memcpy (msg.data (), credential.data (), credential.size ()); + msg.set_flags (msg_t::credential); + rc = _session->push_msg (&msg); + if (rc == -1) { + rc = msg.close (); + errno_assert (rc == 0); + return -1; + } + } + _process_msg = &stream_engine_base_t::decode_and_push; + return decode_and_push (msg_); +} + +int zmq::stream_engine_base_t::pull_and_encode (msg_t *msg_) +{ + zmq_assert (_mechanism != NULL); + + if (_session->pull_msg (msg_) == -1) + return -1; + if (_mechanism->encode (msg_) == -1) + return -1; + return 0; +} + +int zmq::stream_engine_base_t::decode_and_push (msg_t *msg_) +{ + zmq_assert (_mechanism != NULL); + + if (_mechanism->decode (msg_) == -1) + return -1; + + if (_has_timeout_timer) { + _has_timeout_timer = false; + cancel_timer (heartbeat_timeout_timer_id); + } + + if (_has_ttl_timer) { + _has_ttl_timer = false; + cancel_timer (heartbeat_ttl_timer_id); + } + + if (msg_->flags () & msg_t::command) { + process_command_message (msg_); + } + + if (_metadata) + msg_->set_metadata (_metadata); + if (_session->push_msg (msg_) == -1) { + if (errno == EAGAIN) + _process_msg = &stream_engine_base_t::push_one_then_decode_and_push; + return -1; + } + return 0; +} + +int zmq::stream_engine_base_t::push_one_then_decode_and_push (msg_t *msg_) +{ + const int rc = _session->push_msg (msg_); + if (rc == 0) + _process_msg = &stream_engine_base_t::decode_and_push; + return rc; +} + +int zmq::stream_engine_base_t::pull_msg_from_session (msg_t *msg_) +{ + return _session->pull_msg (msg_); +} + +int zmq::stream_engine_base_t::push_msg_to_session (msg_t *msg_) +{ + return _session->push_msg (msg_); +} + +void zmq::stream_engine_base_t::error (error_reason_t reason_) +{ + zmq_assert (_session); + + if ((_options.router_notify & ZMQ_NOTIFY_DISCONNECT) && !_handshaking) { + // For router sockets with disconnect notification, rollback + // any incomplete message in the pipe, and push the disconnect + // notification message. + _session->rollback (); + + msg_t disconnect_notification; + disconnect_notification.init (); + _session->push_msg (&disconnect_notification); + } + + // protocol errors have been signaled already at the point where they occurred + if (reason_ != protocol_error + && (_mechanism == NULL + || _mechanism->status () == mechanism_t::handshaking)) { + int err = errno; + _socket->event_handshake_failed_no_detail (_endpoint_uri_pair, err); + } + + _socket->event_disconnected (_endpoint_uri_pair, _s); + _session->flush (); + _session->engine_error (reason_); + unplug (); + delete this; +} + +void zmq::stream_engine_base_t::set_handshake_timer () +{ + zmq_assert (!_has_handshake_timer); + + if (_options.handshake_ivl > 0) { + add_timer (_options.handshake_ivl, handshake_timer_id); + _has_handshake_timer = true; + } +} + +bool zmq::stream_engine_base_t::init_properties (properties_t &properties_) +{ + if (_peer_address.empty ()) + return false; + properties_.ZMQ_MAP_INSERT_OR_EMPLACE ( + std::string (ZMQ_MSG_PROPERTY_PEER_ADDRESS), _peer_address); + + // Private property to support deprecated SRCFD + std::ostringstream stream; + stream << static_cast (_s); + std::string fd_string = stream.str (); + properties_.ZMQ_MAP_INSERT_OR_EMPLACE (std::string ("__fd"), + ZMQ_MOVE (fd_string)); + return true; +} + +void zmq::stream_engine_base_t::timer_event (int id_) +{ + if (id_ == handshake_timer_id) { + _has_handshake_timer = false; + // handshake timer expired before handshake completed, so engine fail + error (timeout_error); + } else if (id_ == heartbeat_ivl_timer_id) { + _next_msg = &stream_engine_base_t::produce_ping_message; + out_event (); + add_timer (_options.heartbeat_interval, heartbeat_ivl_timer_id); + } else if (id_ == heartbeat_ttl_timer_id) { + _has_ttl_timer = false; + error (timeout_error); + } else if (id_ == heartbeat_timeout_timer_id) { + _has_timeout_timer = false; + error (timeout_error); + } else + // There are no other valid timer ids! + assert (false); +} + +int zmq::stream_engine_base_t::tcp_read (void *data_, size_t size_) +{ + return zmq::tcp_read (_s, data_, size_); +} diff --git a/src/stream_engine.hpp b/src/stream_engine_base.hpp similarity index 63% rename from src/stream_engine.hpp rename to src/stream_engine_base.hpp index a1e9ce2d..2c6e6c3d 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine_base.hpp @@ -27,8 +27,8 @@ along with this program. If not, see . */ -#ifndef __ZMQ_STREAM_ENGINE_HPP_INCLUDED__ -#define __ZMQ_STREAM_ENGINE_HPP_INCLUDED__ +#ifndef __ZMQ_STREAM_ENGINE_BASE_HPP_INCLUDED__ +#define __ZMQ_STREAM_ENGINE_BASE_HPP_INCLUDED__ #include @@ -41,16 +41,10 @@ #include "socket_base.hpp" #include "metadata.hpp" #include "msg.hpp" +#include "tcp.hpp" namespace zmq { -// Protocol revisions -enum -{ - ZMTP_1_0 = 0, - ZMTP_2_0 = 1 -}; - class io_thread_t; class session_base_t; class mechanism_t; @@ -58,20 +52,13 @@ class mechanism_t; // This engine handles any socket with SOCK_STREAM semantics, // e.g. TCP socket or an UNIX domain socket. -class stream_engine_t : public io_object_t, public i_engine +class stream_engine_base_t : public io_object_t, public i_engine { public: - enum error_reason_t - { - protocol_error, - connection_error, - timeout_error - }; - - stream_engine_t (fd_t fd_, - const options_t &options_, - const endpoint_uri_pair_t &endpoint_uri_pair_); - ~stream_engine_t (); + stream_engine_base_t (fd_t fd_, + const options_t &options_, + const endpoint_uri_pair_t &endpoint_uri_pair_); + ~stream_engine_base_t (); // i_engine interface implementation. void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_); @@ -86,33 +73,12 @@ class stream_engine_t : public io_object_t, public i_engine void out_event (); void timer_event (int id_); - private: - bool in_event_internal (); - - // Unplug the engine from the session. - void unplug (); + protected: + typedef metadata_t::dict_t properties_t; + bool init_properties (properties_t &properties_); // Function to handle network disconnections. - void error (error_reason_t reason_); - - // Detects the protocol used by the peer. - bool handshake (); - - // Receive the greeting from the peer. - int receive_greeting (); - void receive_greeting_versioned (); - - typedef bool (stream_engine_t::*handshake_fun_t) (); - static handshake_fun_t select_handshake_fun (bool unversioned, - unsigned char revision); - - bool handshake_v1_0_unversioned (); - bool handshake_v1_0 (); - bool handshake_v2_0 (); - bool handshake_v3_0 (); - - int routing_id_msg (msg_t *msg_); - int process_routing_id_msg (msg_t *msg_); + virtual void error (error_reason_t reason_); int next_handshake_command (msg_t *msg_); int process_handshake_command (msg_t *msg_); @@ -120,38 +86,26 @@ class stream_engine_t : public io_object_t, public i_engine int pull_msg_from_session (msg_t *msg_); int push_msg_to_session (msg_t *msg_); - int push_raw_msg_to_session (msg_t *msg_); - - int write_credential (msg_t *msg_); int pull_and_encode (msg_t *msg_); int decode_and_push (msg_t *msg_); - int push_one_then_decode_and_push (msg_t *msg_); - - void mechanism_ready (); - - size_t add_property (unsigned char *ptr_, - const char *name_, - const void *value_, - size_t value_len_); void set_handshake_timer (); + int tcp_read (void *data_, size_t size_); - typedef metadata_t::dict_t properties_t; - bool init_properties (properties_t &properties_); + virtual bool handshake () { return true; }; + virtual void plug_internal (){}; - int process_command_message (msg_t *msg_); - int produce_ping_message (msg_t *msg_); - int process_heartbeat_message (msg_t *msg_); - int produce_pong_message (msg_t *msg_); + virtual int process_command_message (msg_t *msg_) { return -1; }; + virtual int produce_ping_message (msg_t *msg_) { return -1; }; + virtual int process_heartbeat_message (msg_t *msg_) { return -1; }; + virtual int produce_pong_message (msg_t *msg_) { return -1; }; - // Underlying socket. - fd_t _s; + void set_pollout () { io_object_t::set_pollout (_handle); } + void set_pollin () { io_object_t::set_pollin (_handle); } + session_base_t *session () { return _session; } + socket_base_t *socket () { return _socket; } - msg_t _tx_msg; - // Need to store PING payload for PONG - msg_t _pong_msg; - - handle_t _handle; + const options_t _options; unsigned char *_inpos; size_t _insize; @@ -161,61 +115,23 @@ class stream_engine_t : public io_object_t, public i_engine size_t _outsize; i_encoder *_encoder; + mechanism_t *_mechanism; + + int (stream_engine_base_t::*_next_msg) (msg_t *msg_); + int (stream_engine_base_t::*_process_msg) (msg_t *msg_); + // Metadata to be attached to received messages. May be NULL. metadata_t *_metadata; - // When true, we are still trying to determine whether - // the peer is using versioned protocol, and if so, which - // version. When false, normal message flow has started. - bool _handshaking; - - static const size_t signature_size = 10; - - // Size of ZMTP/1.0 and ZMTP/2.0 greeting message - static const size_t v2_greeting_size = 12; - - // Size of ZMTP/3.0 greeting message - static const size_t v3_greeting_size = 64; - - // Expected greeting size. - size_t _greeting_size; - - // Greeting received from, and sent to peer - unsigned char _greeting_recv[v3_greeting_size]; - unsigned char _greeting_send[v3_greeting_size]; - - // Size of greeting received so far - unsigned int _greeting_bytes_read; - - // The session this engine is attached to. - zmq::session_base_t *_session; - - const options_t _options; - - // Representation of the connected endpoints. - const endpoint_uri_pair_t _endpoint_uri_pair; - - bool _plugged; - - int (stream_engine_t::*_next_msg) (msg_t *msg_); - - int (stream_engine_t::*_process_msg) (msg_t *msg_); - - bool _io_error; - - // Indicates whether the engine is to inject a phantom - // subscription message into the incoming stream. - // Needed to support old peers. - bool _subscription_required; - - mechanism_t *_mechanism; - // True iff the engine couldn't consume the last decoded message. bool _input_stopped; // True iff the engine doesn't have any message to encode. bool _output_stopped; + // Representation of the connected endpoints. + const endpoint_uri_pair_t _endpoint_uri_pair; + // ID of the handshake timer enum { @@ -235,15 +151,45 @@ class stream_engine_t : public io_object_t, public i_engine bool _has_ttl_timer; bool _has_timeout_timer; bool _has_heartbeat_timer; - int _heartbeat_timeout; + + + const std::string _peer_address; + + private: + bool in_event_internal (); + + // Unplug the engine from the session. + void unplug (); + + int write_credential (msg_t *msg_); + int push_one_then_decode_and_push (msg_t *msg_); + + void mechanism_ready (); + + // Underlying socket. + fd_t _s; + + handle_t _handle; + + bool _plugged; + + // When true, we are still trying to determine whether + // the peer is using versioned protocol, and if so, which + // version. When false, normal message flow has started. + bool _handshaking; + + msg_t _tx_msg; + + bool _io_error; + + // The session this engine is attached to. + zmq::session_base_t *_session; // Socket zmq::socket_base_t *_socket; - const std::string _peer_address; - - stream_engine_t (const stream_engine_t &); - const stream_engine_t &operator= (const stream_engine_t &); + stream_engine_base_t (const stream_engine_base_t &); + const stream_engine_base_t &operator= (const stream_engine_base_t &); }; } diff --git a/src/stream_listener_base.cpp b/src/stream_listener_base.cpp index 86b6cff9..d2d7b837 100644 --- a/src/stream_listener_base.cpp +++ b/src/stream_listener_base.cpp @@ -31,7 +31,8 @@ #include "stream_listener_base.hpp" #include "session_base.hpp" #include "socket_base.hpp" -#include "stream_engine.hpp" +#include "zmtp_engine.hpp" +#include "raw_engine.hpp" #ifndef ZMQ_HAVE_WINDOWS #include @@ -102,8 +103,11 @@ void zmq::stream_listener_base_t::create_engine (fd_t fd) get_socket_name (fd, socket_end_local), get_socket_name (fd, socket_end_remote), endpoint_type_bind); - stream_engine_t *engine = - new (std::nothrow) stream_engine_t (fd, options, endpoint_pair); + i_engine *engine; + if (options.raw_socket) + engine = new (std::nothrow) raw_engine_t (fd, options, endpoint_pair); + else + engine = new (std::nothrow) zmtp_engine_t (fd, options, endpoint_pair); alloc_assert (engine); // Choose I/O thread to run connecter in. Given that we are already diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index 67cca590..4315c9ae 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -33,7 +33,6 @@ #include "macros.hpp" #include "tcp_connecter.hpp" -#include "stream_engine.hpp" #include "io_thread.hpp" #include "err.hpp" #include "ip.hpp" diff --git a/src/tipc_connecter.cpp b/src/tipc_connecter.cpp index 6725ea3e..351c25e7 100644 --- a/src/tipc_connecter.cpp +++ b/src/tipc_connecter.cpp @@ -36,7 +36,6 @@ #include #include -#include "stream_engine.hpp" #include "io_thread.hpp" #include "platform.hpp" #include "random.hpp" diff --git a/src/ws_connecter.cpp b/src/ws_connecter.cpp index deefaefa..57a1f639 100644 --- a/src/ws_connecter.cpp +++ b/src/ws_connecter.cpp @@ -33,7 +33,6 @@ #include "macros.hpp" #include "ws_connecter.hpp" -#include "stream_engine.hpp" #include "io_thread.hpp" #include "err.hpp" #include "ip.hpp" diff --git a/src/ws_engine.cpp b/src/ws_engine.cpp index 5448472c..3db9b751 100644 --- a/src/ws_engine.cpp +++ b/src/ws_engine.cpp @@ -70,79 +70,31 @@ zmq::ws_engine_t::ws_engine_t (fd_t fd_, const options_t &options_, const endpoint_uri_pair_t &endpoint_uri_pair_, bool client_) : + stream_engine_base_t (fd_, options_, endpoint_uri_pair_), _client (client_), - _plugged (false), - _socket (NULL), - _fd (fd_), - _session (NULL), - _handle (static_cast (NULL)), - _options (options_), - _endpoint_uri_pair (endpoint_uri_pair_), - _handshaking (true), _client_handshake_state (client_handshake_initial), _server_handshake_state (handshake_initial), _header_name_position (0), _header_value_position (0), _header_upgrade_websocket (false), _header_connection_upgrade (false), - _websocket_protocol (false), - _input_stopped (false), - _decoder (NULL), - _inpos (NULL), - _insize (0), - _output_stopped (false), - _outpos (NULL), - _outsize (0), - _encoder (NULL), - _sent_routing_id (false), - _received_routing_id (false) + _websocket_protocol (false) { - // Put the socket into non-blocking mode. - unblock_socket (_fd); - memset (_websocket_key, 0, MAX_HEADER_VALUE_LENGTH + 1); memset (_websocket_accept, 0, MAX_HEADER_VALUE_LENGTH + 1); - int rc = _tx_msg.init (); - errno_assert (rc == 0); + _next_msg = static_cast ( + &ws_engine_t::routing_id_msg); + _process_msg = static_cast ( + &ws_engine_t::process_routing_id_msg); } zmq::ws_engine_t::~ws_engine_t () { - zmq_assert (!_plugged); - - if (_fd != retired_fd) { -#ifdef ZMQ_HAVE_WINDOWS - int rc = closesocket (_fd); - wsa_assert (rc != SOCKET_ERROR); -#else - int rc = close (_fd); - errno_assert (rc == 0); -#endif - _fd = retired_fd; - } - - int rc = _tx_msg.close (); - errno_assert (rc == 0); - - LIBZMQ_DELETE (_encoder); - LIBZMQ_DELETE (_decoder); } -void zmq::ws_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_) +void zmq::ws_engine_t::plug_internal () { - zmq_assert (!_plugged); - _plugged = true; - - zmq_assert (!_session); - zmq_assert (session_); - _session = session_; - _socket = _session->get_socket (); - - // Connect to I/O threads poller object. - io_object_t::plug (io_thread_); - _handle = add_fd (_fd); - if (_client) { unsigned char nonce[16]; int *p = (int *) nonce; @@ -170,215 +122,78 @@ void zmq::ws_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_) assert (size > 0 && size < WS_BUFFER_SIZE); _outpos = _write_buffer; _outsize = size; - _output_stopped = false; - set_pollout (_handle); - } else - _output_stopped = true; + set_pollout (); + } - _input_stopped = false; - set_pollin (_handle); + set_pollin (); in_event (); } - -void zmq::ws_engine_t::unplug () +int zmq::ws_engine_t::routing_id_msg (msg_t *msg_) { - zmq_assert (_plugged); - _plugged = false; - - rm_fd (_handle); - - // Disconnect from I/O threads poller object. - io_object_t::unplug (); + int rc = msg_->init_size (_options.routing_id_size); + errno_assert (rc == 0); + if (_options.routing_id_size > 0) + memcpy (msg_->data (), _options.routing_id, _options.routing_id_size); + _next_msg = &ws_engine_t::pull_msg_from_session; + return 0; } - -void zmq::ws_engine_t::terminate () +int zmq::ws_engine_t::process_routing_id_msg (msg_t *msg_) { - unplug (); - delete this; + if (_options.recv_routing_id) { + msg_->set_flags (msg_t::routing_id); + int rc = session ()->push_msg (msg_); + errno_assert (rc == 0); + } else { + int rc = msg_->close (); + errno_assert (rc == 0); + rc = msg_->init (); + errno_assert (rc == 0); + } + + _process_msg = &ws_engine_t::push_msg_to_session; + + return 0; } -void zmq::ws_engine_t::in_event () +bool zmq::ws_engine_t::handshake () { - if (_handshaking) { - if (_client) { - if (!client_handshake ()) - return; - } else if (!server_handshake ()) - return; + bool complete; + + if (_client) + complete = client_handshake (); + else + complete = server_handshake (); + + if (complete) { + _encoder = + new (std::nothrow) ws_encoder_t (_options.out_batch_size, _client); + alloc_assert (_encoder); + + _decoder = new (std::nothrow) + ws_decoder_t (_options.in_batch_size, _options.maxmsgsize, + _options.zero_copy, !_client); + alloc_assert (_decoder); + + socket ()->event_handshake_succeeded (_endpoint_uri_pair, 0); + + set_pollout (); } - zmq_assert (_decoder); - - // If there's no data to process in the buffer... - if (_insize == 0) { - // Retrieve the buffer and read as much data as possible. - // Note that buffer can be arbitrarily large. However, we assume - // the underlying TCP layer has fixed buffer size and thus the - // number of bytes read will be always limited. - size_t bufsize = 0; - _decoder->get_buffer (&_inpos, &bufsize); - - const int rc = tcp_read (_fd, _inpos, bufsize); - - if (rc == 0) { - // connection closed by peer - errno = EPIPE; - error (zmq::stream_engine_t::connection_error); - return; - } - if (rc == -1) { - if (errno != EAGAIN) { - error (zmq::stream_engine_t::connection_error); - return; - } - return; - } - - // Adjust input size - _insize = static_cast (rc); - // Adjust buffer size to received bytes - _decoder->resize_buffer (_insize); - } - - int rc = 0; - size_t processed = 0; - - while (_insize > 0) { - rc = _decoder->decode (_inpos, _insize, processed); - zmq_assert (processed <= _insize); - _inpos += processed; - _insize -= processed; - if (rc == 0 || rc == -1) - break; - - if (!_received_routing_id) { - _received_routing_id = true; - if (_options.recv_routing_id) - _decoder->msg ()->set_flags (msg_t::routing_id); - else { - _decoder->msg ()->close (); - _decoder->msg ()->init (); - continue; - } - } - - rc = _session->push_msg (_decoder->msg ()); - if (rc == -1) - break; - } - - // Tear down the connection if we have failed to decode input data - // or the session has rejected the message. - if (rc == -1) { - if (errno != EAGAIN) { - error (zmq::stream_engine_t::protocol_error); - return; - } - _input_stopped = true; - reset_pollin (_handle); - } - - _session->flush (); - return; -} - -void zmq::ws_engine_t::out_event () -{ - // If write buffer is empty, try to read new data from the encoder. - if (!_outsize) { - // Even when we stop polling as soon as there is no - // data to send, the poller may invoke out_event one - // more time due to 'speculative write' optimisation. - if (unlikely (_encoder == NULL)) { - zmq_assert (_handshaking); - return; - } - - _outpos = NULL; - _outsize = _encoder->encode (&_outpos, 0); - - while (_outsize < static_cast (_options.out_batch_size)) { - if (!_sent_routing_id) { - _tx_msg.close (); - int rc = _tx_msg.init_size (_options.routing_id_size); - errno_assert (rc == 0); - if (_options.routing_id_size > 0) - memcpy (_tx_msg.data (), _options.routing_id, - _options.routing_id_size); - _sent_routing_id = true; - } else if (_session->pull_msg (&_tx_msg) == -1) - break; - _encoder->load_msg (&_tx_msg); - unsigned char *bufptr = _outpos + _outsize; - size_t n = - _encoder->encode (&bufptr, _options.out_batch_size - _outsize); - zmq_assert (n > 0); - if (_outpos == NULL) - _outpos = bufptr; - _outsize += n; - } - - // If there is no data to send, stop polling for output. - if (_outsize == 0) { - _output_stopped = true; - reset_pollout (_handle); - return; - } - } - - // If there are any data to write in write buffer, write as much as - // possible to the socket. Note that amount of data to write can be - // arbitrarily large. However, we assume that underlying TCP layer has - // limited transmission buffer and thus the actual number of bytes - // written should be reasonably modest. - const int nbytes = tcp_write (_fd, _outpos, _outsize); - - // IO error has occurred. We stop waiting for output events. - // The engine is not terminated until we detect input error; - // this is necessary to prevent losing incoming messages. - if (nbytes == -1) { - _output_stopped = true; - reset_pollout (_handle); - return; - } - - _outpos += nbytes; - _outsize -= nbytes; - - // If we are still handshaking and there are no data - // to send, stop polling for output. - if (unlikely (_handshaking)) - if (_outsize == 0) { - _output_stopped = true; - reset_pollout (_handle); - } -} - -const zmq::endpoint_uri_pair_t &zmq::ws_engine_t::get_endpoint () const -{ - return _endpoint_uri_pair; -} - -void zmq::ws_engine_t::restart_output () -{ - if (likely (_output_stopped)) { - set_pollout (_handle); - _output_stopped = false; - } + return complete; } bool zmq::ws_engine_t::server_handshake () { - int nbytes = tcp_read (_fd, _read_buffer, WS_BUFFER_SIZE); + int nbytes = tcp_read (_read_buffer, WS_BUFFER_SIZE); if (nbytes == 0) { errno = EPIPE; - error (zmq::stream_engine_t::connection_error); + error (zmq::i_engine::connection_error); return false; } else if (nbytes == -1) { if (errno != EAGAIN) - error (zmq::stream_engine_t::connection_error); + error (zmq::i_engine::connection_error); return false; } @@ -573,20 +388,6 @@ bool zmq::ws_engine_t::server_handshake () if (_header_connection_upgrade && _header_upgrade_websocket && _websocket_protocol && _websocket_key[0] != '\0') { _server_handshake_state = handshake_complete; - _handshaking = false; - - // TODO: check which decoder/encoder to use according to selected protocol - _encoder = new (std::nothrow) - ws_encoder_t (_options.out_batch_size, false); - alloc_assert (_encoder); - - _decoder = new (std::nothrow) ws_decoder_t ( - _options.in_batch_size, _options.maxmsgsize, - _options.zero_copy, true); - alloc_assert (_decoder); - - _socket->event_handshake_succeeded (_endpoint_uri_pair, - 0); const char *magic_string = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; @@ -623,16 +424,15 @@ bool zmq::ws_engine_t::server_handshake () _outpos = _write_buffer; _outsize = written; - if (_output_stopped) - restart_output (); + _inpos++; + _insize--; + + return true; } else _server_handshake_state = handshake_error; } else _server_handshake_state = handshake_error; break; - case handshake_complete: - // no more bytes are allowed after complete - _server_handshake_state = handshake_error; default: assert (false); } @@ -643,26 +443,26 @@ bool zmq::ws_engine_t::server_handshake () if (_server_handshake_state == handshake_error) { // TODO: send bad request - _socket->event_handshake_failed_protocol ( + socket ()->event_handshake_failed_protocol ( _endpoint_uri_pair, ZMQ_PROTOCOL_ERROR_WS_UNSPECIFIED); - error (zmq::stream_engine_t::protocol_error); + error (zmq::i_engine::protocol_error); return false; } } - return _server_handshake_state == handshake_complete; + return false; } bool zmq::ws_engine_t::client_handshake () { - int nbytes = tcp_read (_fd, _read_buffer, WS_BUFFER_SIZE); + int nbytes = tcp_read (_read_buffer, WS_BUFFER_SIZE); if (nbytes == 0) { errno = EPIPE; - error (zmq::stream_engine_t::connection_error); + error (zmq::i_engine::connection_error); return false; } else if (nbytes == -1) { if (errno != EAGAIN) - error (zmq::stream_engine_t::connection_error); + error (zmq::i_engine::connection_error); return false; } @@ -967,25 +767,9 @@ bool zmq::ws_engine_t::client_handshake () && _websocket_protocol && _websocket_accept[0] != '\0') { _client_handshake_state = client_handshake_complete; - _handshaking = false; - - _encoder = new (std::nothrow) - ws_encoder_t (_options.out_batch_size, true); - alloc_assert (_encoder); - - _decoder = new (std::nothrow) ws_decoder_t ( - _options.in_batch_size, _options.maxmsgsize, - _options.zero_copy, false); - alloc_assert (_decoder); - - _socket->event_handshake_succeeded (_endpoint_uri_pair, - 0); // TODO: validate accept key - if (_output_stopped) - restart_output (); - _inpos++; _insize--; @@ -1003,10 +787,10 @@ bool zmq::ws_engine_t::client_handshake () _insize--; if (_client_handshake_state == client_handshake_error) { - _socket->event_handshake_failed_protocol ( + socket ()->event_handshake_failed_protocol ( _endpoint_uri_pair, ZMQ_PROTOCOL_ERROR_WS_UNSPECIFIED); - error (zmq::stream_engine_t::protocol_error); + error (zmq::i_engine::protocol_error); return false; } } @@ -1014,33 +798,6 @@ bool zmq::ws_engine_t::client_handshake () return false; } -void zmq::ws_engine_t::error (zmq::stream_engine_t::error_reason_t reason_) -{ - zmq_assert (_session); - - if (reason_ != zmq::stream_engine_t::protocol_error && _handshaking) { - int err = errno; - _socket->event_handshake_failed_no_detail (_endpoint_uri_pair, err); - } - - _socket->event_disconnected (_endpoint_uri_pair, _fd); - _session->flush (); - _session->engine_error (reason_); - unplug (); - delete this; -} - -bool zmq::ws_engine_t::restart_input () -{ - zmq_assert (_input_stopped); - - _input_stopped = false; - set_pollin (_handle); - in_event (); - - return true; -} - static int encode_base64 (const unsigned char *in, int in_len, char *out, int out_len) { diff --git a/src/ws_engine.hpp b/src/ws_engine.hpp index f9804f9d..26af0160 100644 --- a/src/ws_engine.hpp +++ b/src/ws_engine.hpp @@ -31,10 +31,10 @@ #define __ZMQ_WS_ENGINE_HPP_INCLUDED__ #include "io_object.hpp" -#include "i_engine.hpp" #include "address.hpp" #include "msg.hpp" -#include "stream_engine.hpp" +#include "stream_engine_base.hpp" + #define WS_BUFFER_SIZE 8192 #define MAX_HEADER_NAME_LENGTH 1024 @@ -124,7 +124,7 @@ typedef enum client_handshake_error = -1 } ws_client_handshake_state_t; -class ws_engine_t : public io_object_t, public i_engine +class ws_engine_t : public stream_engine_base_t { public: ws_engine_t (fd_t fd_, @@ -133,49 +133,19 @@ class ws_engine_t : public io_object_t, public i_engine bool client_); ~ws_engine_t (); - // i_engine interface implementation. - // Plug the engine to the session. - void plug (zmq::io_thread_t *io_thread_, class session_base_t *session_); - - // Terminate and deallocate the engine. Note that 'detached' - // events are not fired on termination. - void terminate (); - - // This method is called by the session to signalise that more - // messages can be written to the pipe. - bool restart_input (); - - // This method is called by the session to signalise that there - // are messages to send available. - void restart_output (); - - void zap_msg_available (){}; - - void in_event (); - void out_event (); - - const endpoint_uri_pair_t &get_endpoint () const; + protected: + bool handshake (); + void plug_internal (); private: + int routing_id_msg (msg_t *msg_); + int process_routing_id_msg (msg_t *msg_); + bool client_handshake (); bool server_handshake (); - void error (zmq::stream_engine_t::error_reason_t reason_); - void unplug (); bool _client; - bool _plugged; - socket_base_t *_socket; - fd_t _fd; - session_base_t *_session; - handle_t _handle; - - options_t _options; - - // Representation of the connected endpoints. - const endpoint_uri_pair_t _endpoint_uri_pair; - - bool _handshaking; ws_client_handshake_state_t _client_handshake_state; ws_server_handshake_state_t _server_handshake_state; @@ -191,21 +161,6 @@ class ws_engine_t : public io_object_t, public i_engine bool _websocket_protocol; char _websocket_key[MAX_HEADER_VALUE_LENGTH + 1]; char _websocket_accept[MAX_HEADER_VALUE_LENGTH + 1]; - - bool _input_stopped; - i_decoder *_decoder; - unsigned char *_inpos; - size_t _insize; - - bool _output_stopped; - unsigned char *_outpos; - size_t _outsize; - i_encoder *_encoder; - - bool _sent_routing_id; - bool _received_routing_id; - - msg_t _tx_msg; }; } diff --git a/src/zmtp_engine.cpp b/src/zmtp_engine.cpp new file mode 100644 index 00000000..3b5a9e84 --- /dev/null +++ b/src/zmtp_engine.cpp @@ -0,0 +1,571 @@ +/* + Copyright (c) 2007-2019 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "precompiled.hpp" +#include "macros.hpp" + +#include +#include + +#ifndef ZMQ_HAVE_WINDOWS +#include +#endif + +#include +#include + +#include "zmtp_engine.hpp" +#include "io_thread.hpp" +#include "session_base.hpp" +#include "v1_encoder.hpp" +#include "v1_decoder.hpp" +#include "v2_encoder.hpp" +#include "v2_decoder.hpp" +#include "null_mechanism.hpp" +#include "plain_client.hpp" +#include "plain_server.hpp" +#include "gssapi_client.hpp" +#include "gssapi_server.hpp" +#include "curve_client.hpp" +#include "curve_server.hpp" +#include "raw_decoder.hpp" +#include "raw_encoder.hpp" +#include "config.hpp" +#include "err.hpp" +#include "ip.hpp" +#include "likely.hpp" +#include "wire.hpp" + +zmq::zmtp_engine_t::zmtp_engine_t ( + fd_t fd_, + const options_t &options_, + const endpoint_uri_pair_t &endpoint_uri_pair_) : + stream_engine_base_t (fd_, options_, endpoint_uri_pair_), + _greeting_size (v2_greeting_size), + _greeting_bytes_read (0), + _subscription_required (false), + _heartbeat_timeout (0) +{ + _next_msg = static_cast ( + &zmtp_engine_t::routing_id_msg); + _process_msg = static_cast ( + &zmtp_engine_t::process_routing_id_msg); + + int rc = _pong_msg.init (); + errno_assert (rc == 0); + + rc = _routing_id_msg.init (); + errno_assert (rc == 0); + + if (_options.heartbeat_interval > 0) { + _heartbeat_timeout = _options.heartbeat_timeout; + if (_heartbeat_timeout == -1) + _heartbeat_timeout = _options.heartbeat_interval; + } +} + +zmq::zmtp_engine_t::~zmtp_engine_t () +{ + int rc = _routing_id_msg.close (); + errno_assert (rc == 0); +} + +void zmq::zmtp_engine_t::plug_internal () +{ + // start optional timer, to prevent handshake hanging on no input + set_handshake_timer (); + + // Send the 'length' and 'flags' fields of the routing id message. + // The 'length' field is encoded in the long format. + _outpos = _greeting_send; + _outpos[_outsize++] = UCHAR_MAX; + put_uint64 (&_outpos[_outsize], _options.routing_id_size + 1); + _outsize += 8; + _outpos[_outsize++] = 0x7f; + + set_pollin (); + set_pollout (); + // Flush all the data that may have been already received downstream. + in_event (); +} + +// Position of the revision field in the greeting. +const size_t revision_pos = 10; + +bool zmq::zmtp_engine_t::handshake () +{ + zmq_assert (_greeting_bytes_read < _greeting_size); + // Receive the greeting. + const int rc = receive_greeting (); + if (rc == -1) + return false; + const bool unversioned = rc != 0; + + if (!(this + ->*select_handshake_fun (unversioned, + _greeting_recv[revision_pos])) ()) + return false; + + // Start polling for output if necessary. + if (_outsize == 0) + set_pollout (); + + if (_has_handshake_timer) { + cancel_timer (handshake_timer_id); + _has_handshake_timer = false; + } + + return true; +} + +int zmq::zmtp_engine_t::receive_greeting () +{ + bool unversioned = false; + while (_greeting_bytes_read < _greeting_size) { + const int n = tcp_read (_greeting_recv + _greeting_bytes_read, + _greeting_size - _greeting_bytes_read); + if (n == 0) { + errno = EPIPE; + error (connection_error); + return -1; + } + if (n == -1) { + if (errno != EAGAIN) + error (connection_error); + return -1; + } + + _greeting_bytes_read += n; + + // We have received at least one byte from the peer. + // If the first byte is not 0xff, we know that the + // peer is using unversioned protocol. + if (_greeting_recv[0] != 0xff) { + unversioned = true; + break; + } + + if (_greeting_bytes_read < signature_size) + continue; + + // Inspect the right-most bit of the 10th byte (which coincides + // with the 'flags' field if a regular message was sent). + // Zero indicates this is a header of a routing id message + // (i.e. the peer is using the unversioned protocol). + if (!(_greeting_recv[9] & 0x01)) { + unversioned = true; + break; + } + + // The peer is using versioned protocol. + receive_greeting_versioned (); + } + return unversioned ? 1 : 0; +} + +void zmq::zmtp_engine_t::receive_greeting_versioned () +{ + // Send the major version number. + if (_outpos + _outsize == _greeting_send + signature_size) { + if (_outsize == 0) + set_pollout (); + _outpos[_outsize++] = 3; // Major version number + } + + if (_greeting_bytes_read > signature_size) { + if (_outpos + _outsize == _greeting_send + signature_size + 1) { + if (_outsize == 0) + set_pollout (); + + // Use ZMTP/2.0 to talk to older peers. + if (_greeting_recv[revision_pos] == ZMTP_1_0 + || _greeting_recv[revision_pos] == ZMTP_2_0) + _outpos[_outsize++] = _options.type; + else { + _outpos[_outsize++] = 0; // Minor version number + memset (_outpos + _outsize, 0, 20); + + zmq_assert (_options.mechanism == ZMQ_NULL + || _options.mechanism == ZMQ_PLAIN + || _options.mechanism == ZMQ_CURVE + || _options.mechanism == ZMQ_GSSAPI); + + if (_options.mechanism == ZMQ_NULL) + memcpy (_outpos + _outsize, "NULL", 4); + else if (_options.mechanism == ZMQ_PLAIN) + memcpy (_outpos + _outsize, "PLAIN", 5); + else if (_options.mechanism == ZMQ_GSSAPI) + memcpy (_outpos + _outsize, "GSSAPI", 6); + else if (_options.mechanism == ZMQ_CURVE) + memcpy (_outpos + _outsize, "CURVE", 5); + _outsize += 20; + memset (_outpos + _outsize, 0, 32); + _outsize += 32; + _greeting_size = v3_greeting_size; + } + } + } +} + +zmq::zmtp_engine_t::handshake_fun_t +zmq::zmtp_engine_t::select_handshake_fun (bool unversioned, + unsigned char revision) +{ + // Is the peer using ZMTP/1.0 with no revision number? + if (unversioned) { + return &zmtp_engine_t::handshake_v1_0_unversioned; + } + switch (revision) { + case ZMTP_1_0: + return &zmtp_engine_t::handshake_v1_0; + case ZMTP_2_0: + return &zmtp_engine_t::handshake_v2_0; + default: + return &zmtp_engine_t::handshake_v3_0; + } +} + +bool zmq::zmtp_engine_t::handshake_v1_0_unversioned () +{ + // We send and receive rest of routing id message + if (session ()->zap_enabled ()) { + // reject ZMTP 1.0 connections if ZAP is enabled + error (protocol_error); + return false; + } + + _encoder = new (std::nothrow) v1_encoder_t (_options.out_batch_size); + alloc_assert (_encoder); + + _decoder = new (std::nothrow) + v1_decoder_t (_options.in_batch_size, _options.maxmsgsize); + alloc_assert (_decoder); + + // We have already sent the message header. + // Since there is no way to tell the encoder to + // skip the message header, we simply throw that + // header data away. + const size_t header_size = + _options.routing_id_size + 1 >= UCHAR_MAX ? 10 : 2; + unsigned char tmp[10], *bufferp = tmp; + + // Prepare the routing id message and load it into encoder. + // Then consume bytes we have already sent to the peer. + int rc = _routing_id_msg.close (); + zmq_assert (rc == 0); + rc = _routing_id_msg.init_size (_options.routing_id_size); + zmq_assert (rc == 0); + memcpy (_routing_id_msg.data (), _options.routing_id, + _options.routing_id_size); + _encoder->load_msg (&_routing_id_msg); + const size_t buffer_size = _encoder->encode (&bufferp, header_size); + zmq_assert (buffer_size == header_size); + + // Make sure the decoder sees the data we have already received. + _inpos = _greeting_recv; + _insize = _greeting_bytes_read; + + // To allow for interoperability with peers that do not forward + // their subscriptions, we inject a phantom subscription message + // message into the incoming message stream. + if (_options.type == ZMQ_PUB || _options.type == ZMQ_XPUB) + _subscription_required = true; + + // We are sending our routing id now and the next message + // will come from the socket. + _next_msg = &zmtp_engine_t::pull_msg_from_session; + + // We are expecting routing id message. + _process_msg = static_cast ( + &zmtp_engine_t::process_routing_id_msg); + + return true; +} + +bool zmq::zmtp_engine_t::handshake_v1_0 () +{ + if (session ()->zap_enabled ()) { + // reject ZMTP 1.0 connections if ZAP is enabled + error (protocol_error); + return false; + } + + _encoder = new (std::nothrow) v1_encoder_t (_options.out_batch_size); + alloc_assert (_encoder); + + _decoder = new (std::nothrow) + v1_decoder_t (_options.in_batch_size, _options.maxmsgsize); + alloc_assert (_decoder); + + return true; +} + +bool zmq::zmtp_engine_t::handshake_v2_0 () +{ + if (session ()->zap_enabled ()) { + // reject ZMTP 2.0 connections if ZAP is enabled + error (protocol_error); + return false; + } + + _encoder = new (std::nothrow) v2_encoder_t (_options.out_batch_size); + alloc_assert (_encoder); + + _decoder = new (std::nothrow) v2_decoder_t ( + _options.in_batch_size, _options.maxmsgsize, _options.zero_copy); + alloc_assert (_decoder); + + return true; +} + +bool zmq::zmtp_engine_t::handshake_v3_0 () +{ + _encoder = new (std::nothrow) v2_encoder_t (_options.out_batch_size); + alloc_assert (_encoder); + + _decoder = new (std::nothrow) v2_decoder_t ( + _options.in_batch_size, _options.maxmsgsize, _options.zero_copy); + alloc_assert (_decoder); + + if (_options.mechanism == ZMQ_NULL + && memcmp (_greeting_recv + 12, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", + 20) + == 0) { + _mechanism = new (std::nothrow) + null_mechanism_t (session (), _peer_address, _options); + alloc_assert (_mechanism); + } else if (_options.mechanism == ZMQ_PLAIN + && memcmp (_greeting_recv + 12, + "PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) + == 0) { + if (_options.as_server) + _mechanism = new (std::nothrow) + plain_server_t (session (), _peer_address, _options); + else + _mechanism = + new (std::nothrow) plain_client_t (session (), _options); + alloc_assert (_mechanism); + } +#ifdef ZMQ_HAVE_CURVE + else if (_options.mechanism == ZMQ_CURVE + && memcmp (_greeting_recv + 12, + "CURVE\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) + == 0) { + if (_options.as_server) + _mechanism = new (std::nothrow) + curve_server_t (session (), _peer_address, _options); + else + _mechanism = + new (std::nothrow) curve_client_t (session (), _options); + alloc_assert (_mechanism); + } +#endif +#ifdef HAVE_LIBGSSAPI_KRB5 + else if (_options.mechanism == ZMQ_GSSAPI + && memcmp (_greeting_recv + 12, + "GSSAPI\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) + == 0) { + if (_options.as_server) + _mechanism = new (std::nothrow) + gssapi_server_t (session (), _peer_address, _options); + else + _mechanism = + new (std::nothrow) gssapi_client_t (session (), _options); + alloc_assert (_mechanism); + } +#endif + else { + socket ()->event_handshake_failed_protocol ( + session ()->get_endpoint (), + ZMQ_PROTOCOL_ERROR_ZMTP_MECHANISM_MISMATCH); + error (protocol_error); + return false; + } + _next_msg = &zmtp_engine_t::next_handshake_command; + _process_msg = &zmtp_engine_t::process_handshake_command; + + return true; +} + +int zmq::zmtp_engine_t::routing_id_msg (msg_t *msg_) +{ + int rc = msg_->init_size (_options.routing_id_size); + errno_assert (rc == 0); + if (_options.routing_id_size > 0) + memcpy (msg_->data (), _options.routing_id, _options.routing_id_size); + _next_msg = &zmtp_engine_t::pull_msg_from_session; + return 0; +} + +int zmq::zmtp_engine_t::process_routing_id_msg (msg_t *msg_) +{ + if (_options.recv_routing_id) { + msg_->set_flags (msg_t::routing_id); + int rc = session ()->push_msg (msg_); + errno_assert (rc == 0); + } else { + int rc = msg_->close (); + errno_assert (rc == 0); + rc = msg_->init (); + errno_assert (rc == 0); + } + + if (_subscription_required) { + msg_t subscription; + + // Inject the subscription message, so that also + // ZMQ 2.x peers receive published messages. + int rc = subscription.init_size (1); + errno_assert (rc == 0); + *static_cast (subscription.data ()) = 1; + rc = session ()->push_msg (&subscription); + errno_assert (rc == 0); + } + + _process_msg = &zmtp_engine_t::push_msg_to_session; + + return 0; +} + +int zmq::zmtp_engine_t::produce_ping_message (msg_t *msg_) +{ + // 16-bit TTL + \4PING == 7 + const size_t ping_ttl_len = msg_t::ping_cmd_name_size + 2; + zmq_assert (_mechanism != NULL); + + int rc = msg_->init_size (ping_ttl_len); + errno_assert (rc == 0); + msg_->set_flags (msg_t::command); + // Copy in the command message + memcpy (msg_->data (), "\4PING", msg_t::ping_cmd_name_size); + + uint16_t ttl_val = htons (_options.heartbeat_ttl); + memcpy (static_cast (msg_->data ()) + msg_t::ping_cmd_name_size, + &ttl_val, sizeof (ttl_val)); + + rc = _mechanism->encode (msg_); + _next_msg = &zmtp_engine_t::pull_and_encode; + if (!_has_timeout_timer && _heartbeat_timeout > 0) { + add_timer (_heartbeat_timeout, heartbeat_timeout_timer_id); + _has_timeout_timer = true; + } + return rc; +} + +int zmq::zmtp_engine_t::produce_pong_message (msg_t *msg_) +{ + zmq_assert (_mechanism != NULL); + + int rc = msg_->move (_pong_msg); + errno_assert (rc == 0); + + rc = _mechanism->encode (msg_); + _next_msg = &zmtp_engine_t::pull_and_encode; + return rc; +} + +int zmq::zmtp_engine_t::process_heartbeat_message (msg_t *msg_) +{ + if (msg_->is_ping ()) { + // 16-bit TTL + \4PING == 7 + const size_t ping_ttl_len = msg_t::ping_cmd_name_size + 2; + const size_t ping_max_ctx_len = 16; + uint16_t remote_heartbeat_ttl; + + // Get the remote heartbeat TTL to setup the timer + memcpy (&remote_heartbeat_ttl, + static_cast (msg_->data ()) + + msg_t::ping_cmd_name_size, + ping_ttl_len - msg_t::ping_cmd_name_size); + remote_heartbeat_ttl = ntohs (remote_heartbeat_ttl); + // The remote heartbeat is in 10ths of a second + // so we multiply it by 100 to get the timer interval in ms. + remote_heartbeat_ttl *= 100; + + if (!_has_ttl_timer && remote_heartbeat_ttl > 0) { + add_timer (remote_heartbeat_ttl, heartbeat_ttl_timer_id); + _has_ttl_timer = true; + } + + // As per ZMTP 3.1 the PING command might contain an up to 16 bytes + // context which needs to be PONGed back, so build the pong message + // here and store it. Truncate it if it's too long. + // Given the engine goes straight to out_event, sequential PINGs will + // not be a problem. + const size_t context_len = + std::min (msg_->size () - ping_ttl_len, ping_max_ctx_len); + const int rc = + _pong_msg.init_size (msg_t::ping_cmd_name_size + context_len); + errno_assert (rc == 0); + _pong_msg.set_flags (msg_t::command); + memcpy (_pong_msg.data (), "\4PONG", msg_t::ping_cmd_name_size); + if (context_len > 0) + memcpy (static_cast (_pong_msg.data ()) + + msg_t::ping_cmd_name_size, + static_cast (msg_->data ()) + ping_ttl_len, + context_len); + + _next_msg = static_cast ( + &zmtp_engine_t::produce_pong_message); + out_event (); + } + + return 0; +} + +int zmq::zmtp_engine_t::process_command_message (msg_t *msg_) +{ + const uint8_t cmd_name_size = + *(static_cast (msg_->data ())); + const size_t ping_name_size = msg_t::ping_cmd_name_size - 1; + const size_t sub_name_size = msg_t::sub_cmd_name_size - 1; + const size_t cancel_name_size = msg_t::cancel_cmd_name_size - 1; + // Malformed command + if (unlikely (msg_->size () < cmd_name_size + sizeof (cmd_name_size))) + return -1; + + uint8_t *cmd_name = (static_cast (msg_->data ())) + 1; + if (cmd_name_size == ping_name_size + && memcmp (cmd_name, "PING", cmd_name_size) == 0) + msg_->set_flags (zmq::msg_t::ping); + if (cmd_name_size == ping_name_size + && memcmp (cmd_name, "PONG", cmd_name_size) == 0) + msg_->set_flags (zmq::msg_t::pong); + if (cmd_name_size == sub_name_size + && memcmp (cmd_name, "SUBSCRIBE", cmd_name_size) == 0) + msg_->set_flags (zmq::msg_t::subscribe); + if (cmd_name_size == cancel_name_size + && memcmp (cmd_name, "CANCEL", cmd_name_size) == 0) + msg_->set_flags (zmq::msg_t::cancel); + + if (msg_->is_ping () || msg_->is_pong ()) + return process_heartbeat_message (msg_); + + return 0; +} diff --git a/src/zmtp_engine.hpp b/src/zmtp_engine.hpp new file mode 100644 index 00000000..1ecf39e9 --- /dev/null +++ b/src/zmtp_engine.hpp @@ -0,0 +1,133 @@ +/* + Copyright (c) 2007-2019 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_ZMTP_ENGINE_HPP_INCLUDED__ +#define __ZMQ_ZMTP_ENGINE_HPP_INCLUDED__ + +#include + +#include "fd.hpp" +#include "i_engine.hpp" +#include "io_object.hpp" +#include "i_encoder.hpp" +#include "i_decoder.hpp" +#include "options.hpp" +#include "socket_base.hpp" +#include "metadata.hpp" +#include "msg.hpp" +#include "stream_engine_base.hpp" + +namespace zmq +{ +// Protocol revisions +enum +{ + ZMTP_1_0 = 0, + ZMTP_2_0 = 1 +}; + +class io_thread_t; +class session_base_t; +class mechanism_t; + +// This engine handles any socket with SOCK_STREAM semantics, +// e.g. TCP socket or an UNIX domain socket. + +class zmtp_engine_t : public stream_engine_base_t +{ + public: + zmtp_engine_t (fd_t fd_, + const options_t &options_, + const endpoint_uri_pair_t &endpoint_uri_pair_); + ~zmtp_engine_t (); + + protected: + // Detects the protocol used by the peer. + bool handshake (); + + void plug_internal (); + + int process_command_message (msg_t *msg_); + int produce_ping_message (msg_t *msg_); + int process_heartbeat_message (msg_t *msg_); + int produce_pong_message (msg_t *msg_); + + private: + // Receive the greeting from the peer. + int receive_greeting (); + void receive_greeting_versioned (); + + typedef bool (zmtp_engine_t::*handshake_fun_t) (); + static handshake_fun_t select_handshake_fun (bool unversioned, + unsigned char revision); + + bool handshake_v1_0_unversioned (); + bool handshake_v1_0 (); + bool handshake_v2_0 (); + bool handshake_v3_0 (); + + int routing_id_msg (msg_t *msg_); + int process_routing_id_msg (msg_t *msg_); + + msg_t _routing_id_msg; + + // Need to store PING payload for PONG + msg_t _pong_msg; + + static const size_t signature_size = 10; + + // Size of ZMTP/1.0 and ZMTP/2.0 greeting message + static const size_t v2_greeting_size = 12; + + // Size of ZMTP/3.0 greeting message + static const size_t v3_greeting_size = 64; + + // Expected greeting size. + size_t _greeting_size; + + // Greeting received from, and sent to peer + unsigned char _greeting_recv[v3_greeting_size]; + unsigned char _greeting_send[v3_greeting_size]; + + // Size of greeting received so far + unsigned int _greeting_bytes_read; + + // Indicates whether the engine is to inject a phantom + // subscription message into the incoming stream. + // Needed to support old peers. + bool _subscription_required; + + int _heartbeat_timeout; + + zmtp_engine_t (const zmtp_engine_t &); + const zmtp_engine_t &operator= (const zmtp_engine_t &); +}; +} + +#endif