mirror of
				https://github.com/zeromq/libzmq.git
				synced 2025-11-03 19:40:39 +01:00 
			
		
		
		
	Problem: term "identity" is confusing
Solution: replace by "routing id"
This commit is contained in:
		@@ -299,7 +299,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (const zmq_msg_t *msg, const char *property)
 | 
			
		||||
 | 
			
		||||
/*  Socket options.                                                           */
 | 
			
		||||
#define ZMQ_AFFINITY 4
 | 
			
		||||
#define ZMQ_IDENTITY 5
 | 
			
		||||
#define ZMQ_ROUTING_ID 5
 | 
			
		||||
#define ZMQ_SUBSCRIBE 6
 | 
			
		||||
#define ZMQ_UNSUBSCRIBE 7
 | 
			
		||||
#define ZMQ_RATE 8
 | 
			
		||||
@@ -390,6 +390,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (const zmq_msg_t *msg, const char *property)
 | 
			
		||||
#define ZMQ_GROUP_MAX_LENGTH        15
 | 
			
		||||
 | 
			
		||||
/*  Deprecated options and aliases                                            */
 | 
			
		||||
#define ZMQ_IDENTITY                ZMQ_ROUTING_ID
 | 
			
		||||
#define ZMQ_TCP_ACCEPT_FILTER       38
 | 
			
		||||
#define ZMQ_IPC_FILTER_PID          58
 | 
			
		||||
#define ZMQ_IPC_FILTER_UID          59
 | 
			
		||||
@@ -620,6 +621,7 @@ ZMQ_EXPORT int zmq_msg_set_group(zmq_msg_t *msg, const char *group);
 | 
			
		||||
ZMQ_EXPORT const char *zmq_msg_group(zmq_msg_t *msg);
 | 
			
		||||
 | 
			
		||||
/*  DRAFT Msg property names.                                                 */
 | 
			
		||||
// TODO the name of the define AND its value are now inconsistent with the new term "routing id"
 | 
			
		||||
#define ZMQ_MSG_PROPERTY_IDENTITY      "Identity"
 | 
			
		||||
#define ZMQ_MSG_PROPERTY_SOCKET_TYPE   "Socket-Type"
 | 
			
		||||
#define ZMQ_MSG_PROPERTY_USER_ID       "User-Id"
 | 
			
		||||
@@ -662,8 +664,8 @@ ZMQ_EXPORT int zmq_poller_remove_fd (void *poller, int fd);
 | 
			
		||||
#endif
 | 
			
		||||
 | 
			
		||||
ZMQ_EXPORT int zmq_socket_get_peer_state (void *socket,
 | 
			
		||||
                                          const void *identity,
 | 
			
		||||
                                          size_t identity_size);
 | 
			
		||||
                                          const void *routing_id,
 | 
			
		||||
                                          size_t routing_id_size);
 | 
			
		||||
 | 
			
		||||
/******************************************************************************/
 | 
			
		||||
