Merge remote-tracking branch 'upstream/master'

This commit is contained in:
Rodrigo Mosconi
2014-07-02 21:22:51 -03:00
263 changed files with 14938 additions and 7520 deletions

View File

@@ -36,6 +36,7 @@
#include <string.h>
#include <new>
#include <sstream>
#include <iostream>
#include "stream_engine.hpp"
#include "io_thread.hpp"
@@ -45,7 +46,10 @@
#include "v2_encoder.hpp"
#include "v2_decoder.hpp"
#include "null_mechanism.hpp"
#include "plain_mechanism.hpp"
#include "plain_client.hpp"
#include "plain_server.hpp"
#include "gssapi_client.hpp"
#include "gssapi_server.hpp"
#include "curve_client.hpp"
#include "curve_server.hpp"
#include "raw_decoder.hpp"
@@ -53,10 +57,11 @@
#include "config.hpp"
#include "err.hpp"
#include "ip.hpp"
#include "tcp.hpp"
#include "likely.hpp"
#include "wire.hpp"
zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
const std::string &endpoint_) :
s (fd_),
inpos (NULL),
@@ -65,6 +70,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
outpos (NULL),
outsize (0),
encoder (NULL),
metadata (NULL),
handshaking (true),
greeting_size (v2_greeting_size),
greeting_bytes_read (0),
@@ -72,18 +78,19 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
options (options_),
endpoint (endpoint_),
plugged (false),
read_msg (&stream_engine_t::read_identity),
write_msg (&stream_engine_t::write_identity),
next_msg (&stream_engine_t::identity_msg),
process_msg (&stream_engine_t::process_identity_msg),
io_error (false),
subscription_required (false),
mechanism (NULL),
input_stopped (false),
output_stopped (false),
has_handshake_timer (false),
socket (NULL)
{
int rc = tx_msg.init ();
errno_assert (rc == 0);
// Put the socket into non-blocking mode.
unblock_socket (s);
@@ -145,6 +152,12 @@ zmq::stream_engine_t::~stream_engine_t ()
int rc = tx_msg.close ();
errno_assert (rc == 0);
// Drop reference to metadata and destroy it if we are
// the only user.
if (metadata != NULL)
if (metadata->drop_ref ())
delete metadata;
delete encoder;
delete decoder;
delete mechanism;
@@ -178,18 +191,21 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
// disable handshaking for raw socket
handshaking = false;
read_msg = &stream_engine_t::pull_msg_from_session;
write_msg = &stream_engine_t::push_msg_to_session;
next_msg = &stream_engine_t::pull_msg_from_session;
process_msg = &stream_engine_t::push_msg_to_session;
// For raw sockets, send an initial 0-length message to the
// application so that it knows a peer has connected.
msg_t connector;
connector.init();
(this->*write_msg) (&connector);
push_msg_to_session (&connector);
connector.close();
session->flush ();
}
else {
// start optional timer, to prevent handshake hanging on no input
set_handshake_timer ();
// Send the 'length' and 'flags' fields of the identity message.
// The 'length' field is encoded in the long format.
outpos = greeting_send;
@@ -210,6 +226,12 @@ void zmq::stream_engine_t::unplug ()
zmq_assert (plugged);
plugged = false;
// Cancel all timers.
if (has_handshake_timer) {
cancel_timer (handshake_timer_id);
has_handshake_timer = false;
}
// Cancel all fd subscriptions.
if (!io_error)
rm_fd (handle);
@@ -254,14 +276,14 @@ void zmq::stream_engine_t::in_event ()
size_t bufsize = 0;
decoder->get_buffer (&inpos, &bufsize);
int const rc = read (inpos, bufsize);
const int rc = tcp_read (s, inpos, bufsize);
if (rc == 0) {
error ();
error (connection_error);
return;
}
if (rc == -1) {
if (errno != EAGAIN)
error ();
error (connection_error);
return;
}
@@ -279,7 +301,7 @@ void zmq::stream_engine_t::in_event ()
insize -= processed;
if (rc == 0 || rc == -1)
break;
rc = (this->*write_msg) (decoder->msg ());
rc = (this->*process_msg) (decoder->msg ());
if (rc == -1)
break;
}
@@ -288,7 +310,7 @@ void zmq::stream_engine_t::in_event ()
// or the session has rejected the message.
if (rc == -1) {
if (errno != EAGAIN) {
error ();
error (protocol_error);
return;
}
input_stopped = true;
@@ -317,7 +339,7 @@ void zmq::stream_engine_t::out_event ()
outsize = encoder->encode (&outpos, 0);
while (outsize < out_batch_size) {
if ((this->*read_msg) (&tx_msg) == -1)
if ((this->*next_msg) (&tx_msg) == -1)
break;
encoder->load_msg (&tx_msg);
unsigned char *bufptr = outpos + outsize;
@@ -341,7 +363,7 @@ void zmq::stream_engine_t::out_event ()
// arbitrarily large. However, we assume that underlying TCP layer has
// limited transmission buffer and thus the actual number of bytes
// written should be reasonably modest.
int nbytes = write (outpos, outsize);
const int nbytes = tcp_write (s, outpos, outsize);
// IO error has occurred. We stop waiting for output events.
// The engine is not terminated until we detect input error;
@@ -384,12 +406,12 @@ void zmq::stream_engine_t::restart_input ()
zmq_assert (session != NULL);
zmq_assert (decoder != NULL);
int rc = (this->*write_msg) (decoder->msg ());
int rc = (this->*process_msg) (decoder->msg ());
if (rc == -1) {
if (errno == EAGAIN)
session->flush ();
else
error ();
error (protocol_error);
return;
}
@@ -401,7 +423,7 @@ void zmq::stream_engine_t::restart_input ()
insize -= processed;
if (rc == 0 || rc == -1)
break;
rc = (this->*write_msg) (decoder->msg ());
rc = (this->*process_msg) (decoder->msg ());
if (rc == -1)
break;
}
@@ -409,8 +431,11 @@ void zmq::stream_engine_t::restart_input ()
if (rc == -1 && errno == EAGAIN)
session->flush ();
else
if (rc == -1 || io_error)
error ();
if (io_error)
error (connection_error);
else
if (rc == -1)
error (protocol_error);
else {
input_stopped = false;
set_pollin (handle);
@@ -427,15 +452,15 @@ bool zmq::stream_engine_t::handshake ()
zmq_assert (greeting_bytes_read < greeting_size);
// Receive the greeting.
while (greeting_bytes_read < greeting_size) {
const int n = read (greeting_recv + greeting_bytes_read,
greeting_size - greeting_bytes_read);
const int n = tcp_read (s, greeting_recv + greeting_bytes_read,
greeting_size - greeting_bytes_read);
if (n == 0) {
error ();
error (connection_error);
return false;
}
if (n == -1) {
if (errno != EAGAIN)
error ();
error (connection_error);
return false;
}
@@ -480,7 +505,8 @@ bool zmq::stream_engine_t::handshake ()
zmq_assert (options.mechanism == ZMQ_NULL
|| options.mechanism == ZMQ_PLAIN
|| options.mechanism == ZMQ_CURVE);
|| options.mechanism == ZMQ_CURVE
|| options.mechanism == ZMQ_GSSAPI);
if (options.mechanism == ZMQ_NULL)
memcpy (outpos + outsize, "NULL", 4);
@@ -488,6 +514,10 @@ bool zmq::stream_engine_t::handshake ()
if (options.mechanism == ZMQ_PLAIN)
memcpy (outpos + outsize, "PLAIN", 5);
else
if (options.mechanism == ZMQ_GSSAPI)
memcpy (outpos + outsize, "GSSAPI", 6);
else
if (options.mechanism == ZMQ_CURVE)
memcpy (outpos + outsize, "CURVE", 5);
outsize += 20;
memset (outpos + outsize, 0, 32);
@@ -538,10 +568,10 @@ bool zmq::stream_engine_t::handshake ()
// We are sending our identity now and the next message
// will come from the socket.
read_msg = &stream_engine_t::pull_msg_from_session;
next_msg = &stream_engine_t::pull_msg_from_session;
// We are expecting identity message.
write_msg = &stream_engine_t::write_identity;
process_msg = &stream_engine_t::process_identity_msg;
}
else
if (greeting_recv [revision_pos] == ZMTP_1_0) {
@@ -577,8 +607,12 @@ bool zmq::stream_engine_t::handshake ()
}
else
if (memcmp (greeting_recv + 12, "PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
mechanism = new (std::nothrow)
plain_mechanism_t (session, peer_address, options);
if (options.as_server)
mechanism = new (std::nothrow)
plain_server_t (session, peer_address, options);
else
mechanism = new (std::nothrow)
plain_client_t (options);
alloc_assert (mechanism);
}
#ifdef HAVE_LIBSODIUM
@@ -591,13 +625,24 @@ bool zmq::stream_engine_t::handshake ()
mechanism = new (std::nothrow) curve_client_t (options);
alloc_assert (mechanism);
}
#endif
#ifdef HAVE_LIBGSSAPI_KRB5
else
if (memcmp (greeting_recv + 12, "GSSAPI\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
if (options.as_server)
mechanism = new (std::nothrow)
gssapi_server_t (session, peer_address, options);
else
mechanism = new (std::nothrow) gssapi_client_t (options);
alloc_assert (mechanism);
}
#endif
else {
error ();
error (protocol_error);
return false;
}
read_msg = &stream_engine_t::next_handshake_command;
write_msg = &stream_engine_t::process_handshake_command;
next_msg = &stream_engine_t::next_handshake_command;
process_msg = &stream_engine_t::process_handshake_command;
}
// Start polling for output if necessary.
@@ -608,20 +653,25 @@ bool zmq::stream_engine_t::handshake ()
// Switch into the normal message flow.
handshaking = false;
if (has_handshake_timer) {
cancel_timer (handshake_timer_id);
has_handshake_timer = false;
}
return true;
}
int zmq::stream_engine_t::read_identity (msg_t *msg_)
int zmq::stream_engine_t::identity_msg (msg_t *msg_)
{
int rc = msg_->init_size (options.identity_size);
errno_assert (rc == 0);
if (options.identity_size > 0)
memcpy (msg_->data (), options.identity, options.identity_size);
read_msg = &stream_engine_t::pull_msg_from_session;
next_msg = &stream_engine_t::pull_msg_from_session;
return 0;
}
int zmq::stream_engine_t::write_identity (msg_t *msg_)
int zmq::stream_engine_t::process_identity_msg (msg_t *msg_)
{
if (options.recv_identity) {
msg_->set_flags (msg_t::identity);
@@ -636,9 +686,9 @@ int zmq::stream_engine_t::write_identity (msg_t *msg_)
}
if (subscription_required)
write_msg = &stream_engine_t::write_subscription_msg;
process_msg = &stream_engine_t::write_subscription_msg;
else
write_msg = &stream_engine_t::push_msg_to_session;
process_msg = &stream_engine_t::push_msg_to_session;
return 0;
}
@@ -647,14 +697,21 @@ int zmq::stream_engine_t::next_handshake_command (msg_t *msg_)
{
zmq_assert (mechanism != NULL);
const int rc = mechanism->next_handshake_command (msg_);
if (rc == 0) {
msg_->set_flags (msg_t::command);
if (mechanism->is_handshake_complete ())
mechanism_ready ();
if (mechanism->status () == mechanism_t::ready) {
mechanism_ready ();
return pull_and_encode (msg_);
}
else
if (mechanism->status () == mechanism_t::error) {
errno = EPROTO;
return -1;
}
else {
const int rc = mechanism->next_handshake_command (msg_);
if (rc == 0)
msg_->set_flags (msg_t::command);
return rc;
}
return rc;
}
int zmq::stream_engine_t::process_handshake_command (msg_t *msg_)
@@ -662,8 +719,13 @@ int zmq::stream_engine_t::process_handshake_command (msg_t *msg_)
zmq_assert (mechanism != NULL);
const int rc = mechanism->process_handshake_command (msg_);
if (rc == 0) {
if (mechanism->is_handshake_complete ())
if (mechanism->status () == mechanism_t::ready)
mechanism_ready ();
else
if (mechanism->status () == mechanism_t::error) {
errno = EPROTO;
return -1;
}
if (output_stopped)
restart_output ();
}
@@ -677,7 +739,7 @@ void zmq::stream_engine_t::zap_msg_available ()
const int rc = mechanism->zap_msg_available ();
if (rc == -1) {
error ();
error (protocol_error);
return;
}
if (input_stopped)
@@ -702,8 +764,33 @@ void zmq::stream_engine_t::mechanism_ready ()
session->flush ();
}
read_msg = &stream_engine_t::pull_and_encode;
write_msg = &stream_engine_t::write_credential;
next_msg = &stream_engine_t::pull_and_encode;
process_msg = &stream_engine_t::write_credential;
// Compile metadata.
typedef metadata_t::dict_t properties_t;
properties_t properties;
properties_t::const_iterator it;
// Add ZAP properties.
const properties_t& zap_properties = mechanism->get_zap_properties ();
it = zap_properties.begin ();
while (it != zap_properties.end ()) {
properties.insert (properties_t::value_type (it->first, it->second));
it++;
}
// Add ZMTP properties.
const properties_t& zmtp_properties = mechanism->get_zmtp_properties ();
it = zmtp_properties.begin ();
while (it != zmtp_properties.end ()) {
properties.insert (properties_t::value_type (it->first, it->second));
it++;
}
zmq_assert (metadata == NULL);
if (!properties.empty ())
metadata = new (std::nothrow) metadata_t (properties);
}
int zmq::stream_engine_t::pull_msg_from_session (msg_t *msg_)
@@ -735,7 +822,7 @@ int zmq::stream_engine_t::write_credential (msg_t *msg_)
return -1;
}
}
write_msg = &stream_engine_t::decode_and_push;
process_msg = &stream_engine_t::decode_and_push;
return decode_and_push (msg_);
}
@@ -756,9 +843,11 @@ int zmq::stream_engine_t::decode_and_push (msg_t *msg_)
if (mechanism->decode (msg_) == -1)
return -1;
if (metadata)
msg_->set_metadata (metadata);
if (session->push_msg (msg_) == -1) {
if (errno == EAGAIN)
write_msg = &stream_engine_t::push_one_then_decode_and_push;
process_msg = &stream_engine_t::push_one_then_decode_and_push;
return -1;
}
return 0;
@@ -768,7 +857,7 @@ int zmq::stream_engine_t::push_one_then_decode_and_push (msg_t *msg_)
{
const int rc = session->push_msg (msg_);
if (rc == 0)
write_msg = &stream_engine_t::decode_and_push;
process_msg = &stream_engine_t::decode_and_push;
return rc;
}
@@ -785,126 +874,43 @@ int zmq::stream_engine_t::write_subscription_msg (msg_t *msg_)
if (rc == -1)
return -1;
write_msg = &stream_engine_t::push_msg_to_session;
process_msg = &stream_engine_t::push_msg_to_session;
return push_msg_to_session (msg_);
}
void zmq::stream_engine_t::error ()
void zmq::stream_engine_t::error (error_reason_t reason)
{
if (options.raw_sock) {
// For raw sockets, send a final 0-length message to the application
// so that it knows the peer has been disconnected.
msg_t terminator;
terminator.init();
(this->*write_msg) (&terminator);
(this->*process_msg) (&terminator);
terminator.close();
}
zmq_assert (session);
socket->event_disconnected (endpoint, s);
session->flush ();
session->engine_error ();
session->engine_error (reason);
unplug ();
delete this;
}
int zmq::stream_engine_t::write (const void *data_, size_t size_)
void zmq::stream_engine_t::set_handshake_timer ()
{
#ifdef ZMQ_HAVE_WINDOWS
zmq_assert (!has_handshake_timer);
int nbytes = send (s, (char*) data_, (int) size_, 0);
// If not a single byte can be written to the socket in non-blocking mode
// we'll get an error (this may happen during the speculative write).
if (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK)
return 0;
// Signalise peer failure.
if (nbytes == SOCKET_ERROR && (
WSAGetLastError () == WSAENETDOWN ||
WSAGetLastError () == WSAENETRESET ||
WSAGetLastError () == WSAEHOSTUNREACH ||
WSAGetLastError () == WSAECONNABORTED ||
WSAGetLastError () == WSAETIMEDOUT ||
WSAGetLastError () == WSAECONNRESET))
return -1;
wsa_assert (nbytes != SOCKET_ERROR);
return nbytes;
#else
ssize_t nbytes = send (s, data_, size_, 0);
// Several errors are OK. When speculative write is being done we may not
// be able to write a single byte from the socket. Also, SIGSTOP issued
// by a debugging tool can result in EINTR error.
if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK ||
errno == EINTR))
return 0;
// Signalise peer failure.
if (nbytes == -1) {
errno_assert (errno != EACCES
&& errno != EBADF
&& errno != EDESTADDRREQ
&& errno != EFAULT
&& errno != EINVAL
&& errno != EISCONN
&& errno != EMSGSIZE
&& errno != ENOMEM
&& errno != ENOTSOCK
&& errno != EOPNOTSUPP);
return -1;
if (!options.raw_sock && options.handshake_ivl > 0) {
add_timer (options.handshake_ivl, handshake_timer_id);
has_handshake_timer = true;
}
return static_cast <int> (nbytes);
#endif
}
int zmq::stream_engine_t::read (void *data_, size_t size_)
void zmq::stream_engine_t::timer_event (int id_)
{
#ifdef ZMQ_HAVE_WINDOWS
zmq_assert (id_ == handshake_timer_id);
has_handshake_timer = false;
const int rc = recv (s, (char*) data_, (int) size_, 0);
// If not a single byte can be read from the socket in non-blocking mode
// we'll get an error (this may happen during the speculative read).
if (rc == SOCKET_ERROR) {
if (WSAGetLastError () == WSAEWOULDBLOCK)
errno = EAGAIN;
else {
wsa_assert (WSAGetLastError () == WSAENETDOWN
|| WSAGetLastError () == WSAENETRESET
|| WSAGetLastError () == WSAECONNABORTED
|| WSAGetLastError () == WSAETIMEDOUT
|| WSAGetLastError () == WSAECONNRESET
|| WSAGetLastError () == WSAECONNREFUSED
|| WSAGetLastError () == WSAENOTCONN);
errno = wsa_error_to_errno (WSAGetLastError ());
}
}
return rc == SOCKET_ERROR? -1: rc;
#else
const ssize_t rc = recv (s, data_, size_, 0);
// Several errors are OK. When speculative read is being done we may not
// be able to read a single byte from the socket. Also, SIGSTOP issued
// by a debugging tool can result in EINTR error.
if (rc == -1) {
errno_assert (errno != EBADF
&& errno != EFAULT
&& errno != EINVAL
&& errno != ENOMEM
&& errno != ENOTSOCK);
if (errno == EWOULDBLOCK || errno == EINTR)
errno = EAGAIN;
}
return static_cast <int> (rc);
#endif
// handshake timer expired before handshake completed, so engine fails
error (timeout_error);
}