mirror of
https://github.com/zeromq/libzmq.git
synced 2025-07-01 00:13:31 +02:00
commit
210fcbbbeb
@ -108,10 +108,10 @@ void zmq::dealer_t::xwrite_activated (pipe_t *pipe_)
|
|||||||
lb.activated (pipe_);
|
lb.activated (pipe_);
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::dealer_t::xterminated (pipe_t *pipe_)
|
void zmq::dealer_t::xpipe_terminated (pipe_t *pipe_)
|
||||||
{
|
{
|
||||||
fq.terminated (pipe_);
|
fq.pipe_terminated (pipe_);
|
||||||
lb.terminated (pipe_);
|
lb.pipe_terminated (pipe_);
|
||||||
}
|
}
|
||||||
|
|
||||||
zmq::dealer_session_t::dealer_session_t (io_thread_t *io_thread_, bool connect_,
|
zmq::dealer_session_t::dealer_session_t (io_thread_t *io_thread_, bool connect_,
|
||||||
|
@ -53,7 +53,7 @@ namespace zmq
|
|||||||
bool xhas_out ();
|
bool xhas_out ();
|
||||||
void xread_activated (zmq::pipe_t *pipe_);
|
void xread_activated (zmq::pipe_t *pipe_);
|
||||||
void xwrite_activated (zmq::pipe_t *pipe_);
|
void xwrite_activated (zmq::pipe_t *pipe_);
|
||||||
void xterminated (zmq::pipe_t *pipe_);
|
void xpipe_terminated (zmq::pipe_t *pipe_);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
|
@ -74,7 +74,7 @@ void zmq::dist_t::unmatch ()
|
|||||||
matching = 0;
|
matching = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::dist_t::terminated (pipe_t *pipe_)
|
void zmq::dist_t::pipe_terminated (pipe_t *pipe_)
|
||||||
{
|
{
|
||||||
// Remove the pipe from the list; adjust number of matching, active and/or
|
// Remove the pipe from the list; adjust number of matching, active and/or
|
||||||
// eligible pipes accordingly.
|
// eligible pipes accordingly.
|
||||||
|
@ -54,7 +54,7 @@ namespace zmq
|
|||||||
void unmatch ();
|
void unmatch ();
|
||||||
|
|
||||||
// Removes the pipe from the distributor object.
|
// Removes the pipe from the distributor object.
|
||||||
void terminated (zmq::pipe_t *pipe_);
|
void pipe_terminated (zmq::pipe_t *pipe_);
|
||||||
|
|
||||||
// Send the message to the matching outbound pipes.
|
// Send the message to the matching outbound pipes.
|
||||||
int send_to_matching (zmq::msg_t *msg_);
|
int send_to_matching (zmq::msg_t *msg_);
|
||||||
|
@ -41,7 +41,7 @@ void zmq::fq_t::attach (pipe_t *pipe_)
|
|||||||
active++;
|
active++;
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::fq_t::terminated (pipe_t *pipe_)
|
void zmq::fq_t::pipe_terminated (pipe_t *pipe_)
|
||||||
{
|
{
|
||||||
const pipes_t::size_type index = pipes.index (pipe_);
|
const pipes_t::size_type index = pipes.index (pipe_);
|
||||||
|
|
||||||
|
@ -40,7 +40,7 @@ namespace zmq
|
|||||||
|
|
||||||
void attach (pipe_t *pipe_);
|
void attach (pipe_t *pipe_);
|
||||||
void activated (pipe_t *pipe_);
|
void activated (pipe_t *pipe_);
|
||||||
void terminated (pipe_t *pipe_);
|
void pipe_terminated (pipe_t *pipe_);
|
||||||
|
|
||||||
int recv (msg_t *msg_);
|
int recv (msg_t *msg_);
|
||||||
int recvpipe (msg_t *msg_, pipe_t **pipe_);
|
int recvpipe (msg_t *msg_, pipe_t **pipe_);
|
||||||
|
@ -46,6 +46,8 @@ namespace zmq
|
|||||||
// This method is called by the session to signalise that there
|
// This method is called by the session to signalise that there
|
||||||
// are messages to send available.
|
// are messages to send available.
|
||||||
virtual void activate_out () = 0;
|
virtual void activate_out () = 0;
|
||||||
|
|
||||||
|
virtual void zap_msg_available () = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -41,7 +41,7 @@ void zmq::lb_t::attach (pipe_t *pipe_)
|
|||||||
activated (pipe_);
|
activated (pipe_);
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::lb_t::terminated (pipe_t *pipe_)
|
void zmq::lb_t::pipe_terminated (pipe_t *pipe_)
|
||||||
{
|
{
|
||||||
pipes_t::size_type index = pipes.index (pipe_);
|
pipes_t::size_type index = pipes.index (pipe_);
|
||||||
|
|
||||||
|
@ -38,7 +38,7 @@ namespace zmq
|
|||||||
|
|
||||||
void attach (pipe_t *pipe_);
|
void attach (pipe_t *pipe_);
|
||||||
void activated (pipe_t *pipe_);
|
void activated (pipe_t *pipe_);
|
||||||
void terminated (pipe_t *pipe_);
|
void pipe_terminated (pipe_t *pipe_);
|
||||||
|
|
||||||
int send (msg_t *msg_);
|
int send (msg_t *msg_);
|
||||||
bool has_out ();
|
bool has_out ();
|
||||||
|
@ -46,6 +46,9 @@ namespace zmq
|
|||||||
// Process the handshake message received from the peer.
|
// Process the handshake message received from the peer.
|
||||||
virtual int process_handshake_message (msg_t *msg_) = 0;
|
virtual int process_handshake_message (msg_t *msg_) = 0;
|
||||||
|
|
||||||
|
// Notifies mechanism about availability of ZAP message.
|
||||||
|
virtual int zap_msg_available () { return 0; }
|
||||||
|
|
||||||
// True iff the handshake stage is complete?
|
// True iff the handshake stage is complete?
|
||||||
virtual bool is_handshake_complete () const = 0;
|
virtual bool is_handshake_complete () const = 0;
|
||||||
|
|
||||||
|
@ -49,7 +49,7 @@ void zmq::pair_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
|
|||||||
pipe_->terminate (false);
|
pipe_->terminate (false);
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::pair_t::xterminated (pipe_t *pipe_)
|
void zmq::pair_t::xpipe_terminated (pipe_t *pipe_)
|
||||||
{
|
{
|
||||||
if (pipe_ == pipe)
|
if (pipe_ == pipe)
|
||||||
pipe = NULL;
|
pipe = NULL;
|
||||||
|
@ -47,7 +47,7 @@ namespace zmq
|
|||||||
bool xhas_out ();
|
bool xhas_out ();
|
||||||
void xread_activated (zmq::pipe_t *pipe_);
|
void xread_activated (zmq::pipe_t *pipe_);
|
||||||
void xwrite_activated (zmq::pipe_t *pipe_);
|
void xwrite_activated (zmq::pipe_t *pipe_);
|
||||||
void xterminated (zmq::pipe_t *pipe_);
|
void xpipe_terminated (zmq::pipe_t *pipe_);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
|
@ -59,6 +59,7 @@ namespace zmq
|
|||||||
void terminate ();
|
void terminate ();
|
||||||
void activate_in ();
|
void activate_in ();
|
||||||
void activate_out ();
|
void activate_out ();
|
||||||
|
void zap_msg_available () {}
|
||||||
|
|
||||||
// i_poll_events interface implementation.
|
// i_poll_events interface implementation.
|
||||||
void in_event ();
|
void in_event ();
|
||||||
|
@ -58,6 +58,7 @@ namespace zmq
|
|||||||
void terminate ();
|
void terminate ();
|
||||||
void activate_in ();
|
void activate_in ();
|
||||||
void activate_out ();
|
void activate_out ();
|
||||||
|
void zap_msg_available () {}
|
||||||
|
|
||||||
// i_poll_events interface implementation.
|
// i_poll_events interface implementation.
|
||||||
void in_event ();
|
void in_event ();
|
||||||
|
@ -285,7 +285,7 @@ void zmq::pipe_t::process_pipe_term_ack ()
|
|||||||
{
|
{
|
||||||
// Notify the user that all the references to the pipe should be dropped.
|
// Notify the user that all the references to the pipe should be dropped.
|
||||||
zmq_assert (sink);
|
zmq_assert (sink);
|
||||||
sink->terminated (this);
|
sink->pipe_terminated (this);
|
||||||
|
|
||||||
// In term_ack_sent and term_req_sent2 states there's nothing to do.
|
// In term_ack_sent and term_req_sent2 states there's nothing to do.
|
||||||
// Simply deallocate the pipe. In term_req_sent1 state we have to ack
|
// Simply deallocate the pipe. In term_req_sent1 state we have to ack
|
||||||
@ -340,7 +340,7 @@ void zmq::pipe_t::terminate (bool delay_)
|
|||||||
// There are still pending messages available, but the user calls
|
// There are still pending messages available, but the user calls
|
||||||
// 'terminate'. We can act as if all the pending messages were read.
|
// 'terminate'. We can act as if all the pending messages were read.
|
||||||
else
|
else
|
||||||
if (state == waiting_for_delimiter && delay == 0) {
|
if (state == waiting_for_delimiter && !delay) {
|
||||||
outpipe = NULL;
|
outpipe = NULL;
|
||||||
send_pipe_term_ack (peer);
|
send_pipe_term_ack (peer);
|
||||||
state = term_ack_sent;
|
state = term_ack_sent;
|
||||||
|
@ -50,7 +50,7 @@ namespace zmq
|
|||||||
virtual void read_activated (zmq::pipe_t *pipe_) = 0;
|
virtual void read_activated (zmq::pipe_t *pipe_) = 0;
|
||||||
virtual void write_activated (zmq::pipe_t *pipe_) = 0;
|
virtual void write_activated (zmq::pipe_t *pipe_) = 0;
|
||||||
virtual void hiccuped (zmq::pipe_t *pipe_) = 0;
|
virtual void hiccuped (zmq::pipe_t *pipe_) = 0;
|
||||||
virtual void terminated (zmq::pipe_t *pipe_) = 0;
|
virtual void pipe_terminated (zmq::pipe_t *pipe_) = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Note that pipe can be stored in three different arrays.
|
// Note that pipe can be stored in three different arrays.
|
||||||
|
@ -26,12 +26,15 @@
|
|||||||
#include <string>
|
#include <string>
|
||||||
|
|
||||||
#include "msg.hpp"
|
#include "msg.hpp"
|
||||||
|
#include "session_base.hpp"
|
||||||
#include "err.hpp"
|
#include "err.hpp"
|
||||||
#include "plain_mechanism.hpp"
|
#include "plain_mechanism.hpp"
|
||||||
#include "wire.hpp"
|
#include "wire.hpp"
|
||||||
|
|
||||||
zmq::plain_mechanism_t::plain_mechanism_t (const options_t &options_) :
|
zmq::plain_mechanism_t::plain_mechanism_t (session_base_t *session_,
|
||||||
|
const options_t &options_) :
|
||||||
mechanism_t (options_),
|
mechanism_t (options_),
|
||||||
|
session (session_),
|
||||||
state (options.as_server? waiting_for_hello: sending_hello)
|
state (options.as_server? waiting_for_hello: sending_hello)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
@ -79,8 +82,16 @@ int zmq::plain_mechanism_t::process_handshake_message (msg_t *msg_)
|
|||||||
switch (state) {
|
switch (state) {
|
||||||
case waiting_for_hello:
|
case waiting_for_hello:
|
||||||
rc = process_hello_command (msg_);
|
rc = process_hello_command (msg_);
|
||||||
|
if (rc == 0) {
|
||||||
|
rc = receive_and_process_zap_reply ();
|
||||||
if (rc == 0)
|
if (rc == 0)
|
||||||
state = sending_welcome;
|
state = sending_welcome;
|
||||||
|
else
|
||||||
|
if (errno == EAGAIN) {
|
||||||
|
rc = 0;
|
||||||
|
state = waiting_for_zap_reply;
|
||||||
|
}
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case waiting_for_welcome:
|
case waiting_for_welcome:
|
||||||
rc = process_welcome_command (msg_);
|
rc = process_welcome_command (msg_);
|
||||||
@ -107,15 +118,25 @@ int zmq::plain_mechanism_t::process_handshake_message (msg_t *msg_)
|
|||||||
rc = msg_->init ();
|
rc = msg_->init ();
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
}
|
}
|
||||||
return 0;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
bool zmq::plain_mechanism_t::is_handshake_complete () const
|
bool zmq::plain_mechanism_t::is_handshake_complete () const
|
||||||
{
|
{
|
||||||
return state == ready;
|
return state == ready;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int zmq::plain_mechanism_t::zap_msg_available ()
|
||||||
|
{
|
||||||
|
if (state != waiting_for_zap_reply) {
|
||||||
|
errno = EFSM;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
const int rc = receive_and_process_zap_reply ();
|
||||||
|
if (rc == 0)
|
||||||
|
state = sending_welcome;
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
int zmq::plain_mechanism_t::hello_command (msg_t *msg_) const
|
int zmq::plain_mechanism_t::hello_command (msg_t *msg_) const
|
||||||
{
|
{
|
||||||
@ -163,7 +184,7 @@ int zmq::plain_mechanism_t::process_hello_command (msg_t *msg_)
|
|||||||
errno = EPROTO;
|
errno = EPROTO;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
size_t username_length = static_cast <size_t> (*ptr++);
|
const size_t username_length = static_cast <size_t> (*ptr++);
|
||||||
bytes_left -= 1;
|
bytes_left -= 1;
|
||||||
|
|
||||||
if (bytes_left < username_length) {
|
if (bytes_left < username_length) {
|
||||||
@ -178,7 +199,7 @@ int zmq::plain_mechanism_t::process_hello_command (msg_t *msg_)
|
|||||||
errno = EPROTO;
|
errno = EPROTO;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
size_t password_length = static_cast <size_t> (*ptr++);
|
const size_t password_length = static_cast <size_t> (*ptr++);
|
||||||
bytes_left -= 1;
|
bytes_left -= 1;
|
||||||
|
|
||||||
if (bytes_left < password_length) {
|
if (bytes_left < password_length) {
|
||||||
@ -194,8 +215,65 @@ int zmq::plain_mechanism_t::process_hello_command (msg_t *msg_)
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Add user authentication
|
// Use ZAP protocol (RFC 27) to authenticate user.
|
||||||
// Note: maybe use RFC 27 (ZAP) for this
|
int rc = session->zap_connect ();
|
||||||
|
if (rc == -1) {
|
||||||
|
errno = EPROTO;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
msg_t msg;
|
||||||
|
|
||||||
|
// Address delimiter frame
|
||||||
|
rc = msg.init ();
|
||||||
|
errno_assert (rc == 0);
|
||||||
|
msg.set_flags (msg_t::more);
|
||||||
|
rc = session->write_zap_msg (&msg);
|
||||||
|
errno_assert (rc == 0);
|
||||||
|
|
||||||
|
// Version frame
|
||||||
|
rc = msg.init_size (3);
|
||||||
|
errno_assert (rc == 0);
|
||||||
|
memcpy (msg.data (), "1.0", 3);
|
||||||
|
msg.set_flags (msg_t::more);
|
||||||
|
rc = session->write_zap_msg (&msg);
|
||||||
|
errno_assert (rc == 0);
|
||||||
|
|
||||||
|
// Sequence frame
|
||||||
|
rc = msg.init_size (1);
|
||||||
|
errno_assert (rc == 0);
|
||||||
|
memcpy (msg.data (), "1", 1);
|
||||||
|
msg.set_flags (msg_t::more);
|
||||||
|
rc = session->write_zap_msg (&msg);
|
||||||
|
errno_assert (rc == 0);
|
||||||
|
|
||||||
|
// Domain frame
|
||||||
|
rc = msg.init ();
|
||||||
|
errno_assert (rc == 0);
|
||||||
|
msg.set_flags (msg_t::more);
|
||||||
|
rc = session->write_zap_msg (&msg);
|
||||||
|
errno_assert (rc == 0);
|
||||||
|
|
||||||
|
// Mechanism frame
|
||||||
|
rc = msg.init_size (5);
|
||||||
|
errno_assert (rc == 0);
|
||||||
|
memcpy (msg.data (), "PLAIN", 5);
|
||||||
|
msg.set_flags (msg_t::more);
|
||||||
|
rc = session->write_zap_msg (&msg);
|
||||||
|
errno_assert (rc == 0);
|
||||||
|
|
||||||
|
// Credentials frame
|
||||||
|
rc = msg.init_size (1 + username_length + 1 + password_length);
|
||||||
|
errno_assert (rc == 0);
|
||||||
|
char *data_ptr = static_cast <char *> (msg.data ());
|
||||||
|
*data_ptr++ = static_cast <unsigned char> (username_length);
|
||||||
|
memcpy (data_ptr, username.c_str (), username_length);
|
||||||
|
data_ptr += username_length;
|
||||||
|
*data_ptr++ = static_cast <unsigned char> (password_length);
|
||||||
|
memcpy (data_ptr, password.c_str (), password_length);
|
||||||
|
rc = session->write_zap_msg (&msg);
|
||||||
|
errno_assert (rc == 0);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -307,6 +385,65 @@ int zmq::plain_mechanism_t::process_ready_command (msg_t *msg_)
|
|||||||
return parse_property_list (ptr + 8, bytes_left - 8);
|
return parse_property_list (ptr + 8, bytes_left - 8);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int zmq::plain_mechanism_t::receive_and_process_zap_reply ()
|
||||||
|
{
|
||||||
|
int rc = 0;
|
||||||
|
msg_t msg [6];
|
||||||
|
|
||||||
|
for (int i = 0; i < 6; i++) {
|
||||||
|
rc = msg [i].init ();
|
||||||
|
errno_assert (rc == 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < 6; i++) {
|
||||||
|
rc = session->read_zap_msg (&msg [i]);
|
||||||
|
if (rc == -1)
|
||||||
|
break;
|
||||||
|
if ((msg [i].flags () & msg_t::more) == (i < 5? 0: msg_t::more)) {
|
||||||
|
errno = EPROTO;
|
||||||
|
rc = -1;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (rc != 0)
|
||||||
|
goto error;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
// Address delimiter frame
|
||||||
|
if (msg [0].size () > 0) {
|
||||||
|
errno = EPROTO;
|
||||||
|
goto error;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Version frame
|
||||||
|
if (msg [1].size () != 3 || memcmp (msg [1].data (), "1.0", 3)) {
|
||||||
|
errno = EPROTO;
|
||||||
|
goto error;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sequence number frame
|
||||||
|
if (msg [2].size () != 1 || memcmp (msg [2].data (), "1", 1)) {
|
||||||
|
errno = EPROTO;
|
||||||
|
goto error;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Status code frame
|
||||||
|
if (msg [3].size () != 3 || memcmp (msg [3].data (), "200", 3)) {
|
||||||
|
errno = EACCES;
|
||||||
|
goto error;
|
||||||
|
}
|
||||||
|
|
||||||
|
error:
|
||||||
|
for (int i = 0; i < 6; i++) {
|
||||||
|
const int rc2 = msg [i].close ();
|
||||||
|
errno_assert (rc2 == 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
int zmq::plain_mechanism_t::parse_property_list (const unsigned char *ptr,
|
int zmq::plain_mechanism_t::parse_property_list (const unsigned char *ptr,
|
||||||
size_t bytes_left)
|
size_t bytes_left)
|
||||||
{
|
{
|
||||||
|
@ -27,17 +27,20 @@ namespace zmq
|
|||||||
{
|
{
|
||||||
|
|
||||||
class msg_t;
|
class msg_t;
|
||||||
|
class session_base_t;
|
||||||
|
|
||||||
class plain_mechanism_t : public mechanism_t
|
class plain_mechanism_t : public mechanism_t
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
|
||||||
plain_mechanism_t (const options_t &options_);
|
plain_mechanism_t (session_base_t *session_,
|
||||||
|
const options_t &options_);
|
||||||
virtual ~plain_mechanism_t ();
|
virtual ~plain_mechanism_t ();
|
||||||
|
|
||||||
// mechanism implementation
|
// mechanism implementation
|
||||||
virtual int next_handshake_message (msg_t *msg_);
|
virtual int next_handshake_message (msg_t *msg_);
|
||||||
virtual int process_handshake_message (msg_t *msg_);
|
virtual int process_handshake_message (msg_t *msg_);
|
||||||
|
virtual int zap_msg_available ();
|
||||||
virtual bool is_handshake_complete () const;
|
virtual bool is_handshake_complete () const;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
@ -51,9 +54,11 @@ namespace zmq
|
|||||||
waiting_for_initiate,
|
waiting_for_initiate,
|
||||||
sending_ready,
|
sending_ready,
|
||||||
waiting_for_ready,
|
waiting_for_ready,
|
||||||
|
waiting_for_zap_reply,
|
||||||
ready
|
ready
|
||||||
};
|
};
|
||||||
|
|
||||||
|
session_base_t * const session;
|
||||||
state_t state;
|
state_t state;
|
||||||
|
|
||||||
int hello_command (msg_t *msg_) const;
|
int hello_command (msg_t *msg_) const;
|
||||||
@ -66,6 +71,8 @@ namespace zmq
|
|||||||
int process_ready_command (msg_t *msg_);
|
int process_ready_command (msg_t *msg_);
|
||||||
int process_initiate_command (msg_t *msg_);
|
int process_initiate_command (msg_t *msg_);
|
||||||
|
|
||||||
|
int receive_and_process_zap_reply ();
|
||||||
|
|
||||||
int parse_property_list (const unsigned char *ptr, size_t length);
|
int parse_property_list (const unsigned char *ptr, size_t length);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -46,9 +46,9 @@ void zmq::pull_t::xread_activated (pipe_t *pipe_)
|
|||||||
fq.activated (pipe_);
|
fq.activated (pipe_);
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::pull_t::xterminated (pipe_t *pipe_)
|
void zmq::pull_t::xpipe_terminated (pipe_t *pipe_)
|
||||||
{
|
{
|
||||||
fq.terminated (pipe_);
|
fq.pipe_terminated (pipe_);
|
||||||
}
|
}
|
||||||
|
|
||||||
int zmq::pull_t::xrecv (msg_t *msg_)
|
int zmq::pull_t::xrecv (msg_t *msg_)
|
||||||
|
@ -47,7 +47,7 @@ namespace zmq
|
|||||||
int xrecv (zmq::msg_t *msg_);
|
int xrecv (zmq::msg_t *msg_);
|
||||||
bool xhas_in ();
|
bool xhas_in ();
|
||||||
void xread_activated (zmq::pipe_t *pipe_);
|
void xread_activated (zmq::pipe_t *pipe_);
|
||||||
void xterminated (zmq::pipe_t *pipe_);
|
void xpipe_terminated (zmq::pipe_t *pipe_);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
|
@ -46,9 +46,9 @@ void zmq::push_t::xwrite_activated (pipe_t *pipe_)
|
|||||||
lb.activated (pipe_);
|
lb.activated (pipe_);
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::push_t::xterminated (pipe_t *pipe_)
|
void zmq::push_t::xpipe_terminated (pipe_t *pipe_)
|
||||||
{
|
{
|
||||||
lb.terminated (pipe_);
|
lb.pipe_terminated (pipe_);
|
||||||
}
|
}
|
||||||
|
|
||||||
int zmq::push_t::xsend (msg_t *msg_)
|
int zmq::push_t::xsend (msg_t *msg_)
|
||||||
|
@ -47,7 +47,7 @@ namespace zmq
|
|||||||
int xsend (zmq::msg_t *msg_);
|
int xsend (zmq::msg_t *msg_);
|
||||||
bool xhas_out ();
|
bool xhas_out ();
|
||||||
void xwrite_activated (zmq::pipe_t *pipe_);
|
void xwrite_activated (zmq::pipe_t *pipe_);
|
||||||
void xterminated (zmq::pipe_t *pipe_);
|
void xpipe_terminated (zmq::pipe_t *pipe_);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
|
@ -106,7 +106,7 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_,
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void zmq::router_t::xterminated (pipe_t *pipe_)
|
void zmq::router_t::xpipe_terminated (pipe_t *pipe_)
|
||||||
{
|
{
|
||||||
std::set <pipe_t*>::iterator it = anonymous_pipes.find (pipe_);
|
std::set <pipe_t*>::iterator it = anonymous_pipes.find (pipe_);
|
||||||
if (it != anonymous_pipes.end ())
|
if (it != anonymous_pipes.end ())
|
||||||
@ -115,7 +115,7 @@ void zmq::router_t::xterminated (pipe_t *pipe_)
|
|||||||
outpipes_t::iterator it = outpipes.find (pipe_->get_identity ());
|
outpipes_t::iterator it = outpipes.find (pipe_->get_identity ());
|
||||||
zmq_assert (it != outpipes.end ());
|
zmq_assert (it != outpipes.end ());
|
||||||
outpipes.erase (it);
|
outpipes.erase (it);
|
||||||
fq.terminated (pipe_);
|
fq.pipe_terminated (pipe_);
|
||||||
if (pipe_ == current_out)
|
if (pipe_ == current_out)
|
||||||
current_out = NULL;
|
current_out = NULL;
|
||||||
}
|
}
|
||||||
|
@ -53,7 +53,7 @@ namespace zmq
|
|||||||
bool xhas_out ();
|
bool xhas_out ();
|
||||||
void xread_activated (zmq::pipe_t *pipe_);
|
void xread_activated (zmq::pipe_t *pipe_);
|
||||||
void xwrite_activated (zmq::pipe_t *pipe_);
|
void xwrite_activated (zmq::pipe_t *pipe_);
|
||||||
void xterminated (zmq::pipe_t *pipe_);
|
void xpipe_terminated (zmq::pipe_t *pipe_);
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
|
||||||
|
@ -28,6 +28,7 @@
|
|||||||
#include "pgm_receiver.hpp"
|
#include "pgm_receiver.hpp"
|
||||||
#include "address.hpp"
|
#include "address.hpp"
|
||||||
|
|
||||||
|
#include "ctx.hpp"
|
||||||
#include "req.hpp"
|
#include "req.hpp"
|
||||||
#include "dealer.hpp"
|
#include "dealer.hpp"
|
||||||
#include "rep.hpp"
|
#include "rep.hpp"
|
||||||
@ -105,6 +106,7 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
|
|||||||
io_object_t (io_thread_),
|
io_object_t (io_thread_),
|
||||||
connect (connect_),
|
connect (connect_),
|
||||||
pipe (NULL),
|
pipe (NULL),
|
||||||
|
zap_pipe (NULL),
|
||||||
incomplete_in (false),
|
incomplete_in (false),
|
||||||
pending (false),
|
pending (false),
|
||||||
engine (NULL),
|
engine (NULL),
|
||||||
@ -118,6 +120,7 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
|
|||||||
zmq::session_base_t::~session_base_t ()
|
zmq::session_base_t::~session_base_t ()
|
||||||
{
|
{
|
||||||
zmq_assert (!pipe);
|
zmq_assert (!pipe);
|
||||||
|
zmq_assert (!zap_pipe);
|
||||||
|
|
||||||
// If there's still a pending linger timer, remove it.
|
// If there's still a pending linger timer, remove it.
|
||||||
if (has_linger_timer) {
|
if (has_linger_timer) {
|
||||||
@ -165,6 +168,39 @@ int zmq::session_base_t::push_msg (msg_t *msg_)
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int zmq::session_base_t::read_zap_msg (msg_t *msg_)
|
||||||
|
{
|
||||||
|
if (zap_pipe == NULL) {
|
||||||
|
errno = ENOTCONN;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!zap_pipe->read (msg_)) {
|
||||||
|
errno = EAGAIN;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int zmq::session_base_t::write_zap_msg (msg_t *msg_)
|
||||||
|
{
|
||||||
|
if (zap_pipe == NULL) {
|
||||||
|
errno = ENOTCONN;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
const bool ok = zap_pipe->write (msg_);
|
||||||
|
zmq_assert (ok);
|
||||||
|
|
||||||
|
if ((msg_->flags () & msg_t::more) == 0)
|
||||||
|
zap_pipe->flush ();
|
||||||
|
|
||||||
|
const int rc = msg_->init ();
|
||||||
|
errno_assert (rc == 0);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
void zmq::session_base_t::reset ()
|
void zmq::session_base_t::reset ()
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
@ -197,14 +233,20 @@ void zmq::session_base_t::clean_pipes ()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::session_base_t::terminated (pipe_t *pipe_)
|
void zmq::session_base_t::pipe_terminated (pipe_t *pipe_)
|
||||||
{
|
{
|
||||||
// Drop the reference to the deallocated pipe if required.
|
// Drop the reference to the deallocated pipe if required.
|
||||||
zmq_assert (pipe == pipe_ || terminating_pipes.count (pipe_) == 1);
|
zmq_assert (pipe_ == pipe
|
||||||
|
|| pipe_ == zap_pipe
|
||||||
|
|| terminating_pipes.count (pipe_) == 1);
|
||||||
|
|
||||||
if (pipe == pipe_)
|
if (pipe_ == pipe)
|
||||||
// If this is our current pipe, remove it
|
// If this is our current pipe, remove it
|
||||||
pipe = NULL;
|
pipe = NULL;
|
||||||
|
else
|
||||||
|
if (pipe_ == zap_pipe) {
|
||||||
|
zap_pipe = NULL;
|
||||||
|
}
|
||||||
else
|
else
|
||||||
// Remove the pipe from the detached pipes set
|
// Remove the pipe from the detached pipes set
|
||||||
terminating_pipes.erase (pipe_);
|
terminating_pipes.erase (pipe_);
|
||||||
@ -220,22 +262,27 @@ void zmq::session_base_t::terminated (pipe_t *pipe_)
|
|||||||
// If we are waiting for pending messages to be sent, at this point
|
// If we are waiting for pending messages to be sent, at this point
|
||||||
// we are sure that there will be no more messages and we can proceed
|
// we are sure that there will be no more messages and we can proceed
|
||||||
// with termination safely.
|
// with termination safely.
|
||||||
if (pending && !pipe && terminating_pipes.empty ())
|
if (pending && !pipe && !zap_pipe && terminating_pipes.empty ())
|
||||||
proceed_with_term ();
|
proceed_with_term ();
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::session_base_t::read_activated (pipe_t *pipe_)
|
void zmq::session_base_t::read_activated (pipe_t *pipe_)
|
||||||
{
|
{
|
||||||
// Skip activating if we're detaching this pipe
|
// Skip activating if we're detaching this pipe
|
||||||
if (pipe != pipe_) {
|
if (unlikely(pipe_ != pipe && pipe_ != zap_pipe)) {
|
||||||
zmq_assert (terminating_pipes.count (pipe_) == 1);
|
zmq_assert (terminating_pipes.count (pipe_) == 1);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (likely (engine != NULL))
|
if (unlikely (engine == NULL)) {
|
||||||
|
pipe->check_read ();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (likely (pipe_ == pipe))
|
||||||
engine->activate_out ();
|
engine->activate_out ();
|
||||||
else
|
else
|
||||||
pipe->check_read ();
|
engine->zap_msg_available ();
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::session_base_t::write_activated (pipe_t *pipe_)
|
void zmq::session_base_t::write_activated (pipe_t *pipe_)
|
||||||
@ -268,6 +315,50 @@ void zmq::session_base_t::process_plug ()
|
|||||||
start_connecting (false);
|
start_connecting (false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int zmq::session_base_t::zap_connect ()
|
||||||
|
{
|
||||||
|
zmq_assert (zap_pipe == NULL);
|
||||||
|
|
||||||
|
endpoint_t peer = find_endpoint ("inproc://zeromq.zap.01");
|
||||||
|
if (peer.socket == NULL) {
|
||||||
|
errno = ECONNREFUSED;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (peer.options.type != ZMQ_REP
|
||||||
|
&& peer.options.type != ZMQ_ROUTER) {
|
||||||
|
errno = ECONNREFUSED;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a bi-directional pipe that will connect
|
||||||
|
// session with zap socket.
|
||||||
|
object_t *parents [2] = {this, peer.socket};
|
||||||
|
pipe_t *new_pipes [2] = {NULL, NULL};
|
||||||
|
int hwms [2] = {0, 0};
|
||||||
|
bool delays [2] = {false, false};
|
||||||
|
int rc = pipepair (parents, new_pipes, hwms, delays);
|
||||||
|
errno_assert (rc == 0);
|
||||||
|
|
||||||
|
// Attach local end of the pipe to this socket object.
|
||||||
|
zap_pipe = new_pipes [0];
|
||||||
|
zap_pipe->set_event_sink (this);
|
||||||
|
|
||||||
|
send_bind (peer.socket, new_pipes [1], false);
|
||||||
|
|
||||||
|
// Send empty identity if required by the peer.
|
||||||
|
if (peer.options.recv_identity) {
|
||||||
|
msg_t id;
|
||||||
|
rc = id.init ();
|
||||||
|
errno_assert (rc == 0);
|
||||||
|
id.set_flags (msg_t::identity);
|
||||||
|
bool ok = zap_pipe->write (&id);
|
||||||
|
zmq_assert (ok);
|
||||||
|
zap_pipe->flush ();
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
void zmq::session_base_t::process_attach (i_engine *engine_)
|
void zmq::session_base_t::process_attach (i_engine *engine_)
|
||||||
{
|
{
|
||||||
zmq_assert (engine_ != NULL);
|
zmq_assert (engine_ != NULL);
|
||||||
@ -312,6 +403,9 @@ void zmq::session_base_t::detach ()
|
|||||||
// Just in case there's only a delimiter in the pipe.
|
// Just in case there's only a delimiter in the pipe.
|
||||||
if (pipe)
|
if (pipe)
|
||||||
pipe->check_read ();
|
pipe->check_read ();
|
||||||
|
|
||||||
|
if (zap_pipe)
|
||||||
|
zap_pipe->check_read ();
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::session_base_t::process_term (int linger_)
|
void zmq::session_base_t::process_term (int linger_)
|
||||||
@ -321,13 +415,14 @@ void zmq::session_base_t::process_term (int linger_)
|
|||||||
// If the termination of the pipe happens before the term command is
|
// If the termination of the pipe happens before the term command is
|
||||||
// delivered there's nothing much to do. We can proceed with the
|
// delivered there's nothing much to do. We can proceed with the
|
||||||
// stadard termination immediately.
|
// stadard termination immediately.
|
||||||
if (!pipe) {
|
if (!pipe && !zap_pipe) {
|
||||||
proceed_with_term ();
|
proceed_with_term ();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
pending = true;
|
pending = true;
|
||||||
|
|
||||||
|
if (pipe != NULL) {
|
||||||
// If there's finite linger value, delay the termination.
|
// If there's finite linger value, delay the termination.
|
||||||
// If linger is infinite (negative) we don't even have to set
|
// If linger is infinite (negative) we don't even have to set
|
||||||
// the timer.
|
// the timer.
|
||||||
@ -345,11 +440,15 @@ void zmq::session_base_t::process_term (int linger_)
|
|||||||
// In case there's no engine and there's only delimiter in the
|
// In case there's no engine and there's only delimiter in the
|
||||||
// pipe it wouldn't be ever read. Thus we check for it explicitly.
|
// pipe it wouldn't be ever read. Thus we check for it explicitly.
|
||||||
pipe->check_read ();
|
pipe->check_read ();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (zap_pipe != NULL)
|
||||||
|
zap_pipe->terminate (false);
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::session_base_t::proceed_with_term ()
|
void zmq::session_base_t::proceed_with_term ()
|
||||||
{
|
{
|
||||||
// The pending phase have just ended.
|
// The pending phase has just ended.
|
||||||
pending = false;
|
pending = false;
|
||||||
|
|
||||||
// Continue with standard termination.
|
// Continue with standard termination.
|
||||||
|
@ -61,17 +61,29 @@ namespace zmq
|
|||||||
void read_activated (zmq::pipe_t *pipe_);
|
void read_activated (zmq::pipe_t *pipe_);
|
||||||
void write_activated (zmq::pipe_t *pipe_);
|
void write_activated (zmq::pipe_t *pipe_);
|
||||||
void hiccuped (zmq::pipe_t *pipe_);
|
void hiccuped (zmq::pipe_t *pipe_);
|
||||||
void terminated (zmq::pipe_t *pipe_);
|
void pipe_terminated (zmq::pipe_t *pipe_);
|
||||||
|
|
||||||
// Delivers a message. Returns 0 if successful; -1 otherwise.
|
// Delivers a message. Returns 0 if successful; -1 otherwise.
|
||||||
// The function takes ownership of the message.
|
// The function takes ownership of the message.
|
||||||
int push_msg (msg_t *msg_);
|
int push_msg (msg_t *msg_);
|
||||||
|
|
||||||
|
int zap_connect ();
|
||||||
|
|
||||||
// Fetches a message. Returns 0 if successful; -1 otherwise.
|
// Fetches a message. Returns 0 if successful; -1 otherwise.
|
||||||
// The caller is responsible for freeing the message when no
|
// The caller is responsible for freeing the message when no
|
||||||
// longer used.
|
// longer used.
|
||||||
int pull_msg (msg_t *msg_);
|
int pull_msg (msg_t *msg_);
|
||||||
|
|
||||||
|
// Receives message from ZAP socket.
|
||||||
|
// Returns 0 on success; -1 otherwise.
|
||||||
|
// The caller is responsible for freeing the message.
|
||||||
|
int read_zap_msg (msg_t *msg_);
|
||||||
|
|
||||||
|
// Sends message to ZAP socket.
|
||||||
|
// Returns 0 on success; -1 otherwise.
|
||||||
|
// The function takes ownership of the message.
|
||||||
|
int write_zap_msg (msg_t *msg_);
|
||||||
|
|
||||||
socket_base_t *get_socket ();
|
socket_base_t *get_socket ();
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
@ -109,8 +121,11 @@ namespace zmq
|
|||||||
// Pipe connecting the session to its socket.
|
// Pipe connecting the session to its socket.
|
||||||
zmq::pipe_t *pipe;
|
zmq::pipe_t *pipe;
|
||||||
|
|
||||||
|
// Pipe used to exchange messages with ZAP socket.
|
||||||
|
zmq::pipe_t *zap_pipe;
|
||||||
|
|
||||||
// This set is added to with pipes we are disconnecting, but haven't yet completed
|
// This set is added to with pipes we are disconnecting, but haven't yet completed
|
||||||
std::set<pipe_t *> terminating_pipes;
|
std::set <pipe_t *> terminating_pipes;
|
||||||
|
|
||||||
// This flag is true if the remainder of the message being processed
|
// This flag is true if the remainder of the message being processed
|
||||||
// is still in the in pipe.
|
// is still in the in pipe.
|
||||||
|
@ -1003,10 +1003,10 @@ void zmq::socket_base_t::hiccuped (pipe_t *pipe_)
|
|||||||
xhiccuped (pipe_);
|
xhiccuped (pipe_);
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::socket_base_t::terminated (pipe_t *pipe_)
|
void zmq::socket_base_t::pipe_terminated (pipe_t *pipe_)
|
||||||
{
|
{
|
||||||
// Notify the specific socket type about the pipe termination.
|
// Notify the specific socket type about the pipe termination.
|
||||||
xterminated (pipe_);
|
xpipe_terminated (pipe_);
|
||||||
|
|
||||||
// Remove pipe from inproc pipes
|
// Remove pipe from inproc pipes
|
||||||
for (inprocs_t::iterator it = inprocs.begin(); it != inprocs.end(); ++it) {
|
for (inprocs_t::iterator it = inprocs.begin(); it != inprocs.end(); ++it) {
|
||||||
|
@ -100,7 +100,7 @@ namespace zmq
|
|||||||
void read_activated (pipe_t *pipe_);
|
void read_activated (pipe_t *pipe_);
|
||||||
void write_activated (pipe_t *pipe_);
|
void write_activated (pipe_t *pipe_);
|
||||||
void hiccuped (pipe_t *pipe_);
|
void hiccuped (pipe_t *pipe_);
|
||||||
void terminated (pipe_t *pipe_);
|
void pipe_terminated (pipe_t *pipe_);
|
||||||
void lock();
|
void lock();
|
||||||
void unlock();
|
void unlock();
|
||||||
|
|
||||||
@ -145,7 +145,7 @@ namespace zmq
|
|||||||
virtual void xread_activated (pipe_t *pipe_);
|
virtual void xread_activated (pipe_t *pipe_);
|
||||||
virtual void xwrite_activated (pipe_t *pipe_);
|
virtual void xwrite_activated (pipe_t *pipe_);
|
||||||
virtual void xhiccuped (pipe_t *pipe_);
|
virtual void xhiccuped (pipe_t *pipe_);
|
||||||
virtual void xterminated (pipe_t *pipe_) = 0;
|
virtual void xpipe_terminated (pipe_t *pipe_) = 0;
|
||||||
|
|
||||||
// Delay actual destruction of the socket.
|
// Delay actual destruction of the socket.
|
||||||
void process_destroy ();
|
void process_destroy ();
|
||||||
|
@ -70,7 +70,6 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
|
|||||||
read_msg (&stream_engine_t::read_identity),
|
read_msg (&stream_engine_t::read_identity),
|
||||||
write_msg (&stream_engine_t::write_identity),
|
write_msg (&stream_engine_t::write_identity),
|
||||||
io_error (false),
|
io_error (false),
|
||||||
congested (false),
|
|
||||||
subscription_required (false),
|
subscription_required (false),
|
||||||
mechanism (NULL),
|
mechanism (NULL),
|
||||||
input_paused (false),
|
input_paused (false),
|
||||||
@ -222,7 +221,7 @@ void zmq::stream_engine_t::in_event ()
|
|||||||
zmq_assert (decoder);
|
zmq_assert (decoder);
|
||||||
|
|
||||||
// If there has been an I/O error, stop polling.
|
// If there has been an I/O error, stop polling.
|
||||||
if (congested) {
|
if (input_paused) {
|
||||||
rm_fd (handle);
|
rm_fd (handle);
|
||||||
io_error = true;
|
io_error = true;
|
||||||
return;
|
return;
|
||||||
@ -270,7 +269,7 @@ void zmq::stream_engine_t::in_event ()
|
|||||||
error ();
|
error ();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
congested = true;
|
input_paused = true;
|
||||||
reset_pollin (handle);
|
reset_pollin (handle);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -309,6 +308,7 @@ void zmq::stream_engine_t::out_event ()
|
|||||||
|
|
||||||
// If there is no data to send, stop polling for output.
|
// If there is no data to send, stop polling for output.
|
||||||
if (outsize == 0) {
|
if (outsize == 0) {
|
||||||
|
output_paused = true;
|
||||||
reset_pollout (handle);
|
reset_pollout (handle);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -350,7 +350,10 @@ void zmq::stream_engine_t::activate_out ()
|
|||||||
if (unlikely (io_error))
|
if (unlikely (io_error))
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
if (likely (output_paused)) {
|
||||||
set_pollout (handle);
|
set_pollout (handle);
|
||||||
|
output_paused = false;
|
||||||
|
}
|
||||||
|
|
||||||
// Speculative write: The assumption is that at the moment new message
|
// Speculative write: The assumption is that at the moment new message
|
||||||
// was sent by the user the socket is probably available for writing.
|
// was sent by the user the socket is probably available for writing.
|
||||||
@ -361,7 +364,7 @@ void zmq::stream_engine_t::activate_out ()
|
|||||||
|
|
||||||
void zmq::stream_engine_t::activate_in ()
|
void zmq::stream_engine_t::activate_in ()
|
||||||
{
|
{
|
||||||
zmq_assert (congested);
|
zmq_assert (input_paused);
|
||||||
zmq_assert (session != NULL);
|
zmq_assert (session != NULL);
|
||||||
zmq_assert (decoder != NULL);
|
zmq_assert (decoder != NULL);
|
||||||
|
|
||||||
@ -393,7 +396,7 @@ void zmq::stream_engine_t::activate_in ()
|
|||||||
if (rc == -1 || io_error)
|
if (rc == -1 || io_error)
|
||||||
error ();
|
error ();
|
||||||
else {
|
else {
|
||||||
congested = false;
|
input_paused = false;
|
||||||
set_pollin (handle);
|
set_pollin (handle);
|
||||||
session->flush ();
|
session->flush ();
|
||||||
|
|
||||||
@ -533,7 +536,7 @@ bool zmq::stream_engine_t::handshake ()
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
if (memcmp (greeting_recv + 12, "PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
|
if (memcmp (greeting_recv + 12, "PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
|
||||||
mechanism = new (std::nothrow) plain_mechanism_t (options);
|
mechanism = new (std::nothrow) plain_mechanism_t (session, options);
|
||||||
alloc_assert (mechanism);
|
alloc_assert (mechanism);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
@ -596,15 +599,8 @@ int zmq::stream_engine_t::next_handshake_message (msg_t *msg_)
|
|||||||
if (rc == 0) {
|
if (rc == 0) {
|
||||||
if (mechanism->is_handshake_complete ())
|
if (mechanism->is_handshake_complete ())
|
||||||
mechanism_ready ();
|
mechanism_ready ();
|
||||||
if (input_paused) {
|
if (input_paused)
|
||||||
activate_in ();
|
activate_in ();
|
||||||
input_paused = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
if (rc == -1) {
|
|
||||||
zmq_assert (errno == EAGAIN);
|
|
||||||
output_paused = true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return rc;
|
return rc;
|
||||||
@ -618,18 +614,28 @@ int zmq::stream_engine_t::process_handshake_message (msg_t *msg_)
|
|||||||
if (rc == 0) {
|
if (rc == 0) {
|
||||||
if (mechanism->is_handshake_complete ())
|
if (mechanism->is_handshake_complete ())
|
||||||
mechanism_ready ();
|
mechanism_ready ();
|
||||||
if (output_paused) {
|
if (output_paused)
|
||||||
activate_out ();
|
activate_out ();
|
||||||
output_paused = false;
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
else
|
|
||||||
if (rc == -1 && errno == EAGAIN)
|
|
||||||
input_paused = true;
|
|
||||||
|
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void zmq::stream_engine_t::zap_msg_available ()
|
||||||
|
{
|
||||||
|
zmq_assert (mechanism != NULL);
|
||||||
|
|
||||||
|
const int rc = mechanism->zap_msg_available ();
|
||||||
|
if (rc == -1) {
|
||||||
|
error ();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (input_paused)
|
||||||
|
activate_in ();
|
||||||
|
if (output_paused)
|
||||||
|
activate_out ();
|
||||||
|
}
|
||||||
|
|
||||||
void zmq::stream_engine_t::mechanism_ready ()
|
void zmq::stream_engine_t::mechanism_ready ()
|
||||||
{
|
{
|
||||||
if (options.recv_identity) {
|
if (options.recv_identity) {
|
||||||
|
@ -62,6 +62,7 @@ namespace zmq
|
|||||||
void terminate ();
|
void terminate ();
|
||||||
void activate_in ();
|
void activate_in ();
|
||||||
void activate_out ();
|
void activate_out ();
|
||||||
|
void zap_msg_available ();
|
||||||
|
|
||||||
// i_poll_events interface implementation.
|
// i_poll_events interface implementation.
|
||||||
void in_event ();
|
void in_event ();
|
||||||
@ -168,10 +169,6 @@ namespace zmq
|
|||||||
|
|
||||||
bool io_error;
|
bool io_error;
|
||||||
|
|
||||||
// True iff the session could not accept more
|
|
||||||
// messages due to flow control.
|
|
||||||
bool congested;
|
|
||||||
|
|
||||||
// Indicates whether the engine is to inject a phony
|
// Indicates whether the engine is to inject a phony
|
||||||
// subscription message into the incomming stream.
|
// subscription message into the incomming stream.
|
||||||
// Needed to support old peers.
|
// Needed to support old peers.
|
||||||
@ -179,7 +176,10 @@ namespace zmq
|
|||||||
|
|
||||||
mechanism_t *mechanism;
|
mechanism_t *mechanism;
|
||||||
|
|
||||||
|
// True iff the engine couldn't consume the last decoded message.
|
||||||
bool input_paused;
|
bool input_paused;
|
||||||
|
|
||||||
|
// True iff the engine doesn't have any message to encode.
|
||||||
bool output_paused;
|
bool output_paused;
|
||||||
|
|
||||||
// Socket
|
// Socket
|
||||||
|
@ -102,14 +102,14 @@ int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::xpub_t::xterminated (pipe_t *pipe_)
|
void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_)
|
||||||
{
|
{
|
||||||
// Remove the pipe from the trie. If there are topics that nobody
|
// Remove the pipe from the trie. If there are topics that nobody
|
||||||
// is interested in anymore, send corresponding unsubscriptions
|
// is interested in anymore, send corresponding unsubscriptions
|
||||||
// upstream.
|
// upstream.
|
||||||
subscriptions.rm (pipe_, send_unsubscription, this);
|
subscriptions.rm (pipe_, send_unsubscription, this);
|
||||||
|
|
||||||
dist.terminated (pipe_);
|
dist.pipe_terminated (pipe_);
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_)
|
void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_)
|
||||||
|
@ -54,7 +54,7 @@ namespace zmq
|
|||||||
void xread_activated (zmq::pipe_t *pipe_);
|
void xread_activated (zmq::pipe_t *pipe_);
|
||||||
void xwrite_activated (zmq::pipe_t *pipe_);
|
void xwrite_activated (zmq::pipe_t *pipe_);
|
||||||
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
|
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
|
||||||
void xterminated (zmq::pipe_t *pipe_);
|
void xpipe_terminated (zmq::pipe_t *pipe_);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
|
@ -67,10 +67,10 @@ void zmq::xsub_t::xwrite_activated (pipe_t *pipe_)
|
|||||||
dist.activated (pipe_);
|
dist.activated (pipe_);
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::xsub_t::xterminated (pipe_t *pipe_)
|
void zmq::xsub_t::xpipe_terminated (pipe_t *pipe_)
|
||||||
{
|
{
|
||||||
fq.terminated (pipe_);
|
fq.pipe_terminated (pipe_);
|
||||||
dist.terminated (pipe_);
|
dist.pipe_terminated (pipe_);
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::xsub_t::xhiccuped (pipe_t *pipe_)
|
void zmq::xsub_t::xhiccuped (pipe_t *pipe_)
|
||||||
|
@ -52,7 +52,7 @@ namespace zmq
|
|||||||
void xread_activated (zmq::pipe_t *pipe_);
|
void xread_activated (zmq::pipe_t *pipe_);
|
||||||
void xwrite_activated (zmq::pipe_t *pipe_);
|
void xwrite_activated (zmq::pipe_t *pipe_);
|
||||||
void xhiccuped (pipe_t *pipe_);
|
void xhiccuped (pipe_t *pipe_);
|
||||||
void xterminated (zmq::pipe_t *pipe_);
|
void xpipe_terminated (zmq::pipe_t *pipe_);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
|
@ -17,8 +17,124 @@
|
|||||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include <pthread.h>
|
||||||
|
#include <string.h>
|
||||||
#include "testutil.hpp"
|
#include "testutil.hpp"
|
||||||
|
|
||||||
|
static bool
|
||||||
|
authenticate (const unsigned char *data, size_t data_length)
|
||||||
|
{
|
||||||
|
const char *username = "admin";
|
||||||
|
const size_t username_length = strlen (username);
|
||||||
|
const char *password = "password";
|
||||||
|
const size_t password_length = strlen (password);
|
||||||
|
|
||||||
|
if (data_length != 1 + username_length + 1 + password_length)
|
||||||
|
return false;
|
||||||
|
if (data [0] != username_length)
|
||||||
|
return false;
|
||||||
|
if (memcmp (data + 1, username, username_length))
|
||||||
|
return false;
|
||||||
|
if (data [1 + username_length] != password_length)
|
||||||
|
return false;
|
||||||
|
if (memcmp (data + 1 + username_length + 1, password, password_length))
|
||||||
|
return false;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void *
|
||||||
|
zap_handler (void *zap)
|
||||||
|
{
|
||||||
|
int rc, more;
|
||||||
|
size_t optlen;
|
||||||
|
zmq_msg_t version, seqno, domain, mechanism, credentials;
|
||||||
|
zmq_msg_t status_code, status_text, user_id;
|
||||||
|
|
||||||
|
// Version
|
||||||
|
rc = zmq_msg_init (&version);
|
||||||
|
assert (rc == 0);
|
||||||
|
rc = zmq_msg_recv (&version, zap, 0);
|
||||||
|
assert (rc == 3 && memcmp (zmq_msg_data (&version), "1.0", 3) == 0);
|
||||||
|
optlen = sizeof more;
|
||||||
|
rc = zmq_getsockopt (zap, ZMQ_RCVMORE, &more, &optlen);
|
||||||
|
assert (rc == 0 && more == 1);
|
||||||
|
|
||||||
|
// Sequence number
|
||||||
|
rc = zmq_msg_init (&seqno);
|
||||||
|
assert (rc == 0);
|
||||||
|
rc = zmq_msg_recv (&seqno, zap, 0);
|
||||||
|
assert (rc != -1);
|
||||||
|
optlen = sizeof more;
|
||||||
|
rc = zmq_getsockopt (zap, ZMQ_RCVMORE, &more, &optlen);
|
||||||
|
assert (rc == 0 && more == 1);
|
||||||
|
|
||||||
|
// Domain
|
||||||
|
rc = zmq_msg_init (&domain);
|
||||||
|
assert (rc == 0);
|
||||||
|
rc = zmq_msg_recv (&domain, zap, 0);
|
||||||
|
assert (rc != -1);
|
||||||
|
optlen = sizeof more;
|
||||||
|
rc = zmq_getsockopt (zap, ZMQ_RCVMORE, &more, &optlen);
|
||||||
|
assert (rc == 0 && more == 1);
|
||||||
|
|
||||||
|
// Mechanism
|
||||||
|
rc = zmq_msg_init (&mechanism);
|
||||||
|
assert (rc == 0);
|
||||||
|
rc = zmq_msg_recv (&mechanism, zap, 0);
|
||||||
|
assert (rc == 5 && memcmp (zmq_msg_data (&mechanism), "PLAIN", 5) == 0);
|
||||||
|
optlen = sizeof more;
|
||||||
|
rc = zmq_getsockopt (zap, ZMQ_RCVMORE, &more, &optlen);
|
||||||
|
assert (rc == 0 && more == 1);
|
||||||
|
|
||||||
|
// Credentials
|
||||||
|
rc = zmq_msg_init (&credentials);
|
||||||
|
assert (rc == 0);
|
||||||
|
rc = zmq_msg_recv (&credentials, zap, 0);
|
||||||
|
optlen = sizeof more;
|
||||||
|
rc = zmq_getsockopt (zap, ZMQ_RCVMORE, &more, &optlen);
|
||||||
|
assert (rc == 0 && more == 0);
|
||||||
|
|
||||||
|
const bool auth_ok =
|
||||||
|
authenticate ((unsigned char *) zmq_msg_data (&credentials),
|
||||||
|
zmq_msg_size (&credentials));
|
||||||
|
|
||||||
|
rc = zmq_msg_send (&version, zap, ZMQ_SNDMORE);
|
||||||
|
assert (rc == 3);
|
||||||
|
|
||||||
|
rc = zmq_msg_send (&seqno, zap, ZMQ_SNDMORE);
|
||||||
|
assert (rc != -1);
|
||||||
|
|
||||||
|
rc = zmq_msg_init_size (&status_code, 3);
|
||||||
|
assert (rc == 0);
|
||||||
|
memcpy (zmq_msg_data (&status_code), auth_ok? "200": "400", 3);
|
||||||
|
rc = zmq_msg_send (&status_code, zap, ZMQ_SNDMORE);
|
||||||
|
assert (rc == 3);
|
||||||
|
|
||||||
|
rc = zmq_msg_init (&status_text);
|
||||||
|
assert (rc == 0);
|
||||||
|
rc = zmq_msg_send (&status_text, zap, ZMQ_SNDMORE);
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
|
rc = zmq_msg_init (&user_id);
|
||||||
|
assert (rc == 0);
|
||||||
|
rc = zmq_msg_send (&user_id, zap, 0);
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
|
rc = zmq_msg_close (&domain);
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
|
rc = zmq_msg_close (&mechanism);
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
|
rc = zmq_msg_close (&credentials);
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
|
rc = zmq_close (zap);
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
int main (void)
|
int main (void)
|
||||||
{
|
{
|
||||||
void *ctx = zmq_ctx_new ();
|
void *ctx = zmq_ctx_new ();
|
||||||
@ -122,6 +238,18 @@ int main (void)
|
|||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
assert (as_server == 1);
|
assert (as_server == 1);
|
||||||
|
|
||||||
|
// Create and bind ZAP socket
|
||||||
|
void *zap = zmq_socket (ctx, ZMQ_REP);
|
||||||
|
assert (zap);
|
||||||
|
|
||||||
|
rc = zmq_bind (zap, "inproc://zeromq.zap.01");
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
|
// Spawn ZAP handler
|
||||||
|
pthread_t zap_thread;
|
||||||
|
rc = pthread_create (&zap_thread, NULL, &zap_handler, zap);
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
rc = zmq_bind (server, "tcp://*:9998");
|
rc = zmq_bind (server, "tcp://*:9998");
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
rc = zmq_connect (client, "tcp://localhost:9998");
|
rc = zmq_connect (client, "tcp://localhost:9998");
|
||||||
@ -134,6 +262,9 @@ int main (void)
|
|||||||
rc = zmq_close (server);
|
rc = zmq_close (server);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
|
// Wait until ZAP handler terminates.
|
||||||
|
pthread_join (zap_thread, NULL);
|
||||||
|
|
||||||
// Check PLAIN security -- two servers trying to talk to each other
|
// Check PLAIN security -- two servers trying to talk to each other
|
||||||
server = zmq_socket (ctx, ZMQ_DEALER);
|
server = zmq_socket (ctx, ZMQ_DEALER);
|
||||||
assert (server);
|
assert (server);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user