diff --git a/src/dealer.cpp b/src/dealer.cpp index d1d5c94e..0d39225c 100644 --- a/src/dealer.cpp +++ b/src/dealer.cpp @@ -108,10 +108,10 @@ void zmq::dealer_t::xwrite_activated (pipe_t *pipe_) lb.activated (pipe_); } -void zmq::dealer_t::xterminated (pipe_t *pipe_) +void zmq::dealer_t::xpipe_terminated (pipe_t *pipe_) { - fq.terminated (pipe_); - lb.terminated (pipe_); + fq.pipe_terminated (pipe_); + lb.pipe_terminated (pipe_); } zmq::dealer_session_t::dealer_session_t (io_thread_t *io_thread_, bool connect_, diff --git a/src/dealer.hpp b/src/dealer.hpp index 361a9568..5f5f0d4f 100644 --- a/src/dealer.hpp +++ b/src/dealer.hpp @@ -53,7 +53,7 @@ namespace zmq bool xhas_out (); void xread_activated (zmq::pipe_t *pipe_); void xwrite_activated (zmq::pipe_t *pipe_); - void xterminated (zmq::pipe_t *pipe_); + void xpipe_terminated (zmq::pipe_t *pipe_); private: diff --git a/src/dist.cpp b/src/dist.cpp index 2ba8f018..d4c47a72 100644 --- a/src/dist.cpp +++ b/src/dist.cpp @@ -74,7 +74,7 @@ void zmq::dist_t::unmatch () matching = 0; } -void zmq::dist_t::terminated (pipe_t *pipe_) +void zmq::dist_t::pipe_terminated (pipe_t *pipe_) { // Remove the pipe from the list; adjust number of matching, active and/or // eligible pipes accordingly. diff --git a/src/dist.hpp b/src/dist.hpp index 505045c0..6ad8b855 100644 --- a/src/dist.hpp +++ b/src/dist.hpp @@ -54,7 +54,7 @@ namespace zmq void unmatch (); // Removes the pipe from the distributor object. - void terminated (zmq::pipe_t *pipe_); + void pipe_terminated (zmq::pipe_t *pipe_); // Send the message to the matching outbound pipes. int send_to_matching (zmq::msg_t *msg_); diff --git a/src/fq.cpp b/src/fq.cpp index 04dbcc9c..aa099ebd 100644 --- a/src/fq.cpp +++ b/src/fq.cpp @@ -41,7 +41,7 @@ void zmq::fq_t::attach (pipe_t *pipe_) active++; } -void zmq::fq_t::terminated (pipe_t *pipe_) +void zmq::fq_t::pipe_terminated (pipe_t *pipe_) { const pipes_t::size_type index = pipes.index (pipe_); diff --git a/src/fq.hpp b/src/fq.hpp index 252159c6..843731ba 100644 --- a/src/fq.hpp +++ b/src/fq.hpp @@ -40,7 +40,7 @@ namespace zmq void attach (pipe_t *pipe_); void activated (pipe_t *pipe_); - void terminated (pipe_t *pipe_); + void pipe_terminated (pipe_t *pipe_); int recv (msg_t *msg_); int recvpipe (msg_t *msg_, pipe_t **pipe_); diff --git a/src/i_engine.hpp b/src/i_engine.hpp index 8613e810..ba87913f 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -46,6 +46,8 @@ namespace zmq // This method is called by the session to signalise that there // are messages to send available. virtual void activate_out () = 0; + + virtual void zap_msg_available () = 0; }; } diff --git a/src/lb.cpp b/src/lb.cpp index 3e6f82fe..f02c4ad1 100644 --- a/src/lb.cpp +++ b/src/lb.cpp @@ -41,7 +41,7 @@ void zmq::lb_t::attach (pipe_t *pipe_) activated (pipe_); } -void zmq::lb_t::terminated (pipe_t *pipe_) +void zmq::lb_t::pipe_terminated (pipe_t *pipe_) { pipes_t::size_type index = pipes.index (pipe_); diff --git a/src/lb.hpp b/src/lb.hpp index 20b00828..2e40a745 100644 --- a/src/lb.hpp +++ b/src/lb.hpp @@ -38,7 +38,7 @@ namespace zmq void attach (pipe_t *pipe_); void activated (pipe_t *pipe_); - void terminated (pipe_t *pipe_); + void pipe_terminated (pipe_t *pipe_); int send (msg_t *msg_); bool has_out (); diff --git a/src/mechanism.hpp b/src/mechanism.hpp index 2b8ff272..1f6245ac 100644 --- a/src/mechanism.hpp +++ b/src/mechanism.hpp @@ -46,6 +46,9 @@ namespace zmq // Process the handshake message received from the peer. virtual int process_handshake_message (msg_t *msg_) = 0; + // Notifies mechanism about availability of ZAP message. + virtual int zap_msg_available () { return 0; } + // True iff the handshake stage is complete? virtual bool is_handshake_complete () const = 0; diff --git a/src/pair.cpp b/src/pair.cpp index c74d4f84..808cf4cf 100644 --- a/src/pair.cpp +++ b/src/pair.cpp @@ -49,7 +49,7 @@ void zmq::pair_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) pipe_->terminate (false); } -void zmq::pair_t::xterminated (pipe_t *pipe_) +void zmq::pair_t::xpipe_terminated (pipe_t *pipe_) { if (pipe_ == pipe) pipe = NULL; diff --git a/src/pair.hpp b/src/pair.hpp index 99136af4..2bb6e330 100644 --- a/src/pair.hpp +++ b/src/pair.hpp @@ -47,7 +47,7 @@ namespace zmq bool xhas_out (); void xread_activated (zmq::pipe_t *pipe_); void xwrite_activated (zmq::pipe_t *pipe_); - void xterminated (zmq::pipe_t *pipe_); + void xpipe_terminated (zmq::pipe_t *pipe_); private: diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index d6617860..e247bc12 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -59,6 +59,7 @@ namespace zmq void terminate (); void activate_in (); void activate_out (); + void zap_msg_available () {} // i_poll_events interface implementation. void in_event (); diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 60dce6b8..981bc0de 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -58,6 +58,7 @@ namespace zmq void terminate (); void activate_in (); void activate_out (); + void zap_msg_available () {} // i_poll_events interface implementation. void in_event (); diff --git a/src/pipe.cpp b/src/pipe.cpp index 3b4c8f43..764a4744 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -285,7 +285,7 @@ void zmq::pipe_t::process_pipe_term_ack () { // Notify the user that all the references to the pipe should be dropped. zmq_assert (sink); - sink->terminated (this); + sink->pipe_terminated (this); // In term_ack_sent and term_req_sent2 states there's nothing to do. // Simply deallocate the pipe. In term_req_sent1 state we have to ack @@ -340,7 +340,7 @@ void zmq::pipe_t::terminate (bool delay_) // There are still pending messages available, but the user calls // 'terminate'. We can act as if all the pending messages were read. else - if (state == waiting_for_delimiter && delay == 0) { + if (state == waiting_for_delimiter && !delay) { outpipe = NULL; send_pipe_term_ack (peer); state = term_ack_sent; diff --git a/src/pipe.hpp b/src/pipe.hpp index be8ea420..b423d550 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -50,7 +50,7 @@ namespace zmq virtual void read_activated (zmq::pipe_t *pipe_) = 0; virtual void write_activated (zmq::pipe_t *pipe_) = 0; virtual void hiccuped (zmq::pipe_t *pipe_) = 0; - virtual void terminated (zmq::pipe_t *pipe_) = 0; + virtual void pipe_terminated (zmq::pipe_t *pipe_) = 0; }; // Note that pipe can be stored in three different arrays. diff --git a/src/plain_mechanism.cpp b/src/plain_mechanism.cpp index 91e43c25..5084ca8a 100644 --- a/src/plain_mechanism.cpp +++ b/src/plain_mechanism.cpp @@ -26,12 +26,15 @@ #include #include "msg.hpp" +#include "session_base.hpp" #include "err.hpp" #include "plain_mechanism.hpp" #include "wire.hpp" -zmq::plain_mechanism_t::plain_mechanism_t (const options_t &options_) : +zmq::plain_mechanism_t::plain_mechanism_t (session_base_t *session_, + const options_t &options_) : mechanism_t (options_), + session (session_), state (options.as_server? waiting_for_hello: sending_hello) { } @@ -79,8 +82,16 @@ int zmq::plain_mechanism_t::process_handshake_message (msg_t *msg_) switch (state) { case waiting_for_hello: rc = process_hello_command (msg_); - if (rc == 0) - state = sending_welcome; + if (rc == 0) { + rc = receive_and_process_zap_reply (); + if (rc == 0) + state = sending_welcome; + else + if (errno == EAGAIN) { + rc = 0; + state = waiting_for_zap_reply; + } + } break; case waiting_for_welcome: rc = process_welcome_command (msg_); @@ -107,15 +118,25 @@ int zmq::plain_mechanism_t::process_handshake_message (msg_t *msg_) rc = msg_->init (); errno_assert (rc == 0); } - return 0; + return rc; } - bool zmq::plain_mechanism_t::is_handshake_complete () const { return state == ready; } +int zmq::plain_mechanism_t::zap_msg_available () +{ + if (state != waiting_for_zap_reply) { + errno = EFSM; + return -1; + } + const int rc = receive_and_process_zap_reply (); + if (rc == 0) + state = sending_welcome; + return rc; +} int zmq::plain_mechanism_t::hello_command (msg_t *msg_) const { @@ -125,7 +146,7 @@ int zmq::plain_mechanism_t::hello_command (msg_t *msg_) const const std::string password = options.plain_password; zmq_assert (password.length () < 256); - const size_t command_size = 8 + 1 + username.length () + const size_t command_size = 8 + 1 + username.length () + 1 + password.length (); const int rc = msg_->init_size (command_size); @@ -134,11 +155,11 @@ int zmq::plain_mechanism_t::hello_command (msg_t *msg_) const unsigned char *ptr = static_cast (msg_->data ()); memcpy (ptr, "HELLO ", 8); ptr += 8; - + *ptr++ = static_cast (username.length ()); memcpy (ptr, username.c_str (), username.length ()); ptr += username.length (); - + *ptr++ = static_cast (password.length ()); memcpy (ptr, password.c_str (), password.length ()); ptr += password.length (); @@ -163,7 +184,7 @@ int zmq::plain_mechanism_t::process_hello_command (msg_t *msg_) errno = EPROTO; return -1; } - size_t username_length = static_cast (*ptr++); + const size_t username_length = static_cast (*ptr++); bytes_left -= 1; if (bytes_left < username_length) { @@ -178,7 +199,7 @@ int zmq::plain_mechanism_t::process_hello_command (msg_t *msg_) errno = EPROTO; return -1; } - size_t password_length = static_cast (*ptr++); + const size_t password_length = static_cast (*ptr++); bytes_left -= 1; if (bytes_left < password_length) { @@ -193,9 +214,66 @@ int zmq::plain_mechanism_t::process_hello_command (msg_t *msg_) errno = EPROTO; return -1; } - - // TODO: Add user authentication - // Note: maybe use RFC 27 (ZAP) for this + + // Use ZAP protocol (RFC 27) to authenticate user. + int rc = session->zap_connect (); + if (rc == -1) { + errno = EPROTO; + return -1; + } + + msg_t msg; + + // Address delimiter frame + rc = msg.init (); + errno_assert (rc == 0); + msg.set_flags (msg_t::more); + rc = session->write_zap_msg (&msg); + errno_assert (rc == 0); + + // Version frame + rc = msg.init_size (3); + errno_assert (rc == 0); + memcpy (msg.data (), "1.0", 3); + msg.set_flags (msg_t::more); + rc = session->write_zap_msg (&msg); + errno_assert (rc == 0); + + // Sequence frame + rc = msg.init_size (1); + errno_assert (rc == 0); + memcpy (msg.data (), "1", 1); + msg.set_flags (msg_t::more); + rc = session->write_zap_msg (&msg); + errno_assert (rc == 0); + + // Domain frame + rc = msg.init (); + errno_assert (rc == 0); + msg.set_flags (msg_t::more); + rc = session->write_zap_msg (&msg); + errno_assert (rc == 0); + + // Mechanism frame + rc = msg.init_size (5); + errno_assert (rc == 0); + memcpy (msg.data (), "PLAIN", 5); + msg.set_flags (msg_t::more); + rc = session->write_zap_msg (&msg); + errno_assert (rc == 0); + + // Credentials frame + rc = msg.init_size (1 + username_length + 1 + password_length); + errno_assert (rc == 0); + char *data_ptr = static_cast (msg.data ()); + *data_ptr++ = static_cast (username_length); + memcpy (data_ptr, username.c_str (), username_length); + data_ptr += username_length; + *data_ptr++ = static_cast (password_length); + memcpy (data_ptr, password.c_str (), password_length); + rc = session->write_zap_msg (&msg); + errno_assert (rc == 0); + return 0; } @@ -307,6 +385,65 @@ int zmq::plain_mechanism_t::process_ready_command (msg_t *msg_) return parse_property_list (ptr + 8, bytes_left - 8); } +int zmq::plain_mechanism_t::receive_and_process_zap_reply () +{ + int rc = 0; + msg_t msg [6]; + + for (int i = 0; i < 6; i++) { + rc = msg [i].init (); + errno_assert (rc == 0); + } + + for (int i = 0; i < 6; i++) { + rc = session->read_zap_msg (&msg [i]); + if (rc == -1) + break; + if ((msg [i].flags () & msg_t::more) == (i < 5? 0: msg_t::more)) { + errno = EPROTO; + rc = -1; + break; + } + } + + if (rc != 0) + goto error; + + return 0; + + // Address delimiter frame + if (msg [0].size () > 0) { + errno = EPROTO; + goto error; + } + + // Version frame + if (msg [1].size () != 3 || memcmp (msg [1].data (), "1.0", 3)) { + errno = EPROTO; + goto error; + } + + // Sequence number frame + if (msg [2].size () != 1 || memcmp (msg [2].data (), "1", 1)) { + errno = EPROTO; + goto error; + } + + // Status code frame + if (msg [3].size () != 3 || memcmp (msg [3].data (), "200", 3)) { + errno = EACCES; + goto error; + } + +error: + for (int i = 0; i < 6; i++) { + const int rc2 = msg [i].close (); + errno_assert (rc2 == 0); + } + + return rc; +} + int zmq::plain_mechanism_t::parse_property_list (const unsigned char *ptr, size_t bytes_left) { @@ -316,19 +453,19 @@ int zmq::plain_mechanism_t::parse_property_list (const unsigned char *ptr, bytes_left -= 1; if (bytes_left < name_length) break; - + const std::string name = std::string ((const char *) ptr, name_length); ptr += name_length; bytes_left -= name_length; if (bytes_left < 4) break; - + const size_t value_length = static_cast (get_uint32 (ptr)); ptr += 4; bytes_left -= 4; if (bytes_left < value_length) break; - + const unsigned char * const value = ptr; ptr += value_length; bytes_left -= value_length; diff --git a/src/plain_mechanism.hpp b/src/plain_mechanism.hpp index 18909903..5508959a 100644 --- a/src/plain_mechanism.hpp +++ b/src/plain_mechanism.hpp @@ -27,17 +27,20 @@ namespace zmq { class msg_t; + class session_base_t; class plain_mechanism_t : public mechanism_t { public: - plain_mechanism_t (const options_t &options_); + plain_mechanism_t (session_base_t *session_, + const options_t &options_); virtual ~plain_mechanism_t (); // mechanism implementation virtual int next_handshake_message (msg_t *msg_); virtual int process_handshake_message (msg_t *msg_); + virtual int zap_msg_available (); virtual bool is_handshake_complete () const; private: @@ -51,9 +54,11 @@ namespace zmq waiting_for_initiate, sending_ready, waiting_for_ready, + waiting_for_zap_reply, ready }; + session_base_t * const session; state_t state; int hello_command (msg_t *msg_) const; @@ -66,6 +71,8 @@ namespace zmq int process_ready_command (msg_t *msg_); int process_initiate_command (msg_t *msg_); + int receive_and_process_zap_reply (); + int parse_property_list (const unsigned char *ptr, size_t length); }; diff --git a/src/pull.cpp b/src/pull.cpp index 72401322..706553bb 100644 --- a/src/pull.cpp +++ b/src/pull.cpp @@ -46,9 +46,9 @@ void zmq::pull_t::xread_activated (pipe_t *pipe_) fq.activated (pipe_); } -void zmq::pull_t::xterminated (pipe_t *pipe_) +void zmq::pull_t::xpipe_terminated (pipe_t *pipe_) { - fq.terminated (pipe_); + fq.pipe_terminated (pipe_); } int zmq::pull_t::xrecv (msg_t *msg_) diff --git a/src/pull.hpp b/src/pull.hpp index 40eefd98..24e6b788 100644 --- a/src/pull.hpp +++ b/src/pull.hpp @@ -47,7 +47,7 @@ namespace zmq int xrecv (zmq::msg_t *msg_); bool xhas_in (); void xread_activated (zmq::pipe_t *pipe_); - void xterminated (zmq::pipe_t *pipe_); + void xpipe_terminated (zmq::pipe_t *pipe_); private: diff --git a/src/push.cpp b/src/push.cpp index 14688db8..3329b2fc 100644 --- a/src/push.cpp +++ b/src/push.cpp @@ -46,9 +46,9 @@ void zmq::push_t::xwrite_activated (pipe_t *pipe_) lb.activated (pipe_); } -void zmq::push_t::xterminated (pipe_t *pipe_) +void zmq::push_t::xpipe_terminated (pipe_t *pipe_) { - lb.terminated (pipe_); + lb.pipe_terminated (pipe_); } int zmq::push_t::xsend (msg_t *msg_) diff --git a/src/push.hpp b/src/push.hpp index cf4730a8..136cf9c7 100644 --- a/src/push.hpp +++ b/src/push.hpp @@ -47,7 +47,7 @@ namespace zmq int xsend (zmq::msg_t *msg_); bool xhas_out (); void xwrite_activated (zmq::pipe_t *pipe_); - void xterminated (zmq::pipe_t *pipe_); + void xpipe_terminated (zmq::pipe_t *pipe_); private: diff --git a/src/router.cpp b/src/router.cpp index 8d2e89f6..d5627f87 100644 --- a/src/router.cpp +++ b/src/router.cpp @@ -106,7 +106,7 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_, } -void zmq::router_t::xterminated (pipe_t *pipe_) +void zmq::router_t::xpipe_terminated (pipe_t *pipe_) { std::set ::iterator it = anonymous_pipes.find (pipe_); if (it != anonymous_pipes.end ()) @@ -115,7 +115,7 @@ void zmq::router_t::xterminated (pipe_t *pipe_) outpipes_t::iterator it = outpipes.find (pipe_->get_identity ()); zmq_assert (it != outpipes.end ()); outpipes.erase (it); - fq.terminated (pipe_); + fq.pipe_terminated (pipe_); if (pipe_ == current_out) current_out = NULL; } diff --git a/src/router.hpp b/src/router.hpp index 98717714..a7e7ed9a 100644 --- a/src/router.hpp +++ b/src/router.hpp @@ -53,7 +53,7 @@ namespace zmq bool xhas_out (); void xread_activated (zmq::pipe_t *pipe_); void xwrite_activated (zmq::pipe_t *pipe_); - void xterminated (zmq::pipe_t *pipe_); + void xpipe_terminated (zmq::pipe_t *pipe_); protected: diff --git a/src/session_base.cpp b/src/session_base.cpp index c096d740..df8573b4 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -28,6 +28,7 @@ #include "pgm_receiver.hpp" #include "address.hpp" +#include "ctx.hpp" #include "req.hpp" #include "dealer.hpp" #include "rep.hpp" @@ -105,6 +106,7 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_, io_object_t (io_thread_), connect (connect_), pipe (NULL), + zap_pipe (NULL), incomplete_in (false), pending (false), engine (NULL), @@ -118,6 +120,7 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_, zmq::session_base_t::~session_base_t () { zmq_assert (!pipe); + zmq_assert (!zap_pipe); // If there's still a pending linger timer, remove it. if (has_linger_timer) { @@ -165,6 +168,39 @@ int zmq::session_base_t::push_msg (msg_t *msg_) return -1; } +int zmq::session_base_t::read_zap_msg (msg_t *msg_) +{ + if (zap_pipe == NULL) { + errno = ENOTCONN; + return -1; + } + + if (!zap_pipe->read (msg_)) { + errno = EAGAIN; + return -1; + } + + return 0; +} + +int zmq::session_base_t::write_zap_msg (msg_t *msg_) +{ + if (zap_pipe == NULL) { + errno = ENOTCONN; + return -1; + } + + const bool ok = zap_pipe->write (msg_); + zmq_assert (ok); + + if ((msg_->flags () & msg_t::more) == 0) + zap_pipe->flush (); + + const int rc = msg_->init (); + errno_assert (rc == 0); + return 0; +} + void zmq::session_base_t::reset () { } @@ -197,14 +233,20 @@ void zmq::session_base_t::clean_pipes () } } -void zmq::session_base_t::terminated (pipe_t *pipe_) +void zmq::session_base_t::pipe_terminated (pipe_t *pipe_) { // Drop the reference to the deallocated pipe if required. - zmq_assert (pipe == pipe_ || terminating_pipes.count (pipe_) == 1); + zmq_assert (pipe_ == pipe + || pipe_ == zap_pipe + || terminating_pipes.count (pipe_) == 1); - if (pipe == pipe_) + if (pipe_ == pipe) // If this is our current pipe, remove it pipe = NULL; + else + if (pipe_ == zap_pipe) { + zap_pipe = NULL; + } else // Remove the pipe from the detached pipes set terminating_pipes.erase (pipe_); @@ -217,25 +259,30 @@ void zmq::session_base_t::terminated (pipe_t *pipe_) terminate (); } - // If we are waiting for pending messages to be sent, at this point - // we are sure that there will be no more messages and we can proceed - // with termination safely. - if (pending && !pipe && terminating_pipes.empty ()) + // If we are waiting for pending messages to be sent, at this point + // we are sure that there will be no more messages and we can proceed + // with termination safely. + if (pending && !pipe && !zap_pipe && terminating_pipes.empty ()) proceed_with_term (); } void zmq::session_base_t::read_activated (pipe_t *pipe_) { // Skip activating if we're detaching this pipe - if (pipe != pipe_) { + if (unlikely(pipe_ != pipe && pipe_ != zap_pipe)) { zmq_assert (terminating_pipes.count (pipe_) == 1); return; } - if (likely (engine != NULL)) + if (unlikely (engine == NULL)) { + pipe->check_read (); + return; + } + + if (likely (pipe_ == pipe)) engine->activate_out (); else - pipe->check_read (); + engine->zap_msg_available (); } void zmq::session_base_t::write_activated (pipe_t *pipe_) @@ -268,6 +315,50 @@ void zmq::session_base_t::process_plug () start_connecting (false); } +int zmq::session_base_t::zap_connect () +{ + zmq_assert (zap_pipe == NULL); + + endpoint_t peer = find_endpoint ("inproc://zeromq.zap.01"); + if (peer.socket == NULL) { + errno = ECONNREFUSED; + return -1; + } + if (peer.options.type != ZMQ_REP + && peer.options.type != ZMQ_ROUTER) { + errno = ECONNREFUSED; + return -1; + } + + // Create a bi-directional pipe that will connect + // session with zap socket. + object_t *parents [2] = {this, peer.socket}; + pipe_t *new_pipes [2] = {NULL, NULL}; + int hwms [2] = {0, 0}; + bool delays [2] = {false, false}; + int rc = pipepair (parents, new_pipes, hwms, delays); + errno_assert (rc == 0); + + // Attach local end of the pipe to this socket object. + zap_pipe = new_pipes [0]; + zap_pipe->set_event_sink (this); + + send_bind (peer.socket, new_pipes [1], false); + + // Send empty identity if required by the peer. + if (peer.options.recv_identity) { + msg_t id; + rc = id.init (); + errno_assert (rc == 0); + id.set_flags (msg_t::identity); + bool ok = zap_pipe->write (&id); + zmq_assert (ok); + zap_pipe->flush (); + } + + return 0; +} + void zmq::session_base_t::process_attach (i_engine *engine_) { zmq_assert (engine_ != NULL); @@ -312,6 +403,9 @@ void zmq::session_base_t::detach () // Just in case there's only a delimiter in the pipe. if (pipe) pipe->check_read (); + + if (zap_pipe) + zap_pipe->check_read (); } void zmq::session_base_t::process_term (int linger_) @@ -321,35 +415,40 @@ void zmq::session_base_t::process_term (int linger_) // If the termination of the pipe happens before the term command is // delivered there's nothing much to do. We can proceed with the // stadard termination immediately. - if (!pipe) { + if (!pipe && !zap_pipe) { proceed_with_term (); return; } pending = true; - // If there's finite linger value, delay the termination. - // If linger is infinite (negative) we don't even have to set - // the timer. - if (linger_ > 0) { - zmq_assert (!has_linger_timer); - add_timer (linger_, linger_timer_id); - has_linger_timer = true; + if (pipe != NULL) { + // If there's finite linger value, delay the termination. + // If linger is infinite (negative) we don't even have to set + // the timer. + if (linger_ > 0) { + zmq_assert (!has_linger_timer); + add_timer (linger_, linger_timer_id); + has_linger_timer = true; + } + + // Start pipe termination process. Delay the termination till all messages + // are processed in case the linger time is non-zero. + pipe->terminate (linger_ != 0); + + // TODO: Should this go into pipe_t::terminate ? + // In case there's no engine and there's only delimiter in the + // pipe it wouldn't be ever read. Thus we check for it explicitly. + pipe->check_read (); } - // Start pipe termination process. Delay the termination till all messages - // are processed in case the linger time is non-zero. - pipe->terminate (linger_ != 0); - - // TODO: Should this go into pipe_t::terminate ? - // In case there's no engine and there's only delimiter in the - // pipe it wouldn't be ever read. Thus we check for it explicitly. - pipe->check_read (); + if (zap_pipe != NULL) + zap_pipe->terminate (false); } void zmq::session_base_t::proceed_with_term () { - // The pending phase have just ended. + // The pending phase has just ended. pending = false; // Continue with standard termination. diff --git a/src/session_base.hpp b/src/session_base.hpp index e907587e..2ef7dc50 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -61,17 +61,29 @@ namespace zmq void read_activated (zmq::pipe_t *pipe_); void write_activated (zmq::pipe_t *pipe_); void hiccuped (zmq::pipe_t *pipe_); - void terminated (zmq::pipe_t *pipe_); + void pipe_terminated (zmq::pipe_t *pipe_); // Delivers a message. Returns 0 if successful; -1 otherwise. // The function takes ownership of the message. int push_msg (msg_t *msg_); + int zap_connect (); + // Fetches a message. Returns 0 if successful; -1 otherwise. // The caller is responsible for freeing the message when no // longer used. int pull_msg (msg_t *msg_); + // Receives message from ZAP socket. + // Returns 0 on success; -1 otherwise. + // The caller is responsible for freeing the message. + int read_zap_msg (msg_t *msg_); + + // Sends message to ZAP socket. + // Returns 0 on success; -1 otherwise. + // The function takes ownership of the message. + int write_zap_msg (msg_t *msg_); + socket_base_t *get_socket (); protected: @@ -108,9 +120,12 @@ namespace zmq // Pipe connecting the session to its socket. zmq::pipe_t *pipe; - + + // Pipe used to exchange messages with ZAP socket. + zmq::pipe_t *zap_pipe; + // This set is added to with pipes we are disconnecting, but haven't yet completed - std::set terminating_pipes; + std::set terminating_pipes; // This flag is true if the remainder of the message being processed // is still in the in pipe. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 52a5cf58..bcd47990 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -1003,10 +1003,10 @@ void zmq::socket_base_t::hiccuped (pipe_t *pipe_) xhiccuped (pipe_); } -void zmq::socket_base_t::terminated (pipe_t *pipe_) +void zmq::socket_base_t::pipe_terminated (pipe_t *pipe_) { // Notify the specific socket type about the pipe termination. - xterminated (pipe_); + xpipe_terminated (pipe_); // Remove pipe from inproc pipes for (inprocs_t::iterator it = inprocs.begin(); it != inprocs.end(); ++it) { diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 63e43ea2..603ab7a8 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -100,7 +100,7 @@ namespace zmq void read_activated (pipe_t *pipe_); void write_activated (pipe_t *pipe_); void hiccuped (pipe_t *pipe_); - void terminated (pipe_t *pipe_); + void pipe_terminated (pipe_t *pipe_); void lock(); void unlock(); @@ -145,7 +145,7 @@ namespace zmq virtual void xread_activated (pipe_t *pipe_); virtual void xwrite_activated (pipe_t *pipe_); virtual void xhiccuped (pipe_t *pipe_); - virtual void xterminated (pipe_t *pipe_) = 0; + virtual void xpipe_terminated (pipe_t *pipe_) = 0; // Delay actual destruction of the socket. void process_destroy (); diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 82d2b299..61533897 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -70,7 +70,6 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, read_msg (&stream_engine_t::read_identity), write_msg (&stream_engine_t::write_identity), io_error (false), - congested (false), subscription_required (false), mechanism (NULL), input_paused (false), @@ -222,7 +221,7 @@ void zmq::stream_engine_t::in_event () zmq_assert (decoder); // If there has been an I/O error, stop polling. - if (congested) { + if (input_paused) { rm_fd (handle); io_error = true; return; @@ -270,7 +269,7 @@ void zmq::stream_engine_t::in_event () error (); return; } - congested = true; + input_paused = true; reset_pollin (handle); } @@ -309,6 +308,7 @@ void zmq::stream_engine_t::out_event () // If there is no data to send, stop polling for output. if (outsize == 0) { + output_paused = true; reset_pollout (handle); return; } @@ -350,7 +350,10 @@ void zmq::stream_engine_t::activate_out () if (unlikely (io_error)) return; - set_pollout (handle); + if (likely (output_paused)) { + set_pollout (handle); + output_paused = false; + } // Speculative write: The assumption is that at the moment new message // was sent by the user the socket is probably available for writing. @@ -361,7 +364,7 @@ void zmq::stream_engine_t::activate_out () void zmq::stream_engine_t::activate_in () { - zmq_assert (congested); + zmq_assert (input_paused); zmq_assert (session != NULL); zmq_assert (decoder != NULL); @@ -393,7 +396,7 @@ void zmq::stream_engine_t::activate_in () if (rc == -1 || io_error) error (); else { - congested = false; + input_paused = false; set_pollin (handle); session->flush (); @@ -533,7 +536,7 @@ bool zmq::stream_engine_t::handshake () } else if (memcmp (greeting_recv + 12, "PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) { - mechanism = new (std::nothrow) plain_mechanism_t (options); + mechanism = new (std::nothrow) plain_mechanism_t (session, options); alloc_assert (mechanism); } else { @@ -596,15 +599,8 @@ int zmq::stream_engine_t::next_handshake_message (msg_t *msg_) if (rc == 0) { if (mechanism->is_handshake_complete ()) mechanism_ready (); - if (input_paused) { + if (input_paused) activate_in (); - input_paused = false; - } - } - else - if (rc == -1) { - zmq_assert (errno == EAGAIN); - output_paused = true; } return rc; @@ -618,18 +614,28 @@ int zmq::stream_engine_t::process_handshake_message (msg_t *msg_) if (rc == 0) { if (mechanism->is_handshake_complete ()) mechanism_ready (); - if (output_paused) { + if (output_paused) activate_out (); - output_paused = false; - } } - else - if (rc == -1 && errno == EAGAIN) - input_paused = true; return rc; } +void zmq::stream_engine_t::zap_msg_available () +{ + zmq_assert (mechanism != NULL); + + const int rc = mechanism->zap_msg_available (); + if (rc == -1) { + error (); + return; + } + if (input_paused) + activate_in (); + if (output_paused) + activate_out (); +} + void zmq::stream_engine_t::mechanism_ready () { if (options.recv_identity) { diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index ee5e966d..0ef0ffd4 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -62,6 +62,7 @@ namespace zmq void terminate (); void activate_in (); void activate_out (); + void zap_msg_available (); // i_poll_events interface implementation. void in_event (); @@ -168,10 +169,6 @@ namespace zmq bool io_error; - // True iff the session could not accept more - // messages due to flow control. - bool congested; - // Indicates whether the engine is to inject a phony // subscription message into the incomming stream. // Needed to support old peers. @@ -179,7 +176,10 @@ namespace zmq mechanism_t *mechanism; + // True iff the engine couldn't consume the last decoded message. bool input_paused; + + // True iff the engine doesn't have any message to encode. bool output_paused; // Socket diff --git a/src/xpub.cpp b/src/xpub.cpp index ad98beb2..d3aee48e 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -102,14 +102,14 @@ int zmq::xpub_t::xsetsockopt (int option_, const void *optval_, return 0; } -void zmq::xpub_t::xterminated (pipe_t *pipe_) +void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_) { // Remove the pipe from the trie. If there are topics that nobody // is interested in anymore, send corresponding unsubscriptions // upstream. subscriptions.rm (pipe_, send_unsubscription, this); - dist.terminated (pipe_); + dist.pipe_terminated (pipe_); } void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_) diff --git a/src/xpub.hpp b/src/xpub.hpp index 457630ad..86f08f3a 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -54,7 +54,7 @@ namespace zmq void xread_activated (zmq::pipe_t *pipe_); void xwrite_activated (zmq::pipe_t *pipe_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_); - void xterminated (zmq::pipe_t *pipe_); + void xpipe_terminated (zmq::pipe_t *pipe_); private: diff --git a/src/xsub.cpp b/src/xsub.cpp index d30efbe8..e5e11f5b 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -67,10 +67,10 @@ void zmq::xsub_t::xwrite_activated (pipe_t *pipe_) dist.activated (pipe_); } -void zmq::xsub_t::xterminated (pipe_t *pipe_) +void zmq::xsub_t::xpipe_terminated (pipe_t *pipe_) { - fq.terminated (pipe_); - dist.terminated (pipe_); + fq.pipe_terminated (pipe_); + dist.pipe_terminated (pipe_); } void zmq::xsub_t::xhiccuped (pipe_t *pipe_) diff --git a/src/xsub.hpp b/src/xsub.hpp index 440340e4..929299b5 100644 --- a/src/xsub.hpp +++ b/src/xsub.hpp @@ -52,7 +52,7 @@ namespace zmq void xread_activated (zmq::pipe_t *pipe_); void xwrite_activated (zmq::pipe_t *pipe_); void xhiccuped (pipe_t *pipe_); - void xterminated (zmq::pipe_t *pipe_); + void xpipe_terminated (zmq::pipe_t *pipe_); private: diff --git a/tests/test_security.cpp b/tests/test_security.cpp index 40e86cde..f3e4e805 100644 --- a/tests/test_security.cpp +++ b/tests/test_security.cpp @@ -17,8 +17,124 @@ along with this program. If not, see . */ +#include +#include #include "testutil.hpp" +static bool +authenticate (const unsigned char *data, size_t data_length) +{ + const char *username = "admin"; + const size_t username_length = strlen (username); + const char *password = "password"; + const size_t password_length = strlen (password); + + if (data_length != 1 + username_length + 1 + password_length) + return false; + if (data [0] != username_length) + return false; + if (memcmp (data + 1, username, username_length)) + return false; + if (data [1 + username_length] != password_length) + return false; + if (memcmp (data + 1 + username_length + 1, password, password_length)) + return false; + return true; +} + +static void * +zap_handler (void *zap) +{ + int rc, more; + size_t optlen; + zmq_msg_t version, seqno, domain, mechanism, credentials; + zmq_msg_t status_code, status_text, user_id; + + // Version + rc = zmq_msg_init (&version); + assert (rc == 0); + rc = zmq_msg_recv (&version, zap, 0); + assert (rc == 3 && memcmp (zmq_msg_data (&version), "1.0", 3) == 0); + optlen = sizeof more; + rc = zmq_getsockopt (zap, ZMQ_RCVMORE, &more, &optlen); + assert (rc == 0 && more == 1); + + // Sequence number + rc = zmq_msg_init (&seqno); + assert (rc == 0); + rc = zmq_msg_recv (&seqno, zap, 0); + assert (rc != -1); + optlen = sizeof more; + rc = zmq_getsockopt (zap, ZMQ_RCVMORE, &more, &optlen); + assert (rc == 0 && more == 1); + + // Domain + rc = zmq_msg_init (&domain); + assert (rc == 0); + rc = zmq_msg_recv (&domain, zap, 0); + assert (rc != -1); + optlen = sizeof more; + rc = zmq_getsockopt (zap, ZMQ_RCVMORE, &more, &optlen); + assert (rc == 0 && more == 1); + + // Mechanism + rc = zmq_msg_init (&mechanism); + assert (rc == 0); + rc = zmq_msg_recv (&mechanism, zap, 0); + assert (rc == 5 && memcmp (zmq_msg_data (&mechanism), "PLAIN", 5) == 0); + optlen = sizeof more; + rc = zmq_getsockopt (zap, ZMQ_RCVMORE, &more, &optlen); + assert (rc == 0 && more == 1); + + // Credentials + rc = zmq_msg_init (&credentials); + assert (rc == 0); + rc = zmq_msg_recv (&credentials, zap, 0); + optlen = sizeof more; + rc = zmq_getsockopt (zap, ZMQ_RCVMORE, &more, &optlen); + assert (rc == 0 && more == 0); + + const bool auth_ok = + authenticate ((unsigned char *) zmq_msg_data (&credentials), + zmq_msg_size (&credentials)); + + rc = zmq_msg_send (&version, zap, ZMQ_SNDMORE); + assert (rc == 3); + + rc = zmq_msg_send (&seqno, zap, ZMQ_SNDMORE); + assert (rc != -1); + + rc = zmq_msg_init_size (&status_code, 3); + assert (rc == 0); + memcpy (zmq_msg_data (&status_code), auth_ok? "200": "400", 3); + rc = zmq_msg_send (&status_code, zap, ZMQ_SNDMORE); + assert (rc == 3); + + rc = zmq_msg_init (&status_text); + assert (rc == 0); + rc = zmq_msg_send (&status_text, zap, ZMQ_SNDMORE); + assert (rc == 0); + + rc = zmq_msg_init (&user_id); + assert (rc == 0); + rc = zmq_msg_send (&user_id, zap, 0); + assert (rc == 0); + + rc = zmq_msg_close (&domain); + assert (rc == 0); + + rc = zmq_msg_close (&mechanism); + assert (rc == 0); + + rc = zmq_msg_close (&credentials); + assert (rc == 0); + + rc = zmq_close (zap); + assert (rc == 0); + + return NULL; +} + int main (void) { void *ctx = zmq_ctx_new (); @@ -122,6 +238,18 @@ int main (void) assert (rc == 0); assert (as_server == 1); + // Create and bind ZAP socket + void *zap = zmq_socket (ctx, ZMQ_REP); + assert (zap); + + rc = zmq_bind (zap, "inproc://zeromq.zap.01"); + assert (rc == 0); + + // Spawn ZAP handler + pthread_t zap_thread; + rc = pthread_create (&zap_thread, NULL, &zap_handler, zap); + assert (rc == 0); + rc = zmq_bind (server, "tcp://*:9998"); assert (rc == 0); rc = zmq_connect (client, "tcp://localhost:9998"); @@ -133,6 +261,9 @@ int main (void) assert (rc == 0); rc = zmq_close (server); assert (rc == 0); + + // Wait until ZAP handler terminates. + pthread_join (zap_thread, NULL); // Check PLAIN security -- two servers trying to talk to each other server = zmq_socket (ctx, ZMQ_DEALER);