2009-08-08 16:01:58 +02:00
|
|
|
/*
|
2011-10-31 16:20:30 +01:00
|
|
|
Copyright (c) 2009-2011 250bpm s.r.o.
|
2011-11-01 18:06:11 +01:00
|
|
|
Copyright (c) 2007-2009 iMatix Corporation
|
2011-11-01 13:39:54 +01:00
|
|
|
Copyright (c) 2011 VMware, Inc.
|
2011-03-02 16:30:40 +01:00
|
|
|
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
|
2009-08-08 16:01:58 +02:00
|
|
|
|
|
|
|
This file is part of 0MQ.
|
|
|
|
|
|
|
|
0MQ is free software; you can redistribute it and/or modify it under
|
2010-10-30 15:08:28 +02:00
|
|
|
the terms of the GNU Lesser General Public License as published by
|
2009-08-08 16:01:58 +02:00
|
|
|
the Free Software Foundation; either version 3 of the License, or
|
|
|
|
(at your option) any later version.
|
|
|
|
|
|
|
|
0MQ is distributed in the hope that it will be useful,
|
|
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
2010-10-30 15:08:28 +02:00
|
|
|
GNU Lesser General Public License for more details.
|
2009-08-08 16:01:58 +02:00
|
|
|
|
2010-10-30 15:08:28 +02:00
|
|
|
You should have received a copy of the GNU Lesser General Public License
|
2009-08-08 16:01:58 +02:00
|
|
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
*/
|
|
|
|
|
2009-12-15 23:49:55 +01:00
|
|
|
#include <new>
|
2009-08-21 14:29:22 +02:00
|
|
|
#include <string>
|
2009-08-08 16:01:58 +02:00
|
|
|
#include <algorithm>
|
|
|
|
|
2010-08-06 17:49:37 +02:00
|
|
|
#include "platform.hpp"
|
|
|
|
|
|
|
|
#if defined ZMQ_HAVE_WINDOWS
|
|
|
|
#include "windows.hpp"
|
|
|
|
#if defined _MSC_VER
|
2012-03-14 16:12:28 +01:00
|
|
|
#if defined WINCE
|
|
|
|
#include <cmnintrin.h>
|
|
|
|
#else
|
2010-08-06 17:49:37 +02:00
|
|
|
#include <intrin.h>
|
|
|
|
#endif
|
2012-03-14 16:12:28 +01:00
|
|
|
#endif
|
2010-08-06 17:49:37 +02:00
|
|
|
#else
|
|
|
|
#include <unistd.h>
|
|
|
|
#endif
|
2010-05-05 14:24:54 +02:00
|
|
|
|
2010-08-06 17:49:37 +02:00
|
|
|
#include "socket_base.hpp"
|
2011-07-26 00:43:57 +02:00
|
|
|
#include "tcp_listener.hpp"
|
2011-07-28 13:19:55 +02:00
|
|
|
#include "ipc_listener.hpp"
|
2011-07-26 00:43:57 +02:00
|
|
|
#include "tcp_connecter.hpp"
|
2009-08-08 16:01:58 +02:00
|
|
|
#include "io_thread.hpp"
|
2011-09-15 10:00:23 +02:00
|
|
|
#include "session_base.hpp"
|
2009-08-09 11:57:21 +02:00
|
|
|
#include "config.hpp"
|
2009-08-27 10:54:28 +02:00
|
|
|
#include "pipe.hpp"
|
2009-09-04 16:02:41 +02:00
|
|
|
#include "err.hpp"
|
2010-05-05 14:24:54 +02:00
|
|
|
#include "ctx.hpp"
|
2009-09-11 17:58:37 +02:00
|
|
|
#include "platform.hpp"
|
2010-04-11 16:36:27 +02:00
|
|
|
#include "likely.hpp"
|
2011-04-21 22:27:48 +02:00
|
|
|
#include "msg.hpp"
|
2012-02-02 14:56:51 +01:00
|
|
|
#include "address.hpp"
|
|
|
|
#include "ipc_address.hpp"
|
|
|
|
#include "tcp_address.hpp"
|
2012-05-31 02:52:19 +02:00
|
|
|
#ifdef ZMQ_HAVE_OPENPGM
|
|
|
|
#include "pgm_socket.hpp"
|
|
|
|
#endif
|
2010-12-04 23:14:38 +01:00
|
|
|
|
2010-08-06 17:49:37 +02:00
|
|
|
#include "pair.hpp"
|
|
|
|
#include "pub.hpp"
|
|
|
|
#include "sub.hpp"
|
|
|
|
#include "req.hpp"
|
|
|
|
#include "rep.hpp"
|
|
|
|
#include "pull.hpp"
|
|
|
|
#include "push.hpp"
|
2012-03-22 17:36:19 +01:00
|
|
|
#include "dealer.hpp"
|
|
|
|
#include "router.hpp"
|
2010-12-04 23:14:38 +01:00
|
|
|
#include "xpub.hpp"
|
|
|
|
#include "xsub.hpp"
|
2009-08-08 16:01:58 +02:00
|
|
|
|
2011-04-09 09:35:34 +02:00
|
|
|
bool zmq::socket_base_t::check_tag ()
|
|
|
|
{
|
|
|
|
return tag == 0xbaddecaf;
|
|
|
|
}
|
|
|
|
|
2010-08-06 17:49:37 +02:00
|
|
|
//  Factory for socket objects. Allocates the concrete socket class that
//  corresponds to type_ and hands it back as a socket_base_t pointer.
//  If type_ is not one of the socket types listed below, errno is set to
//  EINVAL and NULL is returned. Allocation failure is caught by
//  alloc_assert.
zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
    uint32_t tid_, int sid_)
{
    socket_base_t *sock = NULL;

    switch (type_) {
    case ZMQ_PAIR:
        sock = new (std::nothrow) pair_t (parent_, tid_, sid_);
        break;
    case ZMQ_PUB:
        sock = new (std::nothrow) pub_t (parent_, tid_, sid_);
        break;
    case ZMQ_SUB:
        sock = new (std::nothrow) sub_t (parent_, tid_, sid_);
        break;
    case ZMQ_REQ:
        sock = new (std::nothrow) req_t (parent_, tid_, sid_);
        break;
    case ZMQ_REP:
        sock = new (std::nothrow) rep_t (parent_, tid_, sid_);
        break;
    case ZMQ_DEALER:
        sock = new (std::nothrow) dealer_t (parent_, tid_, sid_);
        break;
    case ZMQ_ROUTER:
        sock = new (std::nothrow) router_t (parent_, tid_, sid_);
        break;
    case ZMQ_PULL:
        sock = new (std::nothrow) pull_t (parent_, tid_, sid_);
        break;
    case ZMQ_PUSH:
        sock = new (std::nothrow) push_t (parent_, tid_, sid_);
        break;
    case ZMQ_XPUB:
        sock = new (std::nothrow) xpub_t (parent_, tid_, sid_);
        break;
    case ZMQ_XSUB:
        sock = new (std::nothrow) xsub_t (parent_, tid_, sid_);
        break;
    default:
        //  Unknown socket type.
        errno = EINVAL;
        return NULL;
    }

    alloc_assert (sock);
    return sock;
}
|
|
|
|
|
2012-03-20 01:41:20 +01:00
|
|
|
//  Constructs the base part of a socket: passes the parent context and
//  thread ID on to own_t, stores the marker value later verified by
//  check_tag, and starts with all state flags and counters cleared.
zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) :
    own_t (parent_, tid_),
    tag (0xbaddecaf),
    ctx_terminated (false),
    destroyed (false),
    last_tsc (0),
    ticks (0),
    rcvmore (false)
{
    //  Remember the socket ID in the options structure.
    options.socket_id = sid_;
}
|
|
|
|
|
|
|
|
zmq::socket_base_t::~socket_base_t ()
{
    //  The 'destroyed' flag is expected to be set by the time the
    //  destructor runs; anything else is treated as a fatal error.
    zmq_assert (destroyed);
}
|
|
|
|
|
2010-11-05 17:39:51 +01:00
|
|
|
//  Gives access to the mailbox embedded in this socket object.
zmq::mailbox_t *zmq::socket_base_t::get_mailbox ()
{
    mailbox_t *const own_mailbox = &mailbox;
    return own_mailbox;
}
|
|
|
|
|
|
|
|
void zmq::socket_base_t::stop ()
|
|
|
|
{
|
|
|
|
// Called by ctx when it is terminated (zmq_term).
|
|
|
|
// 'stop' command is sent from the threads that called zmq_term to
|
|
|
|
// the thread owning the socket. This way, blocking call in the
|
|
|
|
// owner thread can be interrupted.
|
|
|
|
send_stop ();
|
|
|
|
}
|
|
|
|
|
2010-12-13 15:40:26 +01:00
|
|
|
int zmq::socket_base_t::parse_uri (const char *uri_,
|
|
|
|
std::string &protocol_, std::string &address_)
|
|
|
|
{
|
|
|
|
zmq_assert (uri_ != NULL);
|
|
|
|
|
|
|
|
std::string uri (uri_);
|
|
|
|
std::string::size_type pos = uri.find ("://");
|
|
|
|
if (pos == std::string::npos) {
|
|
|
|
errno = EINVAL;
|
|
|
|
return -1;
|
|
|
|
}
|
|
|
|
protocol_ = uri.substr (0, pos);
|
|
|
|
address_ = uri.substr (pos + 3);
|
2012-02-08 23:06:46 +01:00
|
|
|
|
2010-12-13 15:40:26 +01:00
|
|
|
if (protocol_.empty () || address_.empty ()) {
|
|
|
|
errno = EINVAL;
|
|
|
|
return -1;
|
|
|
|
}
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
2010-08-11 14:09:56 +02:00
|
|
|
int zmq::socket_base_t::check_protocol (const std::string &protocol_)
|
|
|
|
{
|
|
|
|
// First check out whether the protcol is something we are aware of.
|
|
|
|
if (protocol_ != "inproc" && protocol_ != "ipc" && protocol_ != "tcp" &&
|
2012-03-19 22:15:09 +01:00
|
|
|
protocol_ != "pgm" && protocol_ != "epgm") {
|
2010-08-11 14:09:56 +02:00
|
|
|
errno = EPROTONOSUPPORT;
|
|
|
|
return -1;
|
|
|
|
}
|
|
|
|
|
|
|
|
// If 0MQ is not compiled with OpenPGM, pgm and epgm transports
|
|
|
|
// are not avaialble.
|
|
|
|
#if !defined ZMQ_HAVE_OPENPGM
|
|
|
|
if (protocol_ == "pgm" || protocol_ == "epgm") {
|
|
|
|
errno = EPROTONOSUPPORT;
|
|
|
|
return -1;
|
|
|
|
}
|
|
|
|
#endif
|
|
|
|
|
|
|
|
// IPC transport is not available on Windows and OpenVMS.
|
|
|
|
#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
|
2010-08-30 12:10:40 +02:00
|
|
|
if (protocol_ == "ipc") {
|
2010-08-11 14:09:56 +02:00
|
|
|
// Unknown protocol.
|
|
|
|
errno = EPROTONOSUPPORT;
|
|
|
|
return -1;
|
|
|
|
}
|
|
|
|
#endif
|
|
|
|
|
|
|
|
// Check whether socket type and transport protocol match.
|
|
|
|
// Specifically, multicast protocols can't be combined with
|
|
|
|
// bi-directional messaging patterns (socket types).
|
|
|
|
if ((protocol_ == "pgm" || protocol_ == "epgm") &&
|
2011-01-28 07:50:21 +01:00
|
|
|
options.type != ZMQ_PUB && options.type != ZMQ_SUB &&
|
|
|
|
options.type != ZMQ_XPUB && options.type != ZMQ_XSUB) {
|
2010-08-11 14:09:56 +02:00
|
|
|
errno = ENOCOMPATPROTO;
|
|
|
|
return -1;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Protocol is available.
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
2012-02-02 13:07:48 +01:00
|
|
|
//  Attaches a new pipe to the socket. The icanhasall_ flag is passed
//  through unchanged to the socket-type-specific xattach_pipe hook.
void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, bool icanhasall_)
{
    //  Register the pipe first: this socket becomes its event sink and
    //  keeps track of it so that it can be terminated later on.
    pipe_->set_event_sink (this);
    pipes.push_back (pipe_);

    //  Notify the derived socket type about the new pipe.
    xattach_pipe (pipe_, icanhasall_);

    //  Nothing more to do unless the socket is already being closed.
    if (!is_terminating ())
        return;

    //  The socket is shutting down: ask the new pipe to terminate straight
    //  away and account for one more termination acknowledgement.
    register_term_acks (1);
    pipe_->terminate (false);
}
|
|
|
|
|
2009-08-21 14:29:22 +02:00
|
|
|
int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
|
2009-08-09 09:24:48 +02:00
|
|
|
size_t optvallen_)
|
2009-08-08 16:01:58 +02:00
|
|
|
{
|
2010-09-21 09:00:46 +02:00
|
|
|
if (unlikely (ctx_terminated)) {
|
2010-04-11 16:36:27 +02:00
|
|
|
errno = ETERM;
|
|
|
|
return -1;
|
|
|
|
}
|
|
|
|
|
2009-09-21 14:39:59 +02:00
|
|
|
// First, check whether specific socket type overloads the option.
|
|
|
|
int rc = xsetsockopt (option_, optval_, optvallen_);
|
|
|
|
if (rc == 0 || errno != EINVAL)
|
|
|
|
return rc;
|
|
|
|
|
|
|
|
// If the socket type doesn't support the option, pass it to
|
|
|
|
// the generic option parser.
|
|
|
|
return options.setsockopt (option_, optval_, optvallen_);
|
2009-08-09 09:24:48 +02:00
|
|
|
}
|
|
|
|
|
2010-04-09 13:04:15 +02:00
|
|
|
int zmq::socket_base_t::getsockopt (int option_, void *optval_,
|
|
|
|
size_t *optvallen_)
|
|
|
|
{
|
2010-09-21 09:00:46 +02:00
|
|
|
if (unlikely (ctx_terminated)) {
|
2010-04-11 16:36:27 +02:00
|
|
|
errno = ETERM;
|
|
|
|
return -1;
|
|
|
|
}
|
|
|
|
|
2010-04-11 10:26:47 +02:00
|
|
|
if (option_ == ZMQ_RCVMORE) {
|
2011-03-24 14:59:43 +01:00
|
|
|
if (*optvallen_ < sizeof (int)) {
|
2010-04-11 10:26:47 +02:00
|
|
|
errno = EINVAL;
|
|
|
|
return -1;
|
|
|
|
}
|
2011-03-24 14:59:43 +01:00
|
|
|
*((int*) optval_) = rcvmore ? 1 : 0;
|
|
|
|
*optvallen_ = sizeof (int);
|
2010-04-11 10:26:47 +02:00
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
2010-08-06 17:49:37 +02:00
|
|
|
if (option_ == ZMQ_FD) {
|
|
|
|
if (*optvallen_ < sizeof (fd_t)) {
|
|
|
|
errno = EINVAL;
|
|
|
|
return -1;
|
|
|
|
}
|
2010-11-05 17:39:51 +01:00
|
|
|
*((fd_t*) optval_) = mailbox.get_fd ();
|
2010-08-06 17:49:37 +02:00
|
|
|
*optvallen_ = sizeof (fd_t);
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (option_ == ZMQ_EVENTS) {
|
2011-03-24 15:07:23 +01:00
|
|
|
if (*optvallen_ < sizeof (int)) {
|
2010-08-06 17:49:37 +02:00
|
|
|
errno = EINVAL;
|
|
|
|
return -1;
|
|
|
|
}
|
2011-06-17 12:22:02 +02:00
|
|
|
int rc = process_commands (0, false);
|
2011-01-05 16:57:51 +01:00
|
|
|
if (rc != 0 && (errno == EINTR || errno == ETERM))
|
2010-09-08 08:39:27 +02:00
|
|
|
return -1;
|
|
|
|
errno_assert (rc == 0);
|
2011-03-24 15:07:23 +01:00
|
|
|
*((int*) optval_) = 0;
|
2010-08-06 17:49:37 +02:00
|
|
|
if (has_out ())
|
2011-03-24 15:07:23 +01:00
|
|
|
*((int*) optval_) |= ZMQ_POLLOUT;
|
2010-08-06 17:49:37 +02:00
|
|
|
if (has_in ())
|
2011-03-24 15:07:23 +01:00
|
|
|
*((int*) optval_) |= ZMQ_POLLIN;
|
|
|
|
*optvallen_ = sizeof (int);
|
2010-08-06 17:49:37 +02:00
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
2010-04-09 13:04:15 +02:00
|
|
|
return options.getsockopt (option_, optval_, optvallen_);
|
|
|
|
}
|
|
|
|
|
2009-08-09 09:24:48 +02:00
|
|
|
int zmq::socket_base_t::bind (const char *addr_)
|
|
|
|
{
|
2010-09-21 09:00:46 +02:00
|
|
|
if (unlikely (ctx_terminated)) {
|
2010-04-11 16:36:27 +02:00
|
|
|
errno = ETERM;
|
|
|
|
return -1;
|
|
|
|
}
|
|
|
|
|
2012-04-21 23:39:48 +02:00
|
|
|
// Process pending commands, if any.
|
|
|
|
int rc = process_commands (0, false);
|
|
|
|
if (unlikely (rc != 0))
|
|
|
|
return -1;
|
|
|
|
|
2009-09-16 10:11:01 +02:00
|
|
|
// Parse addr_ string.
|
2010-08-11 14:09:56 +02:00
|
|
|
std::string protocol;
|
|
|
|
std::string address;
|
2012-04-21 23:39:48 +02:00
|
|
|
rc = parse_uri (addr_, protocol, address);
|
2010-12-13 15:40:26 +01:00
|
|
|
if (rc != 0)
|
|
|
|
return -1;
|
2009-08-09 11:21:47 +02:00
|
|
|
|
2010-12-13 15:40:26 +01:00
|
|
|
rc = check_protocol (protocol);
|
2010-08-11 14:09:56 +02:00
|
|
|
if (rc != 0)
|
|
|
|
return -1;
|
2010-01-15 14:11:39 +01:00
|
|
|
|
2012-03-19 22:15:09 +01:00
|
|
|
if (protocol == "inproc") {
|
2011-01-10 13:53:30 +01:00
|
|
|
endpoint_t endpoint = {this, options};
|
2012-04-18 21:42:11 +02:00
|
|
|
int rc = register_endpoint (addr_, endpoint);
|
|
|
|
if (rc == 0) {
|
2012-04-20 16:59:08 +02:00
|
|
|
// Save last endpoint URI
|
|
|
|
options.last_endpoint.assign (addr_);
|
2012-04-18 21:42:11 +02:00
|
|
|
}
|
|
|
|
return rc;
|
2011-01-10 13:53:30 +01:00
|
|
|
}
|
2010-01-15 14:11:39 +01:00
|
|
|
|
2011-07-26 18:35:40 +02:00
|
|
|
if (protocol == "pgm" || protocol == "epgm") {
|
|
|
|
// For convenience's sake, bind can be used interchageable with
|
|
|
|
// connect for PGM and EPGM transports.
|
2012-02-18 21:44:41 +01:00
|
|
|
return connect (addr_);
|
2011-07-26 18:35:40 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
// Remaining trasnports require to be run in an I/O thread, so at this
|
|
|
|
// point we'll choose one.
|
|
|
|
io_thread_t *io_thread = choose_io_thread (options.affinity);
|
|
|
|
if (!io_thread) {
|
|
|
|
errno = EMTHREAD;
|
|
|
|
return -1;
|
|
|
|
}
|
2010-09-09 08:25:00 +02:00
|
|
|
|
2011-07-28 13:19:55 +02:00
|
|
|
if (protocol == "tcp") {
|
2011-07-26 00:43:57 +02:00
|
|
|
tcp_listener_t *listener = new (std::nothrow) tcp_listener_t (
|
2010-09-09 08:25:00 +02:00
|
|
|
io_thread, this, options);
|
2011-02-22 16:23:36 +01:00
|
|
|
alloc_assert (listener);
|
2011-07-28 13:46:16 +02:00
|
|
|
int rc = listener->set_address (address.c_str ());
|
2010-02-12 15:08:57 +01:00
|
|
|
if (rc != 0) {
|
|
|
|
delete listener;
|
2012-05-04 03:32:46 +02:00
|
|
|
monitor_event (ZMQ_EVENT_BIND_FAILED, addr_, zmq_errno());
|
2009-09-16 10:11:01 +02:00
|
|
|
return -1;
|
2010-02-12 15:08:57 +01:00
|
|
|
}
|
2012-02-18 21:44:41 +01:00
|
|
|
|
2012-04-20 16:59:08 +02:00
|
|
|
// Save last endpoint URI
|
2012-04-18 21:42:11 +02:00
|
|
|
listener->get_address (options.last_endpoint);
|
|
|
|
|
2012-04-20 16:59:08 +02:00
|
|
|
add_endpoint (addr_, (own_t *) listener);
|
2009-09-16 10:11:01 +02:00
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
2011-07-28 13:46:16 +02:00
|
|
|
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
|
2011-07-28 13:19:55 +02:00
|
|
|
if (protocol == "ipc") {
|
|
|
|
ipc_listener_t *listener = new (std::nothrow) ipc_listener_t (
|
|
|
|
io_thread, this, options);
|
|
|
|
alloc_assert (listener);
|
2011-07-28 13:46:16 +02:00
|
|
|
int rc = listener->set_address (address.c_str ());
|
2011-07-28 13:19:55 +02:00
|
|
|
if (rc != 0) {
|
|
|
|
delete listener;
|
2012-05-04 03:32:46 +02:00
|
|
|
monitor_event (ZMQ_EVENT_BIND_FAILED, addr_, zmq_errno());
|
2011-07-26 18:35:40 +02:00
|
|
|
return -1;
|
|
|
|
}
|
2012-02-18 21:44:41 +01:00
|
|
|
|
2012-04-20 16:59:08 +02:00
|
|
|
// Save last endpoint URI
|
2012-04-18 21:42:11 +02:00
|
|
|
listener->get_address (options.last_endpoint);
|
|
|
|
|
2012-04-20 16:59:08 +02:00
|
|
|
add_endpoint (addr_, (own_t *) listener);
|
2011-07-26 18:35:40 +02:00
|
|
|
return 0;
|
2009-09-16 10:11:01 +02:00
|
|
|
}
|
2011-07-26 18:35:40 +02:00
|
|
|
#endif
|
2009-09-16 10:11:01 +02:00
|
|
|
|
2010-08-11 14:09:56 +02:00
|
|
|
zmq_assert (false);
|
2010-08-24 15:58:48 +02:00
|
|
|
return -1;
|
2009-08-08 16:01:58 +02:00
|
|
|
}
|
|
|
|
|
2009-08-09 09:24:48 +02:00
|
|
|
int zmq::socket_base_t::connect (const char *addr_)
|
2009-08-08 16:01:58 +02:00
|
|
|
{
|
2010-09-21 09:00:46 +02:00
|
|
|
if (unlikely (ctx_terminated)) {
|
2010-04-11 16:36:27 +02:00
|
|
|
errno = ETERM;
|
|
|
|
return -1;
|
|
|
|
}
|
|
|
|
|
2012-04-21 23:39:48 +02:00
|
|
|
// Process pending commands, if any.
|
|
|
|
int rc = process_commands (0, false);
|
|
|
|
if (unlikely (rc != 0))
|
|
|
|
return -1;
|
|
|
|
|
2009-09-11 17:58:37 +02:00
|
|
|
// Parse addr_ string.
|
2010-08-11 14:09:56 +02:00
|
|
|
std::string protocol;
|
|
|
|
std::string address;
|
2012-04-21 23:39:48 +02:00
|
|
|
rc = parse_uri (addr_, protocol, address);
|
2010-12-13 15:40:26 +01:00
|
|
|
if (rc != 0)
|
|
|
|
return -1;
|
2009-09-11 17:58:37 +02:00
|
|
|
|
2010-12-13 15:40:26 +01:00
|
|
|
rc = check_protocol (protocol);
|
2010-08-11 14:09:56 +02:00
|
|
|
if (rc != 0)
|
|
|
|
return -1;
|
2009-09-11 17:58:37 +02:00
|
|
|
|
2012-03-19 22:15:09 +01:00
|
|
|
if (protocol == "inproc") {
|
2009-11-21 20:59:55 +01:00
|
|
|
|
2010-02-14 13:34:48 +01:00
|
|
|
// TODO: inproc connect is specific with respect to creating pipes
|
|
|
|
// as there's no 'reconnect' functionality implemented. Once that
|
|
|
|
// is in place we should follow generic pipe creation algorithm.
|
|
|
|
|
2011-01-10 13:53:30 +01:00
|
|
|
// Find the peer endpoint.
|
|
|
|
endpoint_t peer = find_endpoint (addr_);
|
|
|
|
if (!peer.socket)
|
2009-11-21 20:59:55 +01:00
|
|
|
return -1;
|
|
|
|
|
2011-01-10 13:53:30 +01:00
|
|
|
// The total HWM for an inproc connection should be the sum of
|
2011-03-24 12:27:06 +01:00
|
|
|
// the binder's HWM and the connector's HWM.
|
2011-03-24 16:47:33 +01:00
|
|
|
int sndhwm;
|
|
|
|
int rcvhwm;
|
|
|
|
if (options.sndhwm == 0 || peer.options.rcvhwm == 0)
|
|
|
|
sndhwm = 0;
|
2011-01-10 13:53:30 +01:00
|
|
|
else
|
2011-03-24 16:47:33 +01:00
|
|
|
sndhwm = options.sndhwm + peer.options.rcvhwm;
|
|
|
|
if (options.rcvhwm == 0 || peer.options.sndhwm == 0)
|
|
|
|
rcvhwm = 0;
|
|
|
|
else
|
|
|
|
rcvhwm = options.rcvhwm + peer.options.sndhwm;
|
2011-01-10 13:53:30 +01:00
|
|
|
|
2011-05-22 17:26:53 +02:00
|
|
|
// Create a bi-directional pipe to connect the peers.
|
|
|
|
object_t *parents [2] = {this, peer.socket};
|
|
|
|
pipe_t *pipes [2] = {NULL, NULL};
|
|
|
|
int hwms [2] = {sndhwm, rcvhwm};
|
2011-06-23 07:57:47 +02:00
|
|
|
bool delays [2] = {options.delay_on_disconnect, options.delay_on_close};
|
2011-05-22 17:26:53 +02:00
|
|
|
int rc = pipepair (parents, pipes, hwms, delays);
|
|
|
|
errno_assert (rc == 0);
|
2009-11-21 20:59:55 +01:00
|
|
|
|
2011-05-22 17:26:53 +02:00
|
|
|
// Attach local end of the pipe to this socket object.
|
2011-07-15 11:24:33 +02:00
|
|
|
attach_pipe (pipes [0]);
|
2009-11-21 20:59:55 +01:00
|
|
|
|
2011-11-05 09:57:17 +01:00
|
|
|
// If required, send the identity of the local socket to the peer.
|
|
|
|
if (options.send_identity) {
|
|
|
|
msg_t id;
|
|
|
|
rc = id.init_size (options.identity_size);
|
2012-05-28 23:13:09 +02:00
|
|
|
errno_assert (rc == 0);
|
2011-11-05 09:57:17 +01:00
|
|
|
memcpy (id.data (), options.identity, options.identity_size);
|
|
|
|
id.set_flags (msg_t::identity);
|
|
|
|
bool written = pipes [0]->write (&id);
|
|
|
|
zmq_assert (written);
|
2012-04-05 01:01:50 +02:00
|
|
|
pipes [0]->flush ();
|
2011-11-05 09:57:17 +01:00
|
|
|
}
|
|
|
|
|
2012-04-05 15:32:45 +02:00
|
|
|
// If required, send the identity of the peer to the local socket.
|
|
|
|
if (peer.options.send_identity) {
|
|
|
|
msg_t id;
|
|
|
|
rc = id.init_size (peer.options.identity_size);
|
2012-05-28 23:13:09 +02:00
|
|
|
errno_assert (rc == 0);
|
2012-04-05 15:32:45 +02:00
|
|
|
memcpy (id.data (), peer.options.identity, peer.options.identity_size);
|
|
|
|
id.set_flags (msg_t::identity);
|
|
|
|
bool written = pipes [1]->write (&id);
|
|
|
|
zmq_assert (written);
|
|
|
|
pipes [1]->flush ();
|
|
|
|
}
|
|
|
|
|
2011-05-22 17:26:53 +02:00
|
|
|
// Attach remote end of the pipe to the peer socket. Note that peer's
|
|
|
|
// seqnum was incremented in find_endpoint function. We don't need it
|
2010-08-11 14:09:56 +02:00
|
|
|
// increased here.
|
2011-07-15 11:24:33 +02:00
|
|
|
send_bind (peer.socket, pipes [1], false);
|
2009-11-21 20:59:55 +01:00
|
|
|
|
2012-04-20 16:59:08 +02:00
|
|
|
// Save last endpoint URI
|
|
|
|
options.last_endpoint.assign (addr_);
|
2012-04-18 21:42:11 +02:00
|
|
|
|
2009-11-21 20:59:55 +01:00
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
2010-09-09 08:25:00 +02:00
|
|
|
// Choose the I/O thread to run the session in.
|
|
|
|
io_thread_t *io_thread = choose_io_thread (options.affinity);
|
|
|
|
if (!io_thread) {
|
|
|
|
errno = EMTHREAD;
|
|
|
|
return -1;
|
|
|
|
}
|
|
|
|
|
2012-02-02 14:56:51 +01:00
|
|
|
address_t *paddr = new (std::nothrow) address_t (protocol, address);
|
2012-05-28 23:13:09 +02:00
|
|
|
alloc_assert (paddr);
|
2012-02-02 14:56:51 +01:00
|
|
|
|
|
|
|
// Resolve address (if needed by the protocol)
|
|
|
|
if (protocol == "tcp") {
|
|
|
|
paddr->resolved.tcp_addr = new (std::nothrow) tcp_address_t ();
|
2012-05-28 23:13:09 +02:00
|
|
|
alloc_assert (paddr->resolved.tcp_addr);
|
2012-02-02 14:56:51 +01:00
|
|
|
int rc = paddr->resolved.tcp_addr->resolve (
|
|
|
|
address.c_str (), false, options.ipv4only ? true : false);
|
|
|
|
if (rc != 0) {
|
|
|
|
delete paddr;
|
|
|
|
return -1;
|
|
|
|
}
|
|
|
|
}
|
2012-02-17 23:07:52 +01:00
|
|
|
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
|
2012-02-02 14:56:51 +01:00
|
|
|
else if(protocol == "ipc") {
|
|
|
|
paddr->resolved.ipc_addr = new (std::nothrow) ipc_address_t ();
|
2012-05-28 23:13:09 +02:00
|
|
|
alloc_assert (paddr->resolved.ipc_addr);
|
2012-02-02 14:56:51 +01:00
|
|
|
int rc = paddr->resolved.ipc_addr->resolve (address.c_str ());
|
|
|
|
if (rc != 0) {
|
|
|
|
delete paddr;
|
|
|
|
return -1;
|
|
|
|
}
|
|
|
|
}
|
2012-05-31 02:52:19 +02:00
|
|
|
#endif
|
|
|
|
#ifdef ZMQ_HAVE_OPENPGM
|
|
|
|
if (protocol == "pgm" || protocol == "epgm") {
|
|
|
|
struct pgm_addrinfo_t *res = NULL;
|
|
|
|
uint16_t port_number = 0;
|
|
|
|
int rc = pgm_socket_t::init_address(address.c_str(), &res, &port_number);
|
|
|
|
if (res != NULL)
|
|
|
|
pgm_freeaddrinfo (res);
|
|
|
|
if (rc != 0 || port_number == 0)
|
|
|
|
return -1;
|
|
|
|
}
|
2012-02-17 23:07:52 +01:00
|
|
|
#endif
|
2010-08-11 14:09:56 +02:00
|
|
|
// Create session.
|
2011-09-15 10:00:23 +02:00
|
|
|
session_base_t *session = session_base_t::create (io_thread, true, this,
|
2012-02-02 14:56:51 +01:00
|
|
|
options, paddr);
|
2011-09-15 10:00:23 +02:00
|
|
|
errno_assert (session);
|
2012-06-03 23:57:47 +02:00
|
|
|
|
2012-02-02 13:07:48 +01:00
|
|
|
// PGM does not support subscription forwarding; ask for all data to be
|
|
|
|
// sent to this pipe.
|
|
|
|
bool icanhasall = false;
|
|
|
|
if (protocol == "pgm" || protocol == "epgm")
|
|
|
|
icanhasall = true;
|
2012-06-03 23:57:47 +02:00
|
|
|
|
2012-06-04 00:01:24 +02:00
|
|
|
if (options.delay_attach_on_connect != 1 || icanhasall) {
|
2012-06-03 23:05:36 +02:00
|
|
|
// Create a bi-directional pipe.
|
|
|
|
object_t *parents [2] = {this, session};
|
|
|
|
pipe_t *pipes [2] = {NULL, NULL};
|
|
|
|
int hwms [2] = {options.sndhwm, options.rcvhwm};
|
|
|
|
bool delays [2] = {options.delay_on_disconnect, options.delay_on_close};
|
|
|
|
rc = pipepair (parents, pipes, hwms, delays);
|
|
|
|
errno_assert (rc == 0);
|
2012-02-02 13:07:48 +01:00
|
|
|
|
2012-06-03 23:05:36 +02:00
|
|
|
// Attach local end of the pipe to the socket object.
|
After speaking with Ben Gray and the discussion on the mailing list, this is an attempt to create a sockopt to allow connecting pipes to not immediately be available for traffic. The problem is in a PUSH to many PULL situation, where there is a connect to a PULL which is not there. This connect will immediately create a pipe (unlike bind), and traffic will be load balanced to that pipe. This means if there is a persistently unavailable end point then the traffic will queue until HWM is hit, and older messages will be lost.
This patch adds a sockopt ZMQ_DELAY_ATTACH_ON_CONNECT, which if set to 1 will attempt to preempt this behavior. It does this by extending the use of the session_base to include in the outbound as well as the inbound pipe, and only associates the pipe with the socket once it receives the connected callback via a process_attach message. This works, and a test has been added to show so, but may introduce unexpected complications. The shutdown logic in this class has become marginally more awkward because of this, requiring the session to serve as the sink for both pipes if shutdown occurs with a still-connecting pipe in place. It is also possible there could be issues around flushing the messages, but as I could not directly think how to create such an issue I have not written any code with regards to that.
The documentation has been updated to reflect the change, but please do check over the code and test and review.
2012-06-01 18:58:19 +02:00
|
|
|
attach_pipe (pipes [0], icanhasall);
|
2012-06-03 23:57:47 +02:00
|
|
|
|
2012-06-03 23:05:36 +02:00
|
|
|
// Attach remote end of the pipe to the session object later on.
|
|
|
|
session->attach_pipe (pipes [1]);
|
|
|
|
}
|
2009-08-27 10:54:28 +02:00
|
|
|
|
2012-04-20 16:59:08 +02:00
|
|
|
// Save last endpoint URI
|
2012-04-18 21:42:11 +02:00
|
|
|
paddr->to_string (options.last_endpoint);
|
|
|
|
|
2012-04-20 16:59:08 +02:00
|
|
|
add_endpoint (addr_, (own_t *) session);
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
|
|
|
void zmq::socket_base_t::add_endpoint (const char *addr_, own_t *endpoint_)
|
|
|
|
{
|
2010-08-11 14:09:56 +02:00
|
|
|
// Activate the session. Make it a child of this socket.
|
2012-04-20 16:59:08 +02:00
|
|
|
launch_child (endpoint_);
|
2012-05-20 13:34:08 +02:00
|
|
|
endpoints.insert (endpoints_t::value_type (std::string (addr_), endpoint_));
|
2012-04-20 16:59:08 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
int zmq::socket_base_t::term_endpoint (const char *addr_)
|
|
|
|
{
|
|
|
|
// Check whether the library haven't been shut down yet.
|
|
|
|
if (unlikely (ctx_terminated)) {
|
|
|
|
errno = ETERM;
|
|
|
|
return -1;
|
|
|
|
}
|
2009-09-11 17:58:37 +02:00
|
|
|
|
2012-04-21 06:12:59 +02:00
|
|
|
// Check whether endpoint address passed to the function is valid.
|
2012-04-20 16:59:08 +02:00
|
|
|
if (unlikely (!addr_)) {
|
|
|
|
errno = EINVAL;
|
|
|
|
return -1;
|
|
|
|
}
|
|
|
|
|
2012-04-21 16:56:10 +02:00
|
|
|
// Process pending commands, if any, since there could be pending unprocessed process_own()'s
|
|
|
|
// (from launch_child() for example) we're asked to terminate now.
|
|
|
|
int rc = process_commands (0, false);
|
|
|
|
if (unlikely (rc != 0))
|
|
|
|
return -1;
|
|
|
|
|
2012-04-20 16:59:08 +02:00
|
|
|
// Find the endpoints range (if any) corresponding to the addr_ string.
|
|
|
|
std::pair <endpoints_t::iterator, endpoints_t::iterator> range = endpoints.equal_range (std::string (addr_));
|
|
|
|
if (range.first == range.second)
|
|
|
|
return -1;
|
|
|
|
|
|
|
|
for (endpoints_t::iterator it = range.first; it != range.second; ++it)
|
|
|
|
term_child (it->second);
|
|
|
|
endpoints.erase (range.first, range.second);
|
2010-08-11 14:09:56 +02:00
|
|
|
return 0;
|
2009-08-08 16:01:58 +02:00
|
|
|
}
|
|
|
|
|
2011-04-21 22:27:48 +02:00
|
|
|
//  Sends a message on the socket. Returns 0 on success and -1 on failure.
//  ETERM is reported once the context has been terminated, EFAULT for a
//  NULL or invalid message, and EAGAIN when the message cannot be sent in
//  non-blocking mode or before the send timeout expires. Other errors come
//  from process_commands or from the socket-type-specific xsend.
int zmq::socket_base_t::send (msg_t *msg_, int flags_)
{
    //  Check whether the library hasn't been shut down yet.
    if (unlikely (ctx_terminated)) {
        errno = ETERM;
        return -1;
    }

    //  Check whether message passed to the function is valid.
    if (unlikely (!msg_ || !msg_->check ())) {
        errno = EFAULT;
        return -1;
    }

    //  Process pending commands, if any.
    int rc = process_commands (0, true);
    if (unlikely (rc != 0))
        return -1;

    //  Clear any user-visible flags that are set on the message, then
    //  impose the flags requested for this call.
    msg_->reset_flags (msg_t::more);
    const bool more_follows = (flags_ & ZMQ_SNDMORE) != 0;
    if (more_follows)
        msg_->set_flags (msg_t::more);

    //  First attempt to send the message.
    rc = xsend (msg_, flags_);
    if (rc == 0)
        return 0;
    if (unlikely (errno != EAGAIN))
        return -1;

    //  In case of non-blocking send we'll simply propagate
    //  the error - including EAGAIN - up the stack.
    const bool nonblocking =
        (flags_ & ZMQ_DONTWAIT) != 0 || options.sndtimeo == 0;
    if (nonblocking)
        return -1;

    //  Compute the time when the timeout should occur.
    //  If the timeout is infinite (negative), the deadline is not used.
    int timeout = options.sndtimeo;
    uint64_t deadline = 0;
    if (timeout >= 0)
        deadline = clock.now_ms () + timeout;

    //  The message couldn't be sent. Wait for the next command, process it
    //  and try to send the message again. If the timeout is reached in the
    //  meantime, return EAGAIN.
    for (;;) {
        if (unlikely (process_commands (timeout, false) != 0))
            return -1;

        rc = xsend (msg_, flags_);
        if (rc == 0)
            return 0;
        if (unlikely (errno != EAGAIN))
            return -1;

        //  With a finite timeout, recompute how much time is left.
        if (timeout > 0) {
            timeout = (int) (deadline - clock.now_ms ());
            if (timeout <= 0) {
                errno = EAGAIN;
                return -1;
            }
        }
    }
}
|
|
|
|
|
2011-04-21 22:27:48 +02:00
|
|
|
//  Receives a single message into 'msg_'.
//
//  Returns 0 on success. On failure returns a negative value with errno set:
//  ETERM if the context has been terminated, EFAULT if 'msg_' is null or
//  fails its own validity check, EAGAIN if no message could be obtained
//  (non-blocking mode, or the receive timeout elapsed), or whatever error
//  xrecv / process_commands reported.
int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
{
    //  Refuse to work once the library has been shut down.
    if (unlikely (ctx_terminated)) {
        errno = ETERM;
        return -1;
    }

    //  Reject a missing or invalid message object.
    if (unlikely (!msg_ || !msg_->check ())) {
        errno = EFAULT;
        return -1;
    }

    //  First attempt to fetch a message. Only EAGAIN is tolerated here;
    //  any other failure is reported straight away.
    int rc = xrecv (msg_, flags_);
    if (unlikely (rc != 0 && errno != EAGAIN))
        return -1;

    //  Once every inbound_poll_rate calls, process pending commands.
    //  This matters when messages are available all the time and we never
    //  get to the polling paths below; whenever those paths run, ticks is
    //  reset to zero and this branch is postponed.
    //
    //  Note that 'recv' throttles command processing by counting ticks,
    //  whereas the non-waiting path of process_commands can throttle by
    //  the CPU tick counter. The original authors note that counting ticks
    //  is more efficient than doing RDTSC all the time.
    if (++ticks == inbound_poll_rate) {
        if (unlikely (process_commands (0, false) != 0))
            return -1;
        ticks = 0;
    }

    //  The first attempt succeeded: we are done.
    if (rc == 0) {
        extract_flags (msg_);
        return 0;
    }

    //  No message was immediately available. In the non-blocking case,
    //  process commands once (an activate_reader command may already be
    //  waiting in the command pipe) and retry exactly one time. If that
    //  retry fails, its error - typically EAGAIN - is propagated.
    if ((flags_ & ZMQ_DONTWAIT) || options.rcvtimeo == 0) {
        if (unlikely (process_commands (0, false) != 0))
            return -1;
        ticks = 0;

        rc = xrecv (msg_, flags_);
        if (rc < 0)
            return rc;
        extract_flags (msg_);
        return 0;
    }

    //  Compute the point in time when the timeout expires. A negative
    //  timeout means "wait forever", in which case the deadline is unused.
    int remaining_ms = options.rcvtimeo;
    uint64_t deadline = 0;
    if (remaining_ms >= 0)
        deadline = clock.now_ms () + remaining_ms;

    //  Blocking case: process commands over and over until a message can
    //  be fetched. The first pass waits for commands only if ticks is
    //  non-zero; every later pass waits.
    bool may_block = (ticks != 0);
    for (;;) {
        if (unlikely (process_commands (may_block ? remaining_ms : 0,
              false) != 0))
            return -1;

        rc = xrecv (msg_, flags_);
        if (rc == 0) {
            ticks = 0;
            extract_flags (msg_);
            return 0;
        }
        if (unlikely (errno != EAGAIN))
            return -1;

        may_block = true;

        //  With a finite timeout, recompute how much time is left and
        //  give up with EAGAIN once it has run out.
        if (remaining_ms > 0) {
            remaining_ms = static_cast <int> (deadline - clock.now_ms ());
            if (remaining_ms <= 0) {
                errno = EAGAIN;
                return -1;
            }
        }
    }
}
|
|
|
|
|
|
|
|
int zmq::socket_base_t::close ()
|
|
|
|
{
|
2012-03-22 21:55:05 +01:00
|
|
|
// Mark the socket as dead
|
|
|
|
tag = 0xdeadbeef;
|
|
|
|
|
2011-02-09 15:32:15 +01:00
|
|
|
// Transfer the ownership of the socket from this application thread
|
|
|
|
// to the reaper thread which will take care of the rest of shutdown
|
|
|
|
// process.
|
|
|
|
send_reap (this);
|
2009-09-04 16:02:41 +02:00
|
|
|
|
2009-08-08 16:01:58 +02:00
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
2009-10-01 10:56:17 +02:00
|
|
|
bool zmq::socket_base_t::has_in ()
|
|
|
|
{
|
|
|
|
return xhas_in ();
|
|
|
|
}
|
|
|
|
|
|
|
|
bool zmq::socket_base_t::has_out ()
|
|
|
|
{
|
|
|
|
return xhas_out ();
|
|
|
|
}
|
|
|
|
|
2011-02-09 22:23:21 +01:00
|
|
|
//  Attaches the socket to the reaper's poller and kicks off termination.
//  'poller_' is remembered in the 'poller' member for later use.
void zmq::socket_base_t::start_reaping (poller_t *poller_)
{
    //  Plug the socket into the reaper thread: register the mailbox's
    //  file descriptor with the poller and ask for input notifications.
    poller = poller_;
    handle = poller->add_fd (mailbox.get_fd (), this);
    poller->set_pollin (handle);

    //  Start the termination, then check whether the socket can be
    //  deallocated right away.
    terminate ();
    check_destroy ();
}
|
|
|
|
|
2011-06-17 12:22:02 +02:00
|
|
|
int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
|
2009-08-27 10:54:28 +02:00
|
|
|
{
|
2010-09-08 08:39:27 +02:00
|
|
|
int rc;
|
2010-08-06 17:49:37 +02:00
|
|
|
command_t cmd;
|
2011-06-17 12:22:02 +02:00
|
|
|
if (timeout_ != 0) {
|
|
|
|
|
|
|
|
// If we are asked to wait, simply ask mailbox to wait.
|
|
|
|
rc = mailbox.recv (&cmd, timeout_);
|
2010-06-17 11:01:18 +02:00
|
|
|
}
|
|
|
|
else {
|
2010-08-06 17:49:37 +02:00
|
|
|
|
2011-06-17 12:22:02 +02:00
|
|
|
// If we are asked not to wait, check whether we haven't processed
|
|
|
|
// commands recently, so that we can throttle the new commands.
|
|
|
|
|
2010-09-26 16:55:54 +02:00
|
|
|
// Get the CPU's tick counter. If 0, the counter is not available.
|
|
|
|
uint64_t tsc = zmq::clock_t::rdtsc ();
|
|
|
|
|
2010-08-06 17:49:37 +02:00
|
|
|
// Optimised version of command processing - it doesn't have to check
|
|
|
|
// for incoming commands each time. It does so only if certain time
|
|
|
|
// elapsed since last command processing. Command delay varies
|
|
|
|
// depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU
|
|
|
|
// etc. The optimisation makes sense only on platforms where getting
|
|
|
|
// a timestamp is a very cheap operation (tens of nanoseconds).
|
2010-09-26 16:55:54 +02:00
|
|
|
if (tsc && throttle_) {
|
|
|
|
|
2010-09-26 13:36:05 +02:00
|
|
|
// Check whether TSC haven't jumped backwards (in case of migration
|
|
|
|
// between CPU cores) and whether certain time have elapsed since
|
|
|
|
// last command processing. If it didn't do nothing.
|
2010-09-26 16:55:54 +02:00
|
|
|
if (tsc >= last_tsc && tsc - last_tsc <= max_command_delay)
|
2010-09-08 08:39:27 +02:00
|
|
|
return 0;
|
2010-09-26 16:55:54 +02:00
|
|
|
last_tsc = tsc;
|
2010-08-06 17:49:37 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
// Check whether there are any commands pending for this thread.
|
2011-06-17 12:22:02 +02:00
|
|
|
rc = mailbox.recv (&cmd, 0);
|
2010-06-17 11:01:18 +02:00
|
|
|
}
|
2009-08-27 10:54:28 +02:00
|
|
|
|
2010-08-06 17:49:37 +02:00
|
|
|
// Process all the commands available at the moment.
|
2010-09-08 08:39:27 +02:00
|
|
|
while (true) {
|
|
|
|
if (rc == -1 && errno == EAGAIN)
|
|
|
|
break;
|
|
|
|
if (rc == -1 && errno == EINTR)
|
|
|
|
return -1;
|
|
|
|
errno_assert (rc == 0);
|
2010-08-06 17:49:37 +02:00
|
|
|
cmd.destination->process_command (cmd);
|
2011-06-17 12:22:02 +02:00
|
|
|
rc = mailbox.recv (&cmd, 0);
|
2010-09-08 08:39:27 +02:00
|
|
|
}
|
|
|
|
|
2010-09-21 09:00:46 +02:00
|
|
|
if (ctx_terminated) {
|
2010-09-08 08:39:27 +02:00
|
|
|
errno = ETERM;
|
|
|
|
return -1;
|
2010-08-06 17:49:37 +02:00
|
|
|
}
|
2010-09-08 08:39:27 +02:00
|
|
|
|
|
|
|
return 0;
|
2009-08-28 16:51:46 +02:00
|
|
|
}
|
|
|
|
|
2010-08-06 17:49:37 +02:00
|
|
|
void zmq::socket_base_t::process_stop ()
|
2009-08-28 16:51:46 +02:00
|
|
|
{
|
2010-08-06 17:49:37 +02:00
|
|
|
// Here, someone have called zmq_term while the socket was still alive.
|
2010-09-21 09:00:46 +02:00
|
|
|
// We'll remember the fact so that any blocking call is interrupted and any
|
2010-08-06 17:49:37 +02:00
|
|
|
// further attempt to use the socket will return ETERM. The user is still
|
|
|
|
// responsible for calling zmq_close on the socket though!
|
2010-09-21 09:00:46 +02:00
|
|
|
ctx_terminated = true;
|
2009-09-02 16:16:25 +02:00
|
|
|
}
|
|
|
|
|
2011-07-15 11:24:33 +02:00
|
|
|
//  Handles the 'bind' command by attaching the supplied pipe to the socket.
void zmq::socket_base_t::process_bind (pipe_t *pipe_)
{
    attach_pipe (pipe_);
}
|
|
|
|
|
2010-10-16 10:53:29 +02:00
|
|
|
void zmq::socket_base_t::process_term (int linger_)
|
2009-08-08 16:01:58 +02:00
|
|
|
{
|
2010-08-11 14:09:56 +02:00
|
|
|
// Unregister all inproc endpoints associated with this socket.
|
|
|
|
// Doing this we make sure that no new pipes from other sockets (inproc)
|
|
|
|
// will be initiated.
|
|
|
|
unregister_endpoints (this);
|
|
|
|
|
2011-05-23 20:30:01 +02:00
|
|
|
// Ask all attached pipes to terminate.
|
|
|
|
for (pipes_t::size_type i = 0; i != pipes.size (); ++i)
|
2011-05-31 14:36:51 +02:00
|
|
|
pipes [i]->terminate (false);
|
2011-09-01 07:26:17 +02:00
|
|
|
register_term_acks ((int) pipes.size ());
|
2011-05-23 20:30:01 +02:00
|
|
|
|
2010-08-11 14:09:56 +02:00
|
|
|
// Continue the termination process immediately.
|
2010-10-16 10:53:29 +02:00
|
|
|
own_t::process_term (linger_);
|
2009-12-02 21:26:47 +01:00
|
|
|
}
|
|
|
|
|
2010-08-12 15:03:51 +02:00
|
|
|
void zmq::socket_base_t::process_destroy ()
|
|
|
|
{
|
|
|
|
destroyed = true;
|
|
|
|
}
|
|
|
|
|
2010-08-06 17:49:37 +02:00
|
|
|
//  Base implementation of the socket-type-specific option setter.
//  It recognises no options: all arguments are ignored, errno is set to
//  EINVAL and -1 is returned.
int zmq::socket_base_t::xsetsockopt (int option_, const void *optval_,
    size_t optvallen_)
{
    errno = EINVAL;
    return -1;
}
|
|
|
|
|
|
|
|
//  Base implementation: always reports that nothing can be sent.
bool zmq::socket_base_t::xhas_out ()
{
    return false;
}
|
|
|
|
|
2011-05-30 10:07:34 +02:00
|
|
|
//  Base implementation of sending: not supported. Both arguments are
//  ignored, errno is set to ENOTSUP and -1 is returned.
int zmq::socket_base_t::xsend (msg_t *msg_, int flags_)
{
    errno = ENOTSUP;
    return -1;
}
|
|
|
|
|
|
|
|
//  Base implementation: always reports that nothing can be received.
bool zmq::socket_base_t::xhas_in ()
{
    return false;
}
|
|
|
|
|
2011-05-30 10:07:34 +02:00
|
|
|
//  Base implementation of receiving: not supported. Both arguments are
//  ignored, errno is set to ENOTSUP and -1 is returned.
int zmq::socket_base_t::xrecv (msg_t *msg_, int flags_)
{
    errno = ENOTSUP;
    return -1;
}
|
|
|
|
|
2011-05-23 20:30:01 +02:00
|
|
|
//  Base implementation of the read-activation hook. Reaching it is treated
//  as a programming error: the assertion below always fails.
void zmq::socket_base_t::xread_activated (pipe_t *pipe_)
{
    zmq_assert (false);
}
|
|
|
|
//  Base implementation of the write-activation hook. Reaching it is treated
//  as a programming error: the assertion below always fails.
void zmq::socket_base_t::xwrite_activated (pipe_t *pipe_)
{
    zmq_assert (false);
}
|
|
|
|
|
2011-05-30 10:07:34 +02:00
|
|
|
//  Base implementation of the hiccup hook. 'pipe_' is not used; the only
//  effect is an assertion that the delay_attach_on_connect option is 1.
void zmq::socket_base_t::xhiccuped (pipe_t *pipe_)
{
    zmq_assert (options.delay_attach_on_connect == 1);
}
|
|
|
|
|
2011-02-09 22:23:21 +01:00
|
|
|
void zmq::socket_base_t::in_event ()
|
|
|
|
{
|
2011-06-17 12:22:02 +02:00
|
|
|
// This function is invoked only once the socket is running in the context
|
|
|
|
// of the reaper thread. Process any commands from other threads/sockets
|
|
|
|
// that may be available at the moment. Ultimately, the socket will
|
|
|
|
// be destroyed.
|
|
|
|
process_commands (0, false);
|
2011-02-25 08:58:01 +01:00
|
|
|
check_destroy ();
|
|
|
|
}
|
|
|
|
|
|
|
|
void zmq::socket_base_t::out_event ()
|
|
|
|
{
|
|
|
|
zmq_assert (false);
|
|
|
|
}
|
|
|
|
|
|
|
|
void zmq::socket_base_t::timer_event (int id_)
|
|
|
|
{
|
|
|
|
zmq_assert (false);
|
|
|
|
}
|
2011-02-09 22:23:21 +01:00
|
|
|
|
2011-02-25 08:58:01 +01:00
|
|
|
void zmq::socket_base_t::check_destroy ()
|
|
|
|
{
|
2011-02-09 22:23:21 +01:00
|
|
|
// If the object was already marked as destroyed, finish the deallocation.
|
|
|
|
if (destroyed) {
|
|
|
|
|
|
|
|
// Remove the socket from the reaper's poller.
|
|
|
|
poller->rm_fd (handle);
|
|
|
|
|
|
|
|
// Remove the socket from the context.
|
|
|
|
destroy_socket (this);
|
|
|
|
|
|
|
|
// Notify the reaper about the fact.
|
|
|
|
send_reaped ();
|
|
|
|
|
|
|
|
// Deallocate.
|
|
|
|
own_t::process_destroy ();
|
|
|
|
}
|
|
|
|
}
|
2011-05-23 20:30:01 +02:00
|
|
|
|
|
|
|
//  Pipe event: passes the read activation of 'pipe_' on to the
//  xread_activated hook.
void zmq::socket_base_t::read_activated (pipe_t *pipe_)
{
    xread_activated (pipe_);
}
|
|
|
|
|
|
|
|
//  Pipe event: passes the write activation of 'pipe_' on to the
//  xwrite_activated hook.
void zmq::socket_base_t::write_activated (pipe_t *pipe_)
{
    xwrite_activated (pipe_);
}
|
|
|
|
|
2011-05-30 10:07:34 +02:00
|
|
|
//  Pipe event: 'pipe_' has hiccuped.
void zmq::socket_base_t::hiccuped (pipe_t *pipe_)
{
    //  When the delay_attach_on_connect option is 1, the pipe is asked
    //  to terminate.
    const bool drop_pipe = (options.delay_attach_on_connect == 1);
    if (drop_pipe)
        pipe_->terminate (false);

    //  In either case, notify derived sockets of the hiccup.
    xhiccuped (pipe_);
}
|
|
|
|
|
2011-05-23 20:30:01 +02:00
|
|
|
//  Pipe event: 'pipe_' has finished its termination.
void zmq::socket_base_t::terminated (pipe_t *pipe_)
{
    //  Let the specific socket type know about the pipe termination.
    xterminated (pipe_);

    //  Drop the pipe from the list of attached pipes.
    pipes.erase (pipe_);

    //  Unless the socket itself is shutting down, there is nothing more
    //  to do. If it is, confirm the termination of this pipe.
    if (!is_terminating ())
        return;
    unregister_term_ack ();
}
|
|
|
|
|
2011-07-17 23:31:29 +02:00
|
|
|
//  Inspects the flags of a just-received message and updates the socket
//  state accordingly. The message itself is not modified here.
void zmq::socket_base_t::extract_flags (msg_t *msg_)
{
    //  A message carrying the IDENTITY flag is acceptable only if the
    //  recv_identity option is set for this socket.
    if (unlikely (msg_->flags () & msg_t::identity)) {
        zmq_assert (options.recv_identity);
    }

    //  Remember whether the MORE flag is set on the message.
    rcvmore = (msg_->flags () & msg_t::more) != 0;
}
|
2012-05-04 03:32:46 +02:00
|
|
|
|
|
|
|
void zmq::socket_base_t::monitor_event (int event_, ...)
|
|
|
|
{
|
2012-05-21 21:47:11 +02:00
|
|
|
va_list args;
|
|
|
|
va_start (args, event_);
|
|
|
|
get_ctx ()->monitor_event (this, event_, args);
|
|
|
|
va_end (args);
|
2012-05-04 03:35:22 +02:00
|
|
|
}
|