/*  Scheduling timers                                                         */
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										16
									
								
								src/ctx.cpp
									
									
									
									
									
								
							
							
						
						
									
										16
									
								
								src/ctx.cpp
									
									
									
									
									
								
							@@ -529,7 +529,7 @@ void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_,
 | 
			
		||||
    bind_socket_->inc_seqnum();
 | 
			
		||||
    pending_connection_.bind_pipe->set_tid (bind_socket_->get_tid ());
 | 
			
		||||
 | 
			
		||||
    if (!bind_options.recv_identity) {
 | 
			
		||||
    if (!bind_options.recv_routing_id) {
 | 
			
		||||
        msg_t msg;
 | 
			
		||||
        const bool ok = pending_connection_.bind_pipe->read (&msg);
 | 
			
		||||
        zmq_assert (ok);
 | 
			
		||||
@@ -569,16 +569,16 @@ void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_,
 | 
			
		||||
    // When a ctx is terminated all pending inproc connection will be
 | 
			
		||||
    // connected, but the socket will already be closed and the pipe will be
 | 
			
		||||
    // in waiting_for_delimiter state, which means no more writes can be done
 | 
			
		||||
    // and the identity write fails and causes an assert. Check if the socket
 | 
			
		||||
    // and the routing id write fails and causes an assert. Check if the socket
 | 
			
		||||
    // is open before sending.
 | 
			
		||||
    if (pending_connection_.endpoint.options.recv_identity &&
 | 
			
		||||
    if (pending_connection_.endpoint.options.recv_routing_id &&
 | 
			
		||||
            pending_connection_.endpoint.socket->check_tag ()) {
 | 
			
		||||
        msg_t id;
 | 
			
		||||
        const int rc = id.init_size (bind_options.identity_size);
 | 
			
		||||
        msg_t routing_id;
 | 
			
		||||
        const int rc = routing_id.init_size (bind_options.routing_id_size);
 | 
			
		||||
        errno_assert (rc == 0);
 | 
			
		||||
        memcpy (id.data (), bind_options.identity, bind_options.identity_size);
 | 
			
		||||
        id.set_flags (msg_t::identity);
 | 
			
		||||
        const bool written = pending_connection_.bind_pipe->write (&id);
 | 
			
		||||
        memcpy (routing_id.data (), bind_options.routing_id, bind_options.routing_id_size);
 | 
			
		||||
        routing_id.set_flags (msg_t::routing_id);
 | 
			
		||||
        const bool written = pending_connection_.bind_pipe->write (&routing_id);
 | 
			
		||||
        zmq_assert (written);
 | 
			
		||||
        pending_connection_.bind_pipe->flush ();
 | 
			
		||||
    }
 | 
			
		||||
 
 | 
			
		||||
@@ -46,17 +46,17 @@ zmq::mechanism_t::~mechanism_t ()
 | 
			
		||||
{
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void zmq::mechanism_t::set_peer_identity (const void *id_ptr, size_t id_size)
 | 
			
		||||
void zmq::mechanism_t::set_peer_routing_id (const void *id_ptr, size_t id_size)
 | 
			
		||||
{
 | 
			
		||||
    identity = blob_t (static_cast <const unsigned char*> (id_ptr), id_size);
 | 
			
		||||
    routing_id = blob_t (static_cast <const unsigned char*> (id_ptr), id_size);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void zmq::mechanism_t::peer_identity (msg_t *msg_)
 | 
			
		||||
void zmq::mechanism_t::peer_routing_id (msg_t *msg_)
 | 
			
		||||
{
 | 
			
		||||
    const int rc = msg_->init_size (identity.size ());
 | 
			
		||||
    const int rc = msg_->init_size (routing_id.size ());
 | 
			
		||||
    errno_assert (rc == 0);
 | 
			
		||||
    memcpy (msg_->data (), identity.data (), identity.size ());
 | 
			
		||||
    msg_->set_flags (msg_t::identity);
 | 
			
		||||
    memcpy (msg_->data (), routing_id.data (), routing_id.size ());
 | 
			
		||||
    msg_->set_flags (msg_t::routing_id);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void zmq::mechanism_t::set_user_id (const void *data_, size_t size_)
 | 
			
		||||
@@ -136,8 +136,8 @@ size_t zmq::mechanism_t::add_basic_properties (unsigned char *buf,
 | 
			
		||||
    if (options.type == ZMQ_REQ || options.type == ZMQ_DEALER
 | 
			
		||||
        || options.type == ZMQ_ROUTER)
 | 
			
		||||
        ptr += add_property (ptr, buf_capacity - (ptr - buf),
 | 
			
		||||
                             ZMQ_MSG_PROPERTY_IDENTITY, options.identity,
 | 
			
		||||
                             options.identity_size);
 | 
			
		||||
                             ZMQ_MSG_PROPERTY_IDENTITY, options.routing_id,
 | 
			
		||||
                             options.routing_id_size);
 | 
			
		||||
 | 
			
		||||
    return ptr - buf;
 | 
			
		||||
}
 | 
			
		||||
@@ -149,7 +149,7 @@ size_t zmq::mechanism_t::basic_properties_len() const
 | 
			
		||||
           + ((options.type == ZMQ_REQ || options.type == ZMQ_DEALER
 | 
			
		||||
               || options.type == ZMQ_ROUTER)
 | 
			
		||||
                ? property_len (ZMQ_MSG_PROPERTY_IDENTITY,
 | 
			
		||||
                                options.identity_size)
 | 
			
		||||
                                options.routing_id_size)
 | 
			
		||||
                : 0);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -199,8 +199,8 @@ int zmq::mechanism_t::parse_metadata (const unsigned char *ptr_,
 | 
			
		||||
        ptr_ += value_length;
 | 
			
		||||
        bytes_left -= value_length;
 | 
			
		||||
 | 
			
		||||
        if (name == ZMQ_MSG_PROPERTY_IDENTITY && options.recv_identity)
 | 
			
		||||
            set_peer_identity (value, value_length);
 | 
			
		||||
        if (name == ZMQ_MSG_PROPERTY_IDENTITY && options.recv_routing_id)
 | 
			
		||||
            set_peer_routing_id (value, value_length);
 | 
			
		||||
        else
 | 
			
		||||
        if (name == ZMQ_MSG_PROPERTY_SOCKET_TYPE) {
 | 
			
		||||
            const std::string socket_type ((char *) value, value_length);
 | 
			
		||||
 
 | 
			
		||||
@@ -74,9 +74,9 @@ namespace zmq
 | 
			
		||||
        //  Returns the status of this mechanism.
 | 
			
		||||
        virtual status_t status () const = 0;
 | 
			
		||||
 | 
			
		||||
        void set_peer_identity (const void *id_ptr, size_t id_size);
 | 
			
		||||
        void set_peer_routing_id (const void *id_ptr, size_t id_size);
 | 
			
		||||
 | 
			
		||||
        void peer_identity (msg_t *msg_);
 | 
			
		||||
        void peer_routing_id (msg_t *msg_);
 | 
			
		||||
 | 
			
		||||
        void set_user_id (const void *user_id, size_t size);
 | 
			
		||||
 | 
			
		||||
@@ -138,7 +138,7 @@ namespace zmq
 | 
			
		||||
 | 
			
		||||
    private:
 | 
			
		||||
 | 
			
		||||
        blob_t identity;
 | 
			
		||||
        blob_t routing_id;
 | 
			
		||||
 | 
			
		||||
        blob_t user_id;
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -413,9 +413,9 @@ void zmq::msg_t::reset_metadata ()
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
bool zmq::msg_t::is_identity () const
 | 
			
		||||
bool zmq::msg_t::is_routing_id () const
 | 
			
		||||
{
 | 
			
		||||
    return (u.base.flags & identity) == identity;
 | 
			
		||||
    return (u.base.flags & routing_id) == routing_id;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
bool zmq::msg_t::is_credential () const
 | 
			
		||||
 
 | 
			
		||||
@@ -79,7 +79,7 @@ namespace zmq
 | 
			
		||||
            more = 1,           //  Followed by more parts
 | 
			
		||||
            command = 2,        //  Command frame (see ZMTP spec)
 | 
			
		||||
            credential = 32,
 | 
			
		||||
            identity = 64,
 | 
			
		||||
            routing_id = 64,
 | 
			
		||||
            shared = 128
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
@@ -109,7 +109,7 @@ namespace zmq
 | 
			
		||||
        metadata_t *metadata () const;
 | 
			
		||||
        void set_metadata (metadata_t *metadata_);
 | 
			
		||||
        void reset_metadata ();
 | 
			
		||||
        bool is_identity () const;
 | 
			
		||||
        bool is_routing_id () const;
 | 
			
		||||
        bool is_credential () const;
 | 
			
		||||
        bool is_delimiter () const;
 | 
			
		||||
        bool is_join () const;
 | 
			
		||||
 
 | 
			
		||||
@@ -48,7 +48,7 @@ zmq::options_t::options_t () :
 | 
			
		||||
    sndhwm (1000),
 | 
			
		||||
    rcvhwm (1000),
 | 
			
		||||
    affinity (0),
 | 
			
		||||
    identity_size (0),
 | 
			
		||||
    routing_id_size (0),
 | 
			
		||||
    rate (100),
 | 
			
		||||
    recovery_ivl (10000),
 | 
			
		||||
    multicast_hops (1),
 | 
			
		||||
@@ -70,7 +70,7 @@ zmq::options_t::options_t () :
 | 
			
		||||
    immediate (0),
 | 
			
		||||
    filter (false),
 | 
			
		||||
    invert_matching(false),
 | 
			
		||||
    recv_identity (false),
 | 
			
		||||
    recv_routing_id (false),
 | 
			
		||||
    raw_socket (false),
 | 
			
		||||
    raw_notify (true),
 | 
			
		||||
    tcp_keepalive (-1),
 | 
			
		||||
@@ -166,11 +166,11 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
 | 
			
		||||
            }
 | 
			
		||||
            break;
 | 
			
		||||
 | 
			
		||||
        case ZMQ_IDENTITY:
 | 
			
		||||
            //  Identity is any binary string from 1 to 255 octets
 | 
			
		||||
        case ZMQ_ROUTING_ID:
 | 
			
		||||
            //  Routing id is any binary string from 1 to 255 octets
 | 
			
		||||
            if (optvallen_ > 0 && optvallen_ < 256) {
 | 
			
		||||
                identity_size = (unsigned char) optvallen_;
 | 
			
		||||
                memcpy (identity, optval_, identity_size);
 | 
			
		||||
                routing_id_size = (unsigned char) optvallen_;
 | 
			
		||||
                memcpy (routing_id, optval_, routing_id_size);
 | 
			
		||||
                return 0;
 | 
			
		||||
            }
 | 
			
		||||
            break;
 | 
			
		||||
@@ -679,10 +679,10 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
 | 
			
		||||
            }
 | 
			
		||||
            break;
 | 
			
		||||
 | 
			
		||||
        case ZMQ_IDENTITY:
 | 
			
		||||
            if (*optvallen_ >= identity_size) {
 | 
			
		||||
                memcpy (optval_, identity, identity_size);
 | 
			
		||||
                *optvallen_ = identity_size;
 | 
			
		||||
        case ZMQ_ROUTING_ID:
 | 
			
		||||
            if (*optvallen_ >= routing_id_size) {
 | 
			
		||||
                memcpy (optval_, routing_id, routing_id_size);
 | 
			
		||||
                *optvallen_ = routing_id_size;
 | 
			
		||||
                return 0;
 | 
			
		||||
            }
 | 
			
		||||
            break;
 | 
			
		||||
 
 | 
			
		||||
@@ -70,9 +70,9 @@ namespace zmq
 | 
			
		||||
        //  I/O thread affinity.
 | 
			
		||||
        uint64_t affinity;
 | 
			
		||||
 | 
			
		||||
        //  Socket identity
 | 
			
		||||
        unsigned char identity_size;
 | 
			
		||||
        unsigned char identity [256];
 | 
			
		||||
        //  Socket routing id.
 | 
			
		||||
        unsigned char routing_id_size;
 | 
			
		||||
        unsigned char routing_id [256];
 | 
			
		||||
 | 
			
		||||
        //  Maximum transfer rate [kb/s]. Default 100kb/s.
 | 
			
		||||
        int rate;
 | 
			
		||||
@@ -144,7 +144,7 @@ namespace zmq
 | 
			
		||||
        bool invert_matching;
 | 
			
		||||
 | 
			
		||||
        //  If true, the identity message is forwarded to the socket.
 | 
			
		||||
        bool recv_identity;
 | 
			
		||||
        bool recv_routing_id;
 | 
			
		||||
 | 
			
		||||
        // if true, router socket accepts non-zmq tcp connections
 | 
			
		||||
        bool raw_socket;
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										32
									
								
								src/pipe.cpp
									
									
									
									
									
								
							
							
						
						
									
										32
									
								
								src/pipe.cpp
									
									
									
									
									
								
							@@ -92,7 +92,7 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
 | 
			
		||||
    sink (NULL),
 | 
			
		||||
    state (active),
 | 
			
		||||
    delay (true),
 | 
			
		||||
    routing_id(0),
 | 
			
		||||
    integral_routing_id(0),
 | 
			
		||||
    conflate (conflate_)
 | 
			
		||||
{
 | 
			
		||||
}
 | 
			
		||||
@@ -115,26 +115,26 @@ void zmq::pipe_t::set_event_sink (i_pipe_events *sink_)
 | 
			
		||||
    sink = sink_;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void zmq::pipe_t::set_routing_id (uint32_t routing_id_)
 | 
			
		||||
void zmq::pipe_t::set_integral_routing_id (uint32_t integral_routing_id_)
 | 
			
		||||
{
 | 
			
		||||
    integral_routing_id = integral_routing_id_;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
uint32_t zmq::pipe_t::get_integral_routing_id ()
 | 
			
		||||
{
 | 
			
		||||
    return integral_routing_id;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void zmq::pipe_t::set_routing_id (const blob_t &routing_id_)
 | 
			
		||||
{
 | 
			
		||||
    routing_id = routing_id_;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
uint32_t zmq::pipe_t::get_routing_id ()
 | 
			
		||||
zmq::blob_t zmq::pipe_t::get_routing_id ()
 | 
			
		||||
{
 | 
			
		||||
    return routing_id;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void zmq::pipe_t::set_identity (const blob_t &identity_)
 | 
			
		||||
{
 | 
			
		||||
    identity = identity_;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
zmq::blob_t zmq::pipe_t::get_identity ()
 | 
			
		||||
{
 | 
			
		||||
    return identity;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
zmq::blob_t zmq::pipe_t::get_credential () const
 | 
			
		||||
{
 | 
			
		||||
    return credential;
 | 
			
		||||
@@ -194,7 +194,7 @@ read_message:
 | 
			
		||||
        return false;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (!(msg_->flags () & msg_t::more) && !msg_->is_identity ())
 | 
			
		||||
    if (!(msg_->flags () & msg_t::more) && !msg_->is_routing_id ())
 | 
			
		||||
        msgs_read++;
 | 
			
		||||
 | 
			
		||||
    if (lwm > 0 && msgs_read % lwm == 0)
 | 
			
		||||
@@ -224,9 +224,9 @@ bool zmq::pipe_t::write (msg_t *msg_)
 | 
			
		||||
        return false;
 | 
			
		||||
 | 
			
		||||
    bool more = msg_->flags () & msg_t::more ? true : false;
 | 
			
		||||
    const bool is_identity = msg_->is_identity ();
 | 
			
		||||
    const bool is_routing_id = msg_->is_routing_id ();
 | 
			
		||||
    outpipe->write (*msg_, more);
 | 
			
		||||
    if (!more && !is_identity)
 | 
			
		||||
    if (!more && !is_routing_id)
 | 
			
		||||
        msgs_written++;
 | 
			
		||||
 | 
			
		||||
    return true;
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										12
									
								
								src/pipe.hpp
									
									
									
									
									
								
							
							
						
						
									
										12
									
								
								src/pipe.hpp
									
									
									
									
									
								
							@@ -85,12 +85,12 @@ namespace zmq
 | 
			
		||||
        void set_event_sink (i_pipe_events *sink_);
 | 
			
		||||
 | 
			
		||||
        //  Pipe endpoint can store an routing ID to be used by its clients.
 | 
			
		||||
        void set_routing_id (uint32_t routing_id_);
 | 
			
		||||
        uint32_t get_routing_id ();
 | 
			
		||||
        void set_integral_routing_id (uint32_t routing_id_);
 | 
			
		||||
        uint32_t get_integral_routing_id ();
 | 
			
		||||
 | 
			
		||||
        //  Pipe endpoint can store an opaque ID to be used by its clients.
 | 
			
		||||
        void set_identity (const blob_t &identity_);
 | 
			
		||||
        blob_t get_identity ();
 | 
			
		||||
        void set_routing_id (const blob_t &identity_);
 | 
			
		||||
        blob_t get_routing_id ();
 | 
			
		||||
 | 
			
		||||
        blob_t get_credential () const;
 | 
			
		||||
 | 
			
		||||
@@ -227,10 +227,10 @@ namespace zmq
 | 
			
		||||
        bool delay;
 | 
			
		||||
 | 
			
		||||
        //  Identity of the writer. Used uniquely by the reader side.
 | 
			
		||||
        blob_t identity;
 | 
			
		||||
        blob_t routing_id;
 | 
			
		||||
 | 
			
		||||
        //  Identity of the writer. Used uniquely by the reader side.
 | 
			
		||||
        int routing_id;
 | 
			
		||||
        int integral_routing_id;
 | 
			
		||||
 | 
			
		||||
        //  Pipe's credential.
 | 
			
		||||
        blob_t credential;
 | 
			
		||||
 
 | 
			
		||||
@@ -75,7 +75,7 @@ int zmq::req_t::xsend (msg_t *msg_)
 | 
			
		||||
        message_begins = true;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    //  First part of the request is the request identity.
 | 
			
		||||
    //  First part of the request is the request routing id.
 | 
			
		||||
    if (message_begins) {
 | 
			
		||||
        reply_pipe = NULL;
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										106
									
								
								src/router.cpp
									
									
									
									
									
								
							
							
						
						
									
										106
									
								
								src/router.cpp
									
									
									
									
									
								
							@@ -39,13 +39,13 @@
 | 
			
		||||
zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
 | 
			
		||||
    socket_base_t (parent_, tid_, sid_),
 | 
			
		||||
    prefetched (false),
 | 
			
		||||
    identity_sent (false),
 | 
			
		||||
    routing_id_sent (false),
 | 
			
		||||
    current_in (NULL),
 | 
			
		||||
    terminate_current_in (false),
 | 
			
		||||
    more_in (false),
 | 
			
		||||
    current_out (NULL),
 | 
			
		||||
    more_out (false),
 | 
			
		||||
    next_rid (generate_random ()),
 | 
			
		||||
    next_integral_routing_id (generate_random ()),
 | 
			
		||||
    mandatory (false),
 | 
			
		||||
    //  raw_socket functionality in ROUTER is deprecated
 | 
			
		||||
    raw_socket (false),
 | 
			
		||||
@@ -53,7 +53,7 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
 | 
			
		||||
    handover (false)
 | 
			
		||||
{
 | 
			
		||||
    options.type = ZMQ_ROUTER;
 | 
			
		||||
    options.recv_identity = true;
 | 
			
		||||
    options.recv_routing_id = true;
 | 
			
		||||
    options.raw_socket = false;
 | 
			
		||||
 | 
			
		||||
    prefetched_id.init ();
 | 
			
		||||
@@ -87,8 +87,8 @@ void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
 | 
			
		||||
        errno_assert (rc == 0);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    bool identity_ok = identify_peer (pipe_);
 | 
			
		||||
    if (identity_ok)
 | 
			
		||||
    bool routing_id_ok = identify_peer (pipe_);
 | 
			
		||||
    if (routing_id_ok)
 | 
			
		||||
        fq.attach (pipe_);
 | 
			
		||||
    else
 | 
			
		||||
        anonymous_pipes.insert (pipe_);
 | 
			
		||||
@@ -113,7 +113,7 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_,
 | 
			
		||||
            if (is_int && value >= 0) {
 | 
			
		||||
                raw_socket = (value != 0);
 | 
			
		||||
                if (raw_socket) {
 | 
			
		||||
                    options.recv_identity = false;
 | 
			
		||||
                    options.recv_routing_id = false;
 | 
			
		||||
                    options.raw_socket = true;
 | 
			
		||||
                }
 | 
			
		||||
                return 0;
 | 
			
		||||
@@ -155,7 +155,7 @@ void zmq::router_t::xpipe_terminated (pipe_t *pipe_)
 | 
			
		||||
    if (it != anonymous_pipes.end ())
 | 
			
		||||
        anonymous_pipes.erase (it);
 | 
			
		||||
    else {
 | 
			
		||||
        outpipes_t::iterator iter = outpipes.find (pipe_->get_identity ());
 | 
			
		||||
        outpipes_t::iterator iter = outpipes.find (pipe_->get_routing_id ());
 | 
			
		||||
        zmq_assert (iter != outpipes.end ());
 | 
			
		||||
        outpipes.erase (iter);
 | 
			
		||||
        fq.pipe_terminated (pipe_);
 | 
			
		||||
@@ -171,8 +171,8 @@ void zmq::router_t::xread_activated (pipe_t *pipe_)
 | 
			
		||||
    if (it == anonymous_pipes.end ())
 | 
			
		||||
        fq.activated (pipe_);
 | 
			
		||||
    else {
 | 
			
		||||
        bool identity_ok = identify_peer (pipe_);
 | 
			
		||||
        if (identity_ok) {
 | 
			
		||||
        bool routing_id_ok = identify_peer (pipe_);
 | 
			
		||||
        if (routing_id_ok) {
 | 
			
		||||
            anonymous_pipes.erase (it);
 | 
			
		||||
            fq.attach (pipe_);
 | 
			
		||||
        }
 | 
			
		||||
@@ -205,11 +205,11 @@ int zmq::router_t::xsend (msg_t *msg_)
 | 
			
		||||
 | 
			
		||||
            more_out = true;
 | 
			
		||||
 | 
			
		||||
            //  Find the pipe associated with the identity stored in the prefix.
 | 
			
		||||
            //  Find the pipe associated with the routing id stored in the prefix.
 | 
			
		||||
            //  If there's no such pipe just silently ignore the message, unless
 | 
			
		||||
            //  router_mandatory is set.
 | 
			
		||||
            blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
 | 
			
		||||
            outpipes_t::iterator it = outpipes.find (identity);
 | 
			
		||||
            blob_t routing_id ((unsigned char*) msg_->data (), msg_->size ());
 | 
			
		||||
            outpipes_t::iterator it = outpipes.find (routing_id);
 | 
			
		||||
 | 
			
		||||
            if (it != outpipes.end ()) {
 | 
			
		||||
                current_out = it->second.pipe;
 | 
			
		||||
@@ -300,10 +300,10 @@ int zmq::router_t::xsend (msg_t *msg_)
 | 
			
		||||
int zmq::router_t::xrecv (msg_t *msg_)
 | 
			
		||||
{
 | 
			
		||||
    if (prefetched) {
 | 
			
		||||
        if (!identity_sent) {
 | 
			
		||||
        if (!routing_id_sent) {
 | 
			
		||||
            int rc = msg_->move (prefetched_id);
 | 
			
		||||
            errno_assert (rc == 0);
 | 
			
		||||
            identity_sent = true;
 | 
			
		||||
            routing_id_sent = true;
 | 
			
		||||
        }
 | 
			
		||||
        else {
 | 
			
		||||
            int rc = msg_->move (prefetched_msg);
 | 
			
		||||
@@ -325,10 +325,10 @@ int zmq::router_t::xrecv (msg_t *msg_)
 | 
			
		||||
    pipe_t *pipe = NULL;
 | 
			
		||||
    int rc = fq.recvpipe (msg_, &pipe);
 | 
			
		||||
 | 
			
		||||
    //  It's possible that we receive peer's identity. That happens
 | 
			
		||||
    //  It's possible that we receive peer's routing id. That happens
 | 
			
		||||
    //  after reconnection. The current implementation assumes that
 | 
			
		||||
    //  the peer always uses the same identity.
 | 
			
		||||
    while (rc == 0 && msg_->is_identity ())
 | 
			
		||||
    //  the peer always uses the same routing id.
 | 
			
		||||
    while (rc == 0 && msg_->is_routing_id ())
 | 
			
		||||
        rc = fq.recvpipe (msg_, &pipe);
 | 
			
		||||
 | 
			
		||||
    if (rc != 0)
 | 
			
		||||
@@ -357,14 +357,14 @@ int zmq::router_t::xrecv (msg_t *msg_)
 | 
			
		||||
        prefetched = true;
 | 
			
		||||
        current_in = pipe;
 | 
			
		||||
 | 
			
		||||
        blob_t identity = pipe->get_identity ();
 | 
			
		||||
        rc = msg_->init_size (identity.size ());
 | 
			
		||||
        blob_t routing_id = pipe->get_routing_id ();
 | 
			
		||||
        rc = msg_->init_size (routing_id.size ());
 | 
			
		||||
        errno_assert (rc == 0);
 | 
			
		||||
        memcpy (msg_->data (), identity.data (), identity.size ());
 | 
			
		||||
        memcpy (msg_->data (), routing_id.data (), routing_id.size ());
 | 
			
		||||
        msg_->set_flags (msg_t::more);
 | 
			
		||||
        if (prefetched_msg.metadata())
 | 
			
		||||
            msg_->set_metadata(prefetched_msg.metadata());
 | 
			
		||||
        identity_sent = true;
 | 
			
		||||
        routing_id_sent = true;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    return 0;
 | 
			
		||||
@@ -396,11 +396,11 @@ bool zmq::router_t::xhas_in ()
 | 
			
		||||
    pipe_t *pipe = NULL;
 | 
			
		||||
    int rc = fq.recvpipe (&prefetched_msg, &pipe);
 | 
			
		||||
 | 
			
		||||
    //  It's possible that we receive peer's identity. That happens
 | 
			
		||||
    //  It's possible that we receive peer's routing id. That happens
 | 
			
		||||
    //  after reconnection. The current implementation assumes that
 | 
			
		||||
    //  the peer always uses the same identity.
 | 
			
		||||
    //  TODO: handle the situation when the peer changes its identity.
 | 
			
		||||
    while (rc == 0 && prefetched_msg.is_identity ())
 | 
			
		||||
    //  the peer always uses the same routing id.
 | 
			
		||||
    //  TODO: handle the situation when the peer changes its routing id.
 | 
			
		||||
    while (rc == 0 && prefetched_msg.is_routing_id ())
 | 
			
		||||
        rc = fq.recvpipe (&prefetched_msg, &pipe);
 | 
			
		||||
 | 
			
		||||
    if (rc != 0)
 | 
			
		||||
@@ -408,14 +408,14 @@ bool zmq::router_t::xhas_in ()
 | 
			
		||||
 | 
			
		||||
    zmq_assert (pipe != NULL);
 | 
			
		||||
 | 
			
		||||
    blob_t identity = pipe->get_identity ();
 | 
			
		||||
    rc = prefetched_id.init_size (identity.size ());
 | 
			
		||||
    blob_t routing_id = pipe->get_routing_id ();
 | 
			
		||||
    rc = prefetched_id.init_size (routing_id.size ());
 | 
			
		||||
    errno_assert (rc == 0);
 | 
			
		||||
    memcpy (prefetched_id.data (), identity.data (), identity.size ());
 | 
			
		||||
    memcpy (prefetched_id.data (), routing_id.data (), routing_id.size ());
 | 
			
		||||
    prefetched_id.set_flags (msg_t::more);
 | 
			
		||||
 | 
			
		||||
    prefetched = true;
 | 
			
		||||
    identity_sent = false;
 | 
			
		||||
    routing_id_sent = false;
 | 
			
		||||
    current_in = pipe;
 | 
			
		||||
 | 
			
		||||
    return true;
 | 
			
		||||
@@ -443,13 +443,13 @@ zmq::blob_t zmq::router_t::get_credential () const
 | 
			
		||||
    return fq.get_credential ();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int zmq::router_t::get_peer_state (const void *identity,
 | 
			
		||||
                                   size_t identity_size) const
 | 
			
		||||
int zmq::router_t::get_peer_state (const void *routing_id_,
 | 
			
		||||
                                   size_t routing_id_size_) const
 | 
			
		||||
{
 | 
			
		||||
    int res = 0;
 | 
			
		||||
 | 
			
		||||
    blob_t identity_blob ((unsigned char *) identity, identity_size);
 | 
			
		||||
    outpipes_t::const_iterator it = outpipes.find (identity_blob);
 | 
			
		||||
    blob_t routing_id_blob ((unsigned char *) routing_id_, routing_id_size_);
 | 
			
		||||
    outpipes_t::const_iterator it = outpipes.find (routing_id_blob);
 | 
			
		||||
    if (it == outpipes.end ()) {
 | 
			
		||||
        errno = EHOSTUNREACH;
 | 
			
		||||
        return -1;
 | 
			
		||||
@@ -467,27 +467,27 @@ int zmq::router_t::get_peer_state (const void *identity,
 | 
			
		||||
bool zmq::router_t::identify_peer (pipe_t *pipe_)
 | 
			
		||||
{
 | 
			
		||||
    msg_t msg;
 | 
			
		||||
    blob_t identity;
 | 
			
		||||
    blob_t routing_id;
 | 
			
		||||
    bool ok;
 | 
			
		||||
 | 
			
		||||
    if (connect_rid.length()) {
 | 
			
		||||
        identity = blob_t ((unsigned char*) connect_rid.c_str (),
 | 
			
		||||
        routing_id = blob_t ((unsigned char*) connect_rid.c_str (),
 | 
			
		||||
            connect_rid.length());
 | 
			
		||||
        connect_rid.clear ();
 | 
			
		||||
        outpipes_t::iterator it = outpipes.find (identity);
 | 
			
		||||
        outpipes_t::iterator it = outpipes.find (routing_id);
 | 
			
		||||
        if (it != outpipes.end ())
 | 
			
		||||
            zmq_assert(false); //  Not allowed to duplicate an existing rid
 | 
			
		||||
    }
 | 
			
		||||
    else
 | 
			
		||||
    if (options.raw_socket) { //  Always assign identity for raw-socket
 | 
			
		||||
    if (options.raw_socket) { //  Always assign an integral routing id for raw-socket
 | 
			
		||||
        unsigned char buf [5];
 | 
			
		||||
        buf [0] = 0;
 | 
			
		||||
        put_uint32 (buf + 1, next_rid++);
 | 
			
		||||
        identity = blob_t (buf, sizeof buf);
 | 
			
		||||
        put_uint32 (buf + 1, next_integral_routing_id++);
 | 
			
		||||
        routing_id = blob_t (buf, sizeof buf);
 | 
			
		||||
    }
 | 
			
		||||
    else
 | 
			
		||||
    if (!options.raw_socket) {
 | 
			
		||||
        //  Pick up handshake cases and also case where next identity is set
 | 
			
		||||
        //  Pick up handshake cases and also case where next integral routing id is set
 | 
			
		||||
        msg.init ();
 | 
			
		||||
        ok = pipe_->read (&msg);
 | 
			
		||||
        if (!ok)
 | 
			
		||||
@@ -497,13 +497,13 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
 | 
			
		||||
            //  Fall back on the auto-generation
 | 
			
		||||
            unsigned char buf [5];
 | 
			
		||||
            buf [0] = 0;
 | 
			
		||||
            put_uint32 (buf + 1, next_rid++);
 | 
			
		||||
            identity = blob_t (buf, sizeof buf);
 | 
			
		||||
            put_uint32 (buf + 1, next_integral_routing_id++);
 | 
			
		||||
            routing_id = blob_t (buf, sizeof buf);
 | 
			
		||||
            msg.close ();
 | 
			
		||||
        }
 | 
			
		||||
        else {
 | 
			
		||||
            identity = blob_t ((unsigned char*) msg.data (), msg.size ());
 | 
			
		||||
            outpipes_t::iterator it = outpipes.find (identity);
 | 
			
		||||
            routing_id = blob_t ((unsigned char*) msg.data (), msg.size ());
 | 
			
		||||
            outpipes_t::iterator it = outpipes.find (routing_id);
 | 
			
		||||
            msg.close ();
 | 
			
		||||
 | 
			
		||||
            if (it != outpipes.end ()) {
 | 
			
		||||
@@ -512,23 +512,23 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
 | 
			
		||||
                    return false;
 | 
			
		||||
                else {
 | 
			
		||||
                    //  We will allow the new connection to take over this
 | 
			
		||||
                    //  identity. Temporarily assign a new identity to the
 | 
			
		||||
                    //  routing id. Temporarily assign a new routing id to the
 | 
			
		||||
                    //  existing pipe so we can terminate it asynchronously.
 | 
			
		||||
                    unsigned char buf [5];
 | 
			
		||||
                    buf [0] = 0;
 | 
			
		||||
                    put_uint32 (buf + 1, next_rid++);
 | 
			
		||||
                    blob_t new_identity = blob_t (buf, sizeof buf);
 | 
			
		||||
                    put_uint32 (buf + 1, next_integral_routing_id++);
 | 
			
		||||
                    blob_t new_routing_id = blob_t (buf, sizeof buf);
 | 
			
		||||
 | 
			
		||||
                    it->second.pipe->set_identity (new_identity);
 | 
			
		||||
                    it->second.pipe->set_routing_id (new_routing_id);
 | 
			
		||||
                    outpipe_t existing_outpipe =
 | 
			
		||||
                        {it->second.pipe, it->second.active};
 | 
			
		||||
 | 
			
		||||
                    ok = outpipes.insert (outpipes_t::value_type (
 | 
			
		||||
                        new_identity, existing_outpipe)).second;
 | 
			
		||||
                        new_routing_id, existing_outpipe)).second;
 | 
			
		||||
                    zmq_assert (ok);
 | 
			
		||||
 | 
			
		||||
                    //  Remove the existing identity entry to allow the new
 | 
			
		||||
                    //  connection to take the identity.
 | 
			
		||||
                    //  Remove the existing routing id entry to allow the new
 | 
			
		||||
                    //  connection to take the routing id.
 | 
			
		||||
                    outpipes.erase (it);
 | 
			
		||||
 | 
			
		||||
                    if (existing_outpipe.pipe == current_in)
 | 
			
		||||
@@ -540,10 +540,10 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pipe_->set_identity (identity);
 | 
			
		||||
    pipe_->set_routing_id (routing_id);
 | 
			
		||||
    //  Add the record into output pipes lookup table
 | 
			
		||||
    outpipe_t outpipe = {pipe_, true};
 | 
			
		||||
    ok = outpipes.insert (outpipes_t::value_type (identity, outpipe)).second;
 | 
			
		||||
    ok = outpipes.insert (outpipes_t::value_type (routing_id, outpipe)).second;
 | 
			
		||||
    zmq_assert (ok);
 | 
			
		||||
 | 
			
		||||
    return true;
 | 
			
		||||
 
 | 
			
		||||
@@ -85,7 +85,7 @@ namespace zmq
 | 
			
		||||
 | 
			
		||||
        //  If true, the receiver got the message part with
 | 
			
		||||
        //  the peer's identity.
 | 
			
		||||
        bool identity_sent;
 | 
			
		||||
        bool routing_id_sent;
 | 
			
		||||
 | 
			
		||||
        //  Holds the prefetched identity.
 | 
			
		||||
        msg_t prefetched_id;
 | 
			
		||||
@@ -123,7 +123,7 @@ namespace zmq
 | 
			
		||||
 | 
			
		||||
        //  Routing IDs are generated. It's a simple increment and wrap-over
 | 
			
		||||
        //  algorithm. This value is the next ID to use (if not used already).
 | 
			
		||||
        uint32_t next_rid;
 | 
			
		||||
        uint32_t next_integral_routing_id;
 | 
			
		||||
 | 
			
		||||
        // If true, report EAGAIN to the caller instead of silently dropping
 | 
			
		||||
        // the message targeting an unknown peer.
 | 
			
		||||
 
 | 
			
		||||
@@ -58,7 +58,7 @@ void zmq::server_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
 | 
			
		||||
    if (!routing_id)
 | 
			
		||||
        routing_id = next_rid++;        //  Never use RID zero
 | 
			
		||||
 | 
			
		||||
    pipe_->set_routing_id (routing_id);
 | 
			
		||||
    pipe_->set_integral_routing_id (routing_id);
 | 
			
		||||
    //  Add the record into output pipes lookup table
 | 
			
		||||
    outpipe_t outpipe = {pipe_, true};
 | 
			
		||||
    bool ok = outpipes.insert (outpipes_t::value_type (routing_id, outpipe)).second;
 | 
			
		||||
@@ -69,7 +69,7 @@ void zmq::server_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
 | 
			
		||||
 | 
			
		||||
void zmq::server_t::xpipe_terminated (pipe_t *pipe_)
 | 
			
		||||
{
 | 
			
		||||
    outpipes_t::iterator it = outpipes.find (pipe_->get_routing_id ());
 | 
			
		||||
    outpipes_t::iterator it = outpipes.find (pipe_->get_integral_routing_id ());
 | 
			
		||||
    zmq_assert (it != outpipes.end ());
 | 
			
		||||
    outpipes.erase (it);
 | 
			
		||||
    fq.pipe_terminated (pipe_);
 | 
			
		||||
@@ -159,7 +159,7 @@ int zmq::server_t::xrecv (msg_t *msg_)
 | 
			
		||||
 | 
			
		||||
    zmq_assert (pipe != NULL);
 | 
			
		||||
 | 
			
		||||
    uint32_t routing_id = pipe->get_routing_id ();
 | 
			
		||||
    uint32_t routing_id = pipe->get_integral_routing_id ();
 | 
			
		||||
    msg_->set_routing_id (routing_id);
 | 
			
		||||
 | 
			
		||||
    return 0;
 | 
			
		||||
 
 | 
			
		||||
@@ -358,12 +358,12 @@ int zmq::session_base_t::zap_connect ()
 | 
			
		||||
 | 
			
		||||
    send_bind (peer.socket, new_pipes [1], false);
 | 
			
		||||
 | 
			
		||||
    //  Send empty identity if required by the peer.
 | 
			
		||||
    if (peer.options.recv_identity) {
 | 
			
		||||
    //  Send empty routing id if required by the peer.
 | 
			
		||||
    if (peer.options.recv_routing_id) {
 | 
			
		||||
        msg_t id;
 | 
			
		||||
        rc = id.init ();
 | 
			
		||||
        errno_assert (rc == 0);
 | 
			
		||||
        id.set_flags (msg_t::identity);
 | 
			
		||||
        id.set_flags (msg_t::routing_id);
 | 
			
		||||
        bool ok = zap_pipe->write (&id);
 | 
			
		||||
        zmq_assert (ok);
 | 
			
		||||
        zap_pipe->flush ();
 | 
			
		||||
 
 | 
			
		||||
@@ -221,11 +221,11 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_, bool
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int zmq::socket_base_t::get_peer_state (const void *identity,
 | 
			
		||||
                                        size_t identity_size) const
 | 
			
		||||
int zmq::socket_base_t::get_peer_state (const void *routing_id_,
 | 
			
		||||
                                        size_t routing_id_size_) const
 | 
			
		||||
{
 | 
			
		||||
    LIBZMQ_UNUSED (identity);
 | 
			
		||||
    LIBZMQ_UNUSED (identity_size);
 | 
			
		||||
    LIBZMQ_UNUSED (routing_id_);
 | 
			
		||||
    LIBZMQ_UNUSED (routing_id_size_);
 | 
			
		||||
 | 
			
		||||
    //  Only ROUTER sockets support this
 | 
			
		||||
    errno = ENOTSUP;
 | 
			
		||||
@@ -764,14 +764,14 @@ int zmq::socket_base_t::connect (const char *addr_)
 | 
			
		||||
 | 
			
		||||
        if (!peer.socket) {
 | 
			
		||||
            //  The peer doesn't exist yet so we don't know whether
 | 
			
		||||
            //  to send the identity message or not. To resolve this,
 | 
			
		||||
            //  we always send our identity and drop it later if
 | 
			
		||||
            //  to send the routing id message or not. To resolve this,
 | 
			
		||||
            //  we always send our routing id and drop it later if
 | 
			
		||||
            //  the peer doesn't expect it.
 | 
			
		||||
            msg_t id;
 | 
			
		||||
            rc = id.init_size (options.identity_size);
 | 
			
		||||
            rc = id.init_size (options.routing_id_size);
 | 
			
		||||
            errno_assert (rc == 0);
 | 
			
		||||
            memcpy (id.data (), options.identity, options.identity_size);
 | 
			
		||||
            id.set_flags (msg_t::identity);
 | 
			
		||||
            memcpy (id.data (), options.routing_id, options.routing_id_size);
 | 
			
		||||
            id.set_flags (msg_t::routing_id);
 | 
			
		||||
            bool written = new_pipes [0]->write (&id);
 | 
			
		||||
            zmq_assert (written);
 | 
			
		||||
            new_pipes [0]->flush ();
 | 
			
		||||
@@ -780,25 +780,25 @@ int zmq::socket_base_t::connect (const char *addr_)
 | 
			
		||||
            pend_connection (std::string (addr_), endpoint, new_pipes);
 | 
			
		||||
        }
 | 
			
		||||
        else {
 | 
			
		||||
            //  If required, send the identity of the local socket to the peer.
 | 
			
		||||
            if (peer.options.recv_identity) {
 | 
			
		||||
            //  If required, send the routing id of the local socket to the peer.
 | 
			
		||||
            if (peer.options.recv_routing_id) {
 | 
			
		||||
                msg_t id;
 | 
			
		||||
                rc = id.init_size (options.identity_size);
 | 
			
		||||
                rc = id.init_size (options.routing_id_size);
 | 
			
		||||
                errno_assert (rc == 0);
 | 
			
		||||
                memcpy (id.data (), options.identity, options.identity_size);
 | 
			
		||||
                id.set_flags (msg_t::identity);
 | 
			
		||||
                memcpy (id.data (), options.routing_id, options.routing_id_size);
 | 
			
		||||
                id.set_flags (msg_t::routing_id);
 | 
			
		||||
                bool written = new_pipes [0]->write (&id);
 | 
			
		||||
                zmq_assert (written);
 | 
			
		||||
                new_pipes [0]->flush ();
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            //  If required, send the identity of the peer to the local socket.
 | 
			
		||||
            if (options.recv_identity) {
 | 
			
		||||
            //  If required, send the routing id of the peer to the local socket.
 | 
			
		||||
            if (options.recv_routing_id) {
 | 
			
		||||
                msg_t id;
 | 
			
		||||
                rc = id.init_size (peer.options.identity_size);
 | 
			
		||||
                rc = id.init_size (peer.options.routing_id_size);
 | 
			
		||||
                errno_assert (rc == 0);
 | 
			
		||||
                memcpy (id.data (), peer.options.identity, peer.options.identity_size);
 | 
			
		||||
                id.set_flags (msg_t::identity);
 | 
			
		||||
                memcpy (id.data (), peer.options.routing_id, peer.options.routing_id_size);
 | 
			
		||||
                id.set_flags (msg_t::routing_id);
 | 
			
		||||
                bool written = new_pipes [1]->write (&id);
 | 
			
		||||
                zmq_assert (written);
 | 
			
		||||
                new_pipes [1]->flush ();
 | 
			
		||||
@@ -1591,9 +1591,9 @@ void zmq::socket_base_t::pipe_terminated (pipe_t *pipe_)
 | 
			
		||||
 | 
			
		||||
void zmq::socket_base_t::extract_flags (msg_t *msg_)
 | 
			
		||||
{
 | 
			
		||||
    //  Test whether IDENTITY flag is valid for this socket type.
 | 
			
		||||
    if (unlikely (msg_->flags () & msg_t::identity))
 | 
			
		||||
        zmq_assert (options.recv_identity);
 | 
			
		||||
    //  Test whether routing_id flag is valid for this socket type.
 | 
			
		||||
    if (unlikely (msg_->flags () & msg_t::routing_id))
 | 
			
		||||
        zmq_assert (options.recv_routing_id);
 | 
			
		||||
 | 
			
		||||
    //  Remove MORE flag.
 | 
			
		||||
    rcvmore = msg_->flags () & msg_t::more ? true : false;
 | 
			
		||||
 
 | 
			
		||||
@@ -39,22 +39,22 @@
 | 
			
		||||
zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
 | 
			
		||||
    socket_base_t (parent_, tid_, sid_),
 | 
			
		||||
    prefetched (false),
 | 
			
		||||
    identity_sent (false),
 | 
			
		||||
    routing_id_sent (false),
 | 
			
		||||
    current_out (NULL),
 | 
			
		||||
    more_out (false),
 | 
			
		||||
    next_rid (generate_random ())
 | 
			
		||||
    next_integral_routing_id (generate_random ())
 | 
			
		||||
{
 | 
			
		||||
    options.type = ZMQ_STREAM;
 | 
			
		||||
    options.raw_socket = true;
 | 
			
		||||
 | 
			
		||||
    prefetched_id.init ();
 | 
			
		||||
    prefetched_routing_id.init ();
 | 
			
		||||
    prefetched_msg.init ();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
zmq::stream_t::~stream_t ()
 | 
			
		||||
{
 | 
			
		||||
    zmq_assert (outpipes.empty ());
 | 
			
		||||
    prefetched_id.close ();
 | 
			
		||||
    prefetched_routing_id.close ();
 | 
			
		||||
    prefetched_msg.close ();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -70,7 +70,7 @@ void zmq::stream_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
 | 
			
		||||
 | 
			
		||||
void zmq::stream_t::xpipe_terminated (pipe_t *pipe_)
 | 
			
		||||
{
 | 
			
		||||
    outpipes_t::iterator it = outpipes.find (pipe_->get_identity ());
 | 
			
		||||
    outpipes_t::iterator it = outpipes.find (pipe_->get_routing_id ());
 | 
			
		||||
    zmq_assert (it != outpipes.end ());
 | 
			
		||||
    outpipes.erase (it);
 | 
			
		||||
    fq.pipe_terminated (pipe_);
 | 
			
		||||
@@ -107,10 +107,10 @@ int zmq::stream_t::xsend (msg_t *msg_)
 | 
			
		||||
        //  TODO: The connections should be killed instead.
 | 
			
		||||
        if (msg_->flags () & msg_t::more) {
 | 
			
		||||
 | 
			
		||||
            //  Find the pipe associated with the identity stored in the prefix.
 | 
			
		||||
            //  Find the pipe associated with the routing id stored in the prefix.
 | 
			
		||||
            //  If there's no such pipe return an error
 | 
			
		||||
            blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
 | 
			
		||||
            outpipes_t::iterator it = outpipes.find (identity);
 | 
			
		||||
            blob_t routing_id ((unsigned char*) msg_->data (), msg_->size ());
 | 
			
		||||
            outpipes_t::iterator it = outpipes.find (routing_id);
 | 
			
		||||
 | 
			
		||||
            if (it != outpipes.end ()) {
 | 
			
		||||
                current_out = it->second.pipe;
 | 
			
		||||
@@ -207,10 +207,10 @@ int zmq::stream_t::xsetsockopt (int option_, const void *optval_,
 | 
			
		||||
int zmq::stream_t::xrecv (msg_t *msg_)
 | 
			
		||||
{
 | 
			
		||||
    if (prefetched) {
 | 
			
		||||
        if (!identity_sent) {
 | 
			
		||||
            int rc = msg_->move (prefetched_id);
 | 
			
		||||
        if (!routing_id_sent) {
 | 
			
		||||
            int rc = msg_->move (prefetched_routing_id);
 | 
			
		||||
            errno_assert (rc == 0);
 | 
			
		||||
            identity_sent = true;
 | 
			
		||||
            routing_id_sent = true;
 | 
			
		||||
        }
 | 
			
		||||
        else {
 | 
			
		||||
            int rc = msg_->move (prefetched_msg);
 | 
			
		||||
@@ -231,10 +231,10 @@ int zmq::stream_t::xrecv (msg_t *msg_)
 | 
			
		||||
    //  We have received a frame with TCP data.
 | 
			
		||||
    //  Rather than sending this frame, we keep it in prefetched
 | 
			
		||||
    //  buffer and send a frame with peer's ID.
 | 
			
		||||
    blob_t identity = pipe->get_identity ();
 | 
			
		||||
    blob_t routing_id = pipe->get_routing_id ();
 | 
			
		||||
    rc = msg_->close();
 | 
			
		||||
    errno_assert (rc == 0);
 | 
			
		||||
    rc = msg_->init_size (identity.size ());
 | 
			
		||||
    rc = msg_->init_size (routing_id.size ());
 | 
			
		||||
    errno_assert (rc == 0);
 | 
			
		||||
 | 
			
		||||
    // forward metadata (if any)
 | 
			
		||||
@@ -242,11 +242,11 @@ int zmq::stream_t::xrecv (msg_t *msg_)
 | 
			
		||||
    if (metadata)
 | 
			
		||||
        msg_->set_metadata(metadata);
 | 
			
		||||
 | 
			
		||||
    memcpy (msg_->data (), identity.data (), identity.size ());
 | 
			
		||||
    memcpy (msg_->data (), routing_id.data (), routing_id.size ());
 | 
			
		||||
    msg_->set_flags (msg_t::more);
 | 
			
		||||
 | 
			
		||||
    prefetched = true;
 | 
			
		||||
    identity_sent = true;
 | 
			
		||||
    routing_id_sent = true;
 | 
			
		||||
 | 
			
		||||
    return 0;
 | 
			
		||||
}
 | 
			
		||||
@@ -267,20 +267,20 @@ bool zmq::stream_t::xhas_in ()
 | 
			
		||||
    zmq_assert (pipe != NULL);
 | 
			
		||||
    zmq_assert ((prefetched_msg.flags () & msg_t::more) == 0);
 | 
			
		||||
 | 
			
		||||
    blob_t identity = pipe->get_identity ();
 | 
			
		||||
    rc = prefetched_id.init_size (identity.size ());
 | 
			
		||||
    blob_t routing_id = pipe->get_routing_id ();
 | 
			
		||||
    rc = prefetched_routing_id.init_size (routing_id.size ());
 | 
			
		||||
    errno_assert (rc == 0);
 | 
			
		||||
 | 
			
		||||
    // forward metadata (if any)
 | 
			
		||||
    metadata_t *metadata = prefetched_msg.metadata();
 | 
			
		||||
    if (metadata)
 | 
			
		||||
        prefetched_id.set_metadata(metadata);
 | 
			
		||||
        prefetched_routing_id.set_metadata(metadata);
 | 
			
		||||
 | 
			
		||||
    memcpy (prefetched_id.data (), identity.data (), identity.size ());
 | 
			
		||||
    prefetched_id.set_flags (msg_t::more);
 | 
			
		||||
    memcpy (prefetched_routing_id.data (), routing_id.data (), routing_id.size ());
 | 
			
		||||
    prefetched_routing_id.set_flags (msg_t::more);
 | 
			
		||||
 | 
			
		||||
    prefetched = true;
 | 
			
		||||
    identity_sent = false;
 | 
			
		||||
    routing_id_sent = false;
 | 
			
		||||
 | 
			
		||||
    return true;
 | 
			
		||||
}
 | 
			
		||||
@@ -295,27 +295,27 @@ bool zmq::stream_t::xhas_out ()
 | 
			
		||||
 | 
			
		||||
void zmq::stream_t::identify_peer (pipe_t *pipe_)
 | 
			
		||||
{
 | 
			
		||||
    //  Always assign identity for raw-socket
 | 
			
		||||
    //  Always assign routing id for raw-socket
 | 
			
		||||
    unsigned char buffer [5];
 | 
			
		||||
    buffer [0] = 0;
 | 
			
		||||
    blob_t identity;
 | 
			
		||||
    blob_t routing_id;
 | 
			
		||||
    if (connect_rid.length ()) {
 | 
			
		||||
        identity = blob_t ((unsigned char*) connect_rid.c_str(),
 | 
			
		||||
        routing_id = blob_t ((unsigned char*) connect_rid.c_str(),
 | 
			
		||||
            connect_rid.length ());
 | 
			
		||||
        connect_rid.clear ();
 | 
			
		||||
        outpipes_t::iterator it = outpipes.find (identity);
 | 
			
		||||
        outpipes_t::iterator it = outpipes.find (routing_id);
 | 
			
		||||
        zmq_assert (it == outpipes.end ());
 | 
			
		||||
    }
 | 
			
		||||
    else {
 | 
			
		||||
        put_uint32 (buffer + 1, next_rid++);
 | 
			
		||||
        identity = blob_t (buffer, sizeof buffer);
 | 
			
		||||
        memcpy (options.identity, identity.data (), identity.size ());
 | 
			
		||||
        options.identity_size = (unsigned char) identity.size ();
 | 
			
		||||
        put_uint32 (buffer + 1, next_integral_routing_id++);
 | 
			
		||||
        routing_id = blob_t (buffer, sizeof buffer);
 | 
			
		||||
        memcpy (options.routing_id, routing_id.data (), routing_id.size ());
 | 
			
		||||
        options.routing_id_size = (unsigned char) routing_id.size ();
 | 
			
		||||
    }
 | 
			
		||||
    pipe_->set_identity (identity);
 | 
			
		||||
    pipe_->set_routing_id (routing_id);
 | 
			
		||||
    //  Add the record into output pipes lookup table
 | 
			
		||||
    outpipe_t outpipe = {pipe_, true};
 | 
			
		||||
    const bool ok = outpipes.insert (
 | 
			
		||||
        outpipes_t::value_type (identity, outpipe)).second;
 | 
			
		||||
        outpipes_t::value_type (routing_id, outpipe)).second;
 | 
			
		||||
    zmq_assert (ok);
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -70,10 +70,10 @@ namespace zmq
 | 
			
		||||
 | 
			
		||||
        //  If true, the receiver got the message part with
 | 
			
		||||
        //  the peer's identity.
 | 
			
		||||
        bool identity_sent;
 | 
			
		||||
        bool routing_id_sent;
 | 
			
		||||
 | 
			
		||||
        //  Holds the prefetched identity.
 | 
			
		||||
        msg_t prefetched_id;
 | 
			
		||||
        msg_t prefetched_routing_id;
 | 
			
		||||
 | 
			
		||||
        //  Holds the prefetched message.
 | 
			
		||||
        msg_t prefetched_msg;
 | 
			
		||||
@@ -96,7 +96,7 @@ namespace zmq
 | 
			
		||||
 | 
			
		||||
        //  Routing IDs are generated. It's a simple increment and wrap-over
 | 
			
		||||
        //  algorithm. This value is the next ID to use (if not used already).
 | 
			
		||||
        uint32_t next_rid;
 | 
			
		||||
        uint32_t next_integral_routing_id;
 | 
			
		||||
 | 
			
		||||
        stream_t (const stream_t&);
 | 
			
		||||
        const stream_t &operator = (const stream_t&);
 | 
			
		||||
 
 | 
			
		||||
@@ -81,8 +81,8 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
 | 
			
		||||
    options (options_),
 | 
			
		||||
    endpoint (endpoint_),
 | 
			
		||||
    plugged (false),
 | 
			
		||||
    next_msg (&stream_engine_t::identity_msg),
 | 
			
		||||
    process_msg (&stream_engine_t::process_identity_msg),
 | 
			
		||||
    next_msg (&stream_engine_t::routing_id_msg),
 | 
			
		||||
    process_msg (&stream_engine_t::process_routing_id_msg),
 | 
			
		||||
    io_error (false),
 | 
			
		||||
    subscription_required (false),
 | 
			
		||||
    mechanism (NULL),
 | 
			
		||||
@@ -229,11 +229,11 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
 | 
			
		||||
        // start optional timer, to prevent handshake hanging on no input
 | 
			
		||||
        set_handshake_timer ();
 | 
			
		||||
 | 
			
		||||
        //  Send the 'length' and 'flags' fields of the identity message.
 | 
			
		||||
        //  Send the 'length' and 'flags' fields of the routing id message.
 | 
			
		||||
        //  The 'length' field is encoded in the long format.
 | 
			
		||||
        outpos = greeting_send;
 | 
			
		||||
        outpos [outsize++] = 0xff;
 | 
			
		||||
        put_uint64 (&outpos [outsize], options.identity_size + 1);
 | 
			
		||||
        put_uint64 (&outpos [outsize], options.routing_id_size + 1);
 | 
			
		||||
        outsize += 8;
 | 
			
		||||
        outpos [outsize++] = 0x7f;
 | 
			
		||||
    }
 | 
			
		||||
@@ -520,7 +520,7 @@ bool zmq::stream_engine_t::handshake ()
 | 
			
		||||
 | 
			
		||||
        //  Inspect the right-most bit of the 10th byte (which coincides
 | 
			
		||||
        //  with the 'flags' field if a regular message was sent).
 | 
			
		||||
        //  Zero indicates this is a header of identity message
 | 
			
		||||
        //  Zero indicates this is a header of routing id message
 | 
			
		||||
        //  (i.e. the peer is using the unversioned protocol).
 | 
			
		||||
        if (!(greeting_recv [9] & 0x01))
 | 
			
		||||
            break;
 | 
			
		||||
@@ -575,7 +575,7 @@ bool zmq::stream_engine_t::handshake ()
 | 
			
		||||
    const size_t revision_pos = 10;
 | 
			
		||||
 | 
			
		||||
    //  Is the peer using ZMTP/1.0 with no revision number?
 | 
			
		||||
    //  If so, we send and receive rest of identity message
 | 
			
		||||
    //  If so, we send and receive rest of routing id message
 | 
			
		||||
    if (greeting_recv [0] != 0xff || !(greeting_recv [9] & 0x01)) {
 | 
			
		||||
        if (session->zap_enabled ()) {
 | 
			
		||||
           // reject ZMTP 1.0 connections if ZAP is enabled
 | 
			
		||||
@@ -593,14 +593,14 @@ bool zmq::stream_engine_t::handshake ()
 | 
			
		||||
        //  Since there is no way to tell the encoder to
 | 
			
		||||
        //  skip the message header, we simply throw that
 | 
			
		||||
        //  header data away.
 | 
			
		||||
        const size_t header_size = options.identity_size + 1 >= 255 ? 10 : 2;
 | 
			
		||||
        const size_t header_size = options.routing_id_size + 1 >= 255 ? 10 : 2;
 | 
			
		||||
        unsigned char tmp [10], *bufferp = tmp;
 | 
			
		||||
 | 
			
		||||
        //  Prepare the identity message and load it into encoder.
 | 
			
		||||
        //  Prepare the routing id message and load it into encoder.
 | 
			
		||||
        //  Then consume bytes we have already sent to the peer.
 | 
			
		||||
        const int rc = tx_msg.init_size (options.identity_size);
 | 
			
		||||
        const int rc = tx_msg.init_size (options.routing_id_size);
 | 
			
		||||
        zmq_assert (rc == 0);
 | 
			
		||||
        memcpy (tx_msg.data (), options.identity, options.identity_size);
 | 
			
		||||
        memcpy (tx_msg.data (), options.routing_id, options.routing_id_size);
 | 
			
		||||
        encoder->load_msg (&tx_msg);
 | 
			
		||||
        size_t buffer_size = encoder->encode (&bufferp, header_size);
 | 
			
		||||
        zmq_assert (buffer_size == header_size);
 | 
			
		||||
@@ -615,12 +615,12 @@ bool zmq::stream_engine_t::handshake ()
 | 
			
		||||
        if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB)
 | 
			
		||||
            subscription_required = true;
 | 
			
		||||
 | 
			
		||||
        //  We are sending our identity now and the next message
 | 
			
		||||
        //  We are sending our routing id now and the next message
 | 
			
		||||
        //  will come from the socket.
 | 
			
		||||
        next_msg = &stream_engine_t::pull_msg_from_session;
 | 
			
		||||
 | 
			
		||||
        //  We are expecting identity message.
 | 
			
		||||
        process_msg = &stream_engine_t::process_identity_msg;
 | 
			
		||||
        //  We are expecting routing id message.
 | 
			
		||||
        process_msg = &stream_engine_t::process_routing_id_msg;
 | 
			
		||||
    }
 | 
			
		||||
    else
 | 
			
		||||
    if (greeting_recv [revision_pos] == ZMTP_1_0) {
 | 
			
		||||
@@ -729,20 +729,20 @@ bool zmq::stream_engine_t::handshake ()
 | 
			
		||||
    return true;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int zmq::stream_engine_t::identity_msg (msg_t *msg_)
 | 
			
		||||
int zmq::stream_engine_t::routing_id_msg (msg_t *msg_)
 | 
			
		||||
{
 | 
			
		||||
    int rc = msg_->init_size (options.identity_size);
 | 
			
		||||
    int rc = msg_->init_size (options.routing_id_size);
 | 
			
		||||
    errno_assert (rc == 0);
 | 
			
		||||
    if (options.identity_size > 0)
 | 
			
		||||
        memcpy (msg_->data (), options.identity, options.identity_size);
 | 
			
		||||
    if (options.routing_id_size > 0)
 | 
			
		||||
        memcpy (msg_->data (), options.routing_id, options.routing_id_size);
 | 
			
		||||
    next_msg = &stream_engine_t::pull_msg_from_session;
 | 
			
		||||
    return 0;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int zmq::stream_engine_t::process_identity_msg (msg_t *msg_)
 | 
			
		||||
int zmq::stream_engine_t::process_routing_id_msg (msg_t *msg_)
 | 
			
		||||
{
 | 
			
		||||
    if (options.recv_identity) {
 | 
			
		||||
        msg_->set_flags (msg_t::identity);
 | 
			
		||||
    if (options.recv_routing_id) {
 | 
			
		||||
        msg_->set_flags (msg_t::routing_id);
 | 
			
		||||
        int rc = session->push_msg (msg_);
 | 
			
		||||
        errno_assert (rc == 0);
 | 
			
		||||
    }
 | 
			
		||||
@@ -839,14 +839,14 @@ void zmq::stream_engine_t::mechanism_ready ()
 | 
			
		||||
        has_heartbeat_timer = true;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (options.recv_identity) {
 | 
			
		||||
        msg_t identity;
 | 
			
		||||
        mechanism->peer_identity (&identity);
 | 
			
		||||
        const int rc = session->push_msg (&identity);
 | 
			
		||||
    if (options.recv_routing_id) {
 | 
			
		||||
        msg_t routing_id;
 | 
			
		||||
        mechanism->peer_routing_id (&routing_id);
 | 
			
		||||
        const int rc = session->push_msg (&routing_id);
 | 
			
		||||
        if (rc == -1 && errno == EAGAIN) {
 | 
			
		||||
            // If the write is failing at this stage with
 | 
			
		||||
            // an EAGAIN the pipe must be being shut down,
 | 
			
		||||
            // so we can just bail out of the identity set.
 | 
			
		||||
            // so we can just bail out of the routing id set.
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
        errno_assert (rc == 0);
 | 
			
		||||
 
 | 
			
		||||
@@ -99,8 +99,8 @@ namespace zmq
 | 
			
		||||
        //  Detects the protocol used by the peer.
 | 
			
		||||
        bool handshake ();
 | 
			
		||||
 | 
			
		||||
        int identity_msg (msg_t *msg_);
 | 
			
		||||
        int process_identity_msg (msg_t *msg_);
 | 
			
		||||
        int routing_id_msg (msg_t *msg_);
 | 
			
		||||
        int process_routing_id_msg (msg_t *msg_);
 | 
			
		||||
 | 
			
		||||
        int next_handshake_command (msg_t *msg);
 | 
			
		||||
        int process_handshake_command (msg_t *msg);
 | 
			
		||||
 
 | 
			
		||||
@@ -104,10 +104,10 @@ void zap_client_t::send_zap_request (const char *mechanism,
 | 
			
		||||
    rc = session->write_zap_msg (&msg);
 | 
			
		||||
    errno_assert (rc == 0);
 | 
			
		||||
 | 
			
		||||
    //  Identity frame
 | 
			
		||||
    rc = msg.init_size (options.identity_size);
 | 
			
		||||
    //  Routing id frame
 | 
			
		||||
    rc = msg.init_size (options.routing_id_size);
 | 
			
		||||
    errno_assert (rc == 0);
 | 
			
		||||
    memcpy (msg.data (), options.identity, options.identity_size);
 | 
			
		||||
    memcpy (msg.data (), options.routing_id, options.routing_id_size);
 | 
			
		||||
    msg.set_flags (msg_t::more);
 | 
			
		||||
    rc = session->write_zap_msg (&msg);
 | 
			
		||||
    errno_assert (rc == 0);
 | 
			
		||||
 
 | 
			
		||||
@@ -1359,14 +1359,14 @@ int zmq_poller_wait_all (void *poller_, zmq_poller_event_t *events_, int n_event
 | 
			
		||||
//  Peer-specific state
 | 
			
		||||
 | 
			
		||||
int zmq_socket_get_peer_state (void *s_,
 | 
			
		||||
                               const void *identity,
 | 
			
		||||
                               size_t identity_size)
 | 
			
		||||
                               const void *routing_id_,
 | 
			
		||||
                               size_t routing_id_size_)
 | 
			
		||||
{
 | 
			
		||||
    zmq::socket_base_t *s = as_socket_base_t (s_);
 | 
			
		||||
    if (!s)
 | 
			
		||||
        return -1;
 | 
			
		||||
 | 
			
		||||
    return s->get_peer_state (identity, identity_size);
 | 
			
		||||
    return s->get_peer_state (routing_id_, routing_id_size_);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
//  Timers
 | 
			
		||||
 
 | 
			
		||||
@@ -142,8 +142,8 @@ int zmq_poller_remove_fd (void *poller, int fd);
 | 
			
		||||
#endif
 | 
			
		||||
 | 
			
		||||
int zmq_socket_get_peer_state (void *socket,
 | 
			
		||||
                               const void *identity,
 | 
			
		||||
                               size_t identity_size);
 | 
			
		||||
                               const void *routing_id,
 | 
			
		||||
                               size_t routing_id_size);
 | 
			
		||||
 | 
			
		||||
/******************************************************************************/
 | 
			
		||||
/*  Scheduling timers                                                         */
 | 
			
		||||
 
 | 
			
		||||
@@ -126,8 +126,8 @@ void test_router_2_router(bool named){
 | 
			
		||||
 | 
			
		||||
    //  If we're in named mode, set some identities.
 | 
			
		||||
    if (named) {
 | 
			
		||||
        ret = zmq_setsockopt (rbind, ZMQ_IDENTITY, "X", 1);
 | 
			
		||||
        ret = zmq_setsockopt (rconn1, ZMQ_IDENTITY, "Y", 1);
 | 
			
		||||
        ret = zmq_setsockopt (rbind, ZMQ_ROUTING_ID, "X", 1);
 | 
			
		||||
        ret = zmq_setsockopt (rconn1, ZMQ_ROUTING_ID, "Y", 1);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    //  Make call to connect using a connect_rid.
 | 
			
		||||
 
 | 
			
		||||
@@ -62,7 +62,7 @@ int main (void)
 | 
			
		||||
        void *dealer = zmq_socket (ctx2, ZMQ_DEALER);
 | 
			
		||||
        char identity [10];
 | 
			
		||||
        sprintf (identity, "%09d", cycle);
 | 
			
		||||
        rc = zmq_setsockopt (dealer, ZMQ_IDENTITY, identity, 10);
 | 
			
		||||
        rc = zmq_setsockopt (dealer, ZMQ_ROUTING_ID, identity, 10);
 | 
			
		||||
        assert (rc == 0);
 | 
			
		||||
        int rcvtimeo = 1000;
 | 
			
		||||
        rc = zmq_setsockopt (dealer, ZMQ_RCVTIMEO, &rcvtimeo, sizeof (int));
 | 
			
		||||
 
 | 
			
		||||
@@ -48,7 +48,7 @@ int main (void)
 | 
			
		||||
    //  Create client and connect to server, doing a probe
 | 
			
		||||
    void *client = zmq_socket (ctx, ZMQ_ROUTER);
 | 
			
		||||
    assert (client);
 | 
			
		||||
    rc = zmq_setsockopt (client, ZMQ_IDENTITY, "X", 1);
 | 
			
		||||
    rc = zmq_setsockopt (client, ZMQ_ROUTING_ID, "X", 1);
 | 
			
		||||
    assert (rc == 0);
 | 
			
		||||
    int probe = 1;
 | 
			
		||||
    rc = zmq_setsockopt (client, ZMQ_PROBE_ROUTER, &probe, sizeof (probe));
 | 
			
		||||
 
 | 
			
		||||
@@ -53,7 +53,7 @@ int main (void)
 | 
			
		||||
    //  Create dealer called "X" and connect it to our router
 | 
			
		||||
    void *dealer_one = zmq_socket (ctx, ZMQ_DEALER);
 | 
			
		||||
    assert (dealer_one);
 | 
			
		||||
    rc = zmq_setsockopt (dealer_one, ZMQ_IDENTITY, "X", 1);
 | 
			
		||||
    rc = zmq_setsockopt (dealer_one, ZMQ_ROUTING_ID, "X", 1);
 | 
			
		||||
    assert (rc == 0);
 | 
			
		||||
    rc = zmq_connect (dealer_one, my_endpoint);
 | 
			
		||||
    assert (rc == 0);
 | 
			
		||||
@@ -71,7 +71,7 @@ int main (void)
 | 
			
		||||
    // Now create a second dealer that uses the same identity
 | 
			
		||||
    void *dealer_two = zmq_socket (ctx, ZMQ_DEALER);
 | 
			
		||||
    assert (dealer_two);
 | 
			
		||||
    rc = zmq_setsockopt (dealer_two, ZMQ_IDENTITY, "X", 1);
 | 
			
		||||
    rc = zmq_setsockopt (dealer_two, ZMQ_ROUTING_ID, "X", 1);
 | 
			
		||||
    assert (rc == 0);
 | 
			
		||||
    rc = zmq_connect (dealer_two, my_endpoint);
 | 
			
		||||
    assert (rc == 0);
 | 
			
		||||
 
 | 
			
		||||
@@ -85,13 +85,13 @@ void test_get_peer_state ()
 | 
			
		||||
    const char *dealer2_identity = "Y";
 | 
			
		||||
 | 
			
		||||
    //  Name dealer1 "X" and connect it to our router
 | 
			
		||||
    rc = zmq_setsockopt (dealer1, ZMQ_IDENTITY, dealer1_identity, 1);
 | 
			
		||||
    rc = zmq_setsockopt (dealer1, ZMQ_ROUTING_ID, dealer1_identity, 1);
 | 
			
		||||
    assert (rc == 0);
 | 
			
		||||
    rc = zmq_connect (dealer1, my_endpoint);
 | 
			
		||||
    assert (rc == 0);
 | 
			
		||||
 | 
			
		||||
    //  Name dealer2 "Y" and connect it to our router
 | 
			
		||||
    rc = zmq_setsockopt (dealer2, ZMQ_IDENTITY, dealer2_identity, 1);
 | 
			
		||||
    rc = zmq_setsockopt (dealer2, ZMQ_ROUTING_ID, dealer2_identity, 1);
 | 
			
		||||
    assert (rc == 0);
 | 
			
		||||
    rc = zmq_connect (dealer2, my_endpoint);
 | 
			
		||||
    assert (rc == 0);
 | 
			
		||||
@@ -250,7 +250,7 @@ void test_basic ()
 | 
			
		||||
    //  Create dealer called "X" and connect it to our router
 | 
			
		||||
    void *dealer = zmq_socket (ctx, ZMQ_DEALER);
 | 
			
		||||
    assert (dealer);
 | 
			
		||||
    rc = zmq_setsockopt (dealer, ZMQ_IDENTITY, "X", 1);
 | 
			
		||||
    rc = zmq_setsockopt (dealer, ZMQ_ROUTING_ID, "X", 1);
 | 
			
		||||
    assert (rc == 0);
 | 
			
		||||
    rc = zmq_connect (dealer, my_endpoint);
 | 
			
		||||
    assert (rc == 0);
 | 
			
		||||
 
 | 
			
		||||
@@ -108,7 +108,7 @@ int main (void)
 | 
			
		||||
    //  Server socket will accept connections
 | 
			
		||||
    void *server = zmq_socket (ctx, ZMQ_DEALER);
 | 
			
		||||
    assert (server);
 | 
			
		||||
    int rc = zmq_setsockopt (server, ZMQ_IDENTITY, "IDENT", 6);
 | 
			
		||||
    int rc = zmq_setsockopt (server, ZMQ_ROUTING_ID, "IDENT", 6);
 | 
			
		||||
    const char domain[] = "test";
 | 
			
		||||
    assert (rc == 0);
 | 
			
		||||
    rc = zmq_setsockopt (server, ZMQ_ZAP_DOMAIN, domain, strlen (domain));
 | 
			
		||||
 
 | 
			
		||||
@@ -82,7 +82,7 @@ void test_req_only_listens_to_current_peer (void *ctx)
 | 
			
		||||
    void *req = zmq_socket (ctx, ZMQ_REQ);
 | 
			
		||||
    assert (req);
 | 
			
		||||
 | 
			
		||||
    int rc = zmq_setsockopt(req, ZMQ_IDENTITY, "A", 2);
 | 
			
		||||
    int rc = zmq_setsockopt(req, ZMQ_ROUTING_ID, "A", 2);
 | 
			
		||||
    assert (rc == 0);
 | 
			
		||||
 | 
			
		||||
    rc = zmq_bind (req, bind_address);
 | 
			
		||||
 
 | 
			
		||||
@@ -58,7 +58,7 @@ void test_fair_queue_in (void *ctx)
 | 
			
		||||
 | 
			
		||||
        char *str = strdup("A");
 | 
			
		||||
        str [0] += peer;
 | 
			
		||||
        rc = zmq_setsockopt (senders [peer], ZMQ_IDENTITY, str, 2);
 | 
			
		||||
        rc = zmq_setsockopt (senders [peer], ZMQ_ROUTING_ID, str, 2);
 | 
			
		||||
        assert (rc == 0);
 | 
			
		||||
        free (str);
 | 
			
		||||
 | 
			
		||||
@@ -130,7 +130,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
 | 
			
		||||
    void *B = zmq_socket (ctx, ZMQ_DEALER);
 | 
			
		||||
    assert (B);
 | 
			
		||||
 | 
			
		||||
    rc = zmq_setsockopt (B, ZMQ_IDENTITY, "B", 2);
 | 
			
		||||
    rc = zmq_setsockopt (B, ZMQ_ROUTING_ID, "B", 2);
 | 
			
		||||
    assert (rc == 0);
 | 
			
		||||
 | 
			
		||||
    rc = zmq_connect (B, connect_address);
 | 
			
		||||
 
 | 
			
		||||
@@ -277,7 +277,7 @@ test_stream_to_stream (void)
 | 
			
		||||
 | 
			
		||||
    //  Sent HTTP request on client socket
 | 
			
		||||
    //  Get server identity
 | 
			
		||||
    rc = zmq_getsockopt (client, ZMQ_IDENTITY, id, &id_size);
 | 
			
		||||
    rc = zmq_getsockopt (client, ZMQ_ROUTING_ID, id, &id_size);
 | 
			
		||||
    assert (rc == 0);
 | 
			
		||||
    //  First frame is server identity
 | 
			
		||||
    rc = zmq_send (client, id, id_size, ZMQ_SNDMORE);
 | 
			
		||||
 
 | 
			
		||||
@@ -57,7 +57,7 @@ bool has_more (void* socket)
 | 
			
		||||
 | 
			
		||||
bool get_identity (void* socket, char* data, size_t* size)
 | 
			
		||||
{
 | 
			
		||||
    int rc = zmq_getsockopt (socket, ZMQ_IDENTITY, data, size);
 | 
			
		||||
    int rc = zmq_getsockopt (socket, ZMQ_ROUTING_ID, data, size);
 | 
			
		||||
    return rc == 0;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -140,7 +140,7 @@ int main(int, char**)
 | 
			
		||||
    // Send initial message.
 | 
			
		||||
    char blob_data [256];
 | 
			
		||||
    size_t blob_size = sizeof(blob_data);
 | 
			
		||||
    rc = zmq_getsockopt (sockets [CLIENT], ZMQ_IDENTITY, blob_data, &blob_size);
 | 
			
		||||
    rc = zmq_getsockopt (sockets [CLIENT], ZMQ_ROUTING_ID, blob_data, &blob_size);
 | 
			
		||||
    assert (rc != -1);
 | 
			
		||||
    assert(blob_size > 0);
 | 
			
		||||
    zmq_msg_t msg;
 | 
			
		||||
 
 | 
			
		||||
@@ -81,7 +81,7 @@ void socket_config_plain_server (void *server, void *server_secret)
 | 
			
		||||
 | 
			
		||||
    rc = zmq_setsockopt (server, ZMQ_ZAP_DOMAIN, test_zap_domain,
 | 
			
		||||
                         strlen (test_zap_domain));
 | 
			
		||||
    assert(rc == 0);
 | 
			
		||||
    assert (rc == 0);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
//  CURVE specific functions
 | 
			
		||||
@@ -113,7 +113,7 @@ void socket_config_curve_server (void *server, void *server_secret)
 | 
			
		||||
 | 
			
		||||
    rc = zmq_setsockopt (server, ZMQ_ZAP_DOMAIN, test_zap_domain,
 | 
			
		||||
                         strlen (test_zap_domain));
 | 
			
		||||
    assert(rc == 0);
 | 
			
		||||
    assert (rc == 0);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
struct curve_client_data_t
 | 
			
		||||
@@ -202,8 +202,7 @@ void zap_handler_generic (void *ctx,
 | 
			
		||||
        char *version = s_recv (handler);
 | 
			
		||||
        if (!version)
 | 
			
		||||
            break; //  Terminating - peer's socket closed
 | 
			
		||||
        if (zap_protocol == zap_disconnect)
 | 
			
		||||
        {
 | 
			
		||||
        if (zap_protocol == zap_disconnect) {
 | 
			
		||||
            free (version);
 | 
			
		||||
            break;
 | 
			
		||||
        }
 | 
			
		||||
@@ -225,15 +224,15 @@ void zap_handler_generic (void *ctx,
 | 
			
		||||
            authentication_succeeded =
 | 
			
		||||
              streq (client_key_text, valid_client_public);
 | 
			
		||||
        } else if (streq (mechanism, "PLAIN")) {
 | 
			
		||||
            char client_username [32];
 | 
			
		||||
            char client_username[32];
 | 
			
		||||
            int size = zmq_recv (handler, client_username, 32, 0);
 | 
			
		||||
            assert (size > 0);
 | 
			
		||||
            client_username [size] = 0;
 | 
			
		||||
            client_username[size] = 0;
 | 
			
		||||
 | 
			
		||||
            char client_password [32];
 | 
			
		||||
            char client_password[32];
 | 
			
		||||
            size = zmq_recv (handler, client_password, 32, 0);
 | 
			
		||||
            assert (size > 0);
 | 
			
		||||
            client_password [size] = 0;
 | 
			
		||||
            client_password[size] = 0;
 | 
			
		||||
 | 
			
		||||
            authentication_succeeded =
 | 
			
		||||
              streq (test_plain_username, client_username)
 | 
			
		||||
@@ -283,7 +282,7 @@ void zap_handler_generic (void *ctx,
 | 
			
		||||
            s_sendmore (handler, "Invalid client public key");
 | 
			
		||||
            s_sendmore (handler, "");
 | 
			
		||||
            if (zap_protocol != zap_do_not_send)
 | 
			
		||||
                s_send(handler, "");
 | 
			
		||||
                s_send (handler, "");
 | 
			
		||||
        }
 | 
			
		||||
        free (version);
 | 
			
		||||
        free (sequence);
 | 
			
		||||
@@ -298,10 +297,9 @@ void zap_handler_generic (void *ctx,
 | 
			
		||||
    assert (rc == 0);
 | 
			
		||||
    close_zero_linger (handler);
 | 
			
		||||
 | 
			
		||||
    if (zap_protocol != zap_disconnect)
 | 
			
		||||
    {
 | 
			
		||||
        rc = s_send(control, "STOPPED");
 | 
			
		||||
        assert(rc == 7);
 | 
			
		||||
    if (zap_protocol != zap_disconnect) {
 | 
			
		||||
        rc = s_send (control, "STOPPED");
 | 
			
		||||
        assert (rc == 7);
 | 
			
		||||
    }
 | 
			
		||||
    close_zero_linger (control);
 | 
			
		||||
}
 | 
			
		||||
@@ -560,7 +558,7 @@ void setup_context_and_server_side (
 | 
			
		||||
 | 
			
		||||
    socket_config_ (*server, socket_config_data_);
 | 
			
		||||
 | 
			
		||||
    rc = zmq_setsockopt (*server, ZMQ_IDENTITY, identity, strlen (identity));
 | 
			
		||||
    rc = zmq_setsockopt (*server, ZMQ_ROUTING_ID, identity, strlen (identity));
 | 
			
		||||
    assert (rc == 0);
 | 
			
		||||
 | 
			
		||||
    rc = zmq_bind (*server, "tcp://127.0.0.1:*");
 | 
			
		||||
@@ -570,7 +568,7 @@ void setup_context_and_server_side (
 | 
			
		||||
    rc = zmq_getsockopt (*server, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
 | 
			
		||||
    assert (rc == 0);
 | 
			
		||||
 | 
			
		||||
    const char server_monitor_endpoint [] = "inproc://monitor-server";
 | 
			
		||||
    const char server_monitor_endpoint[] = "inproc://monitor-server";
 | 
			
		||||
    setup_handshake_socket_monitor (*ctx, *server, server_mon,
 | 
			
		||||
                                    server_monitor_endpoint);
 | 
			
		||||
}
 | 
			
		||||
@@ -592,7 +590,7 @@ void shutdown_context_and_server_side (void *ctx,
 | 
			
		||||
        rc = zmq_unbind (zap_control, "inproc://handler-control");
 | 
			
		||||
        assert (rc == 0);
 | 
			
		||||
    }
 | 
			
		||||
    close_zero_linger(zap_control);
 | 
			
		||||
    close_zero_linger (zap_control);
 | 
			
		||||
 | 
			
		||||
#ifdef ZMQ_BUILD_DRAFT_API
 | 
			
		||||
    close_zero_linger (server_mon);
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user