diff --git a/src/i_engine.hpp b/src/i_engine.hpp index 8613e810..ba87913f 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -46,6 +46,8 @@ namespace zmq // This method is called by the session to signalise that there // are messages to send available. virtual void activate_out () = 0; + + virtual void zap_msg_available () = 0; }; } diff --git a/src/mechanism.hpp b/src/mechanism.hpp index 2b8ff272..1f6245ac 100644 --- a/src/mechanism.hpp +++ b/src/mechanism.hpp @@ -46,6 +46,9 @@ namespace zmq // Process the handshake message received from the peer. virtual int process_handshake_message (msg_t *msg_) = 0; + // Notifies mechanism about availability of ZAP message. + virtual int zap_msg_available () { return 0; } + // True iff the handshake stage is complete? virtual bool is_handshake_complete () const = 0; diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index d6617860..e247bc12 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -59,6 +59,7 @@ namespace zmq void terminate (); void activate_in (); void activate_out (); + void zap_msg_available () {} // i_poll_events interface implementation. void in_event (); diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 60dce6b8..981bc0de 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -58,6 +58,7 @@ namespace zmq void terminate (); void activate_in (); void activate_out (); + void zap_msg_available () {} // i_poll_events interface implementation. void in_event (); diff --git a/src/plain_mechanism.cpp b/src/plain_mechanism.cpp index f1aab761..5084ca8a 100644 --- a/src/plain_mechanism.cpp +++ b/src/plain_mechanism.cpp @@ -26,12 +26,15 @@ #include #include "msg.hpp" +#include "session_base.hpp" #include "err.hpp" #include "plain_mechanism.hpp" #include "wire.hpp" -zmq::plain_mechanism_t::plain_mechanism_t (const options_t &options_) : +zmq::plain_mechanism_t::plain_mechanism_t (session_base_t *session_, + const options_t &options_) : mechanism_t (options_), + session (session_), state (options.as_server? waiting_for_hello: sending_hello) { } @@ -79,8 +82,16 @@ int zmq::plain_mechanism_t::process_handshake_message (msg_t *msg_) switch (state) { case waiting_for_hello: rc = process_hello_command (msg_); - if (rc == 0) - state = sending_welcome; + if (rc == 0) { + rc = receive_and_process_zap_reply (); + if (rc == 0) + state = sending_welcome; + else + if (errno == EAGAIN) { + rc = 0; + state = waiting_for_zap_reply; + } + } break; case waiting_for_welcome: rc = process_welcome_command (msg_); @@ -107,7 +118,7 @@ int zmq::plain_mechanism_t::process_handshake_message (msg_t *msg_) rc = msg_->init (); errno_assert (rc == 0); } - return 0; + return rc; } bool zmq::plain_mechanism_t::is_handshake_complete () const @@ -115,6 +126,17 @@ bool zmq::plain_mechanism_t::is_handshake_complete () const return state == ready; } +int zmq::plain_mechanism_t::zap_msg_available () +{ + if (state != waiting_for_zap_reply) { + errno = EFSM; + return -1; + } + const int rc = receive_and_process_zap_reply (); + if (rc == 0) + state = sending_welcome; + return rc; +} int zmq::plain_mechanism_t::hello_command (msg_t *msg_) const { @@ -192,9 +214,66 @@ int zmq::plain_mechanism_t::process_hello_command (msg_t *msg_) errno = EPROTO; return -1; } - - // TODO: Add user authentication - // Note: maybe use RFC 27 (ZAP) for this + + // Use ZAP protocol (RFC 27) to authenticate user. + int rc = session->zap_connect (); + if (rc == -1) { + errno = EPROTO; + return -1; + } + + msg_t msg; + + // Address delimiter frame + rc = msg.init (); + errno_assert (rc == 0); + msg.set_flags (msg_t::more); + rc = session->write_zap_msg (&msg); + errno_assert (rc == 0); + + // Version frame + rc = msg.init_size (3); + errno_assert (rc == 0); + memcpy (msg.data (), "1.0", 3); + msg.set_flags (msg_t::more); + rc = session->write_zap_msg (&msg); + errno_assert (rc == 0); + + // Sequence frame + rc = msg.init_size (1); + errno_assert (rc == 0); + memcpy (msg.data (), "1", 1); + msg.set_flags (msg_t::more); + rc = session->write_zap_msg (&msg); + errno_assert (rc == 0); + + // Domain frame + rc = msg.init (); + errno_assert (rc == 0); + msg.set_flags (msg_t::more); + rc = session->write_zap_msg (&msg); + errno_assert (rc == 0); + + // Mechanism frame + rc = msg.init_size (5); + errno_assert (rc == 0); + memcpy (msg.data (), "PLAIN", 5); + msg.set_flags (msg_t::more); + rc = session->write_zap_msg (&msg); + errno_assert (rc == 0); + + // Credentials frame + rc = msg.init_size (1 + username_length + 1 + password_length); + errno_assert (rc == 0); + char *data_ptr = static_cast (msg.data ()); + *data_ptr++ = static_cast (username_length); + memcpy (data_ptr, username.c_str (), username_length); + data_ptr += username_length; + *data_ptr++ = static_cast (password_length); + memcpy (data_ptr, password.c_str (), password_length); + rc = session->write_zap_msg (&msg); + errno_assert (rc == 0); + return 0; } @@ -306,6 +385,65 @@ int zmq::plain_mechanism_t::process_ready_command (msg_t *msg_) return parse_property_list (ptr + 8, bytes_left - 8); } +int zmq::plain_mechanism_t::receive_and_process_zap_reply () +{ + int rc = 0; + msg_t msg [6]; + + for (int i = 0; i < 6; i++) { + rc = msg [i].init (); + errno_assert (rc == 0); + } + + for (int i = 0; i < 6; i++) { + rc = session->read_zap_msg (&msg [i]); + if (rc == -1) + break; + if ((msg [i].flags () & msg_t::more) == (i < 5? 0: msg_t::more)) { + errno = EPROTO; + rc = -1; + break; + } + } + + if (rc != 0) + goto error; + + return 0; + + // Address delimiter frame + if (msg [0].size () > 0) { + errno = EPROTO; + goto error; + } + + // Version frame + if (msg [1].size () != 3 || memcmp (msg [1].data (), "1.0", 3)) { + errno = EPROTO; + goto error; + } + + // Sequence number frame + if (msg [2].size () != 1 || memcmp (msg [2].data (), "1", 1)) { + errno = EPROTO; + goto error; + } + + // Status code frame + if (msg [3].size () != 3 || memcmp (msg [3].data (), "200", 3)) { + errno = EACCES; + goto error; + } + +error: + for (int i = 0; i < 6; i++) { + const int rc2 = msg [i].close (); + errno_assert (rc2 == 0); + } + + return rc; +} + int zmq::plain_mechanism_t::parse_property_list (const unsigned char *ptr, size_t bytes_left) { diff --git a/src/plain_mechanism.hpp b/src/plain_mechanism.hpp index 18909903..5508959a 100644 --- a/src/plain_mechanism.hpp +++ b/src/plain_mechanism.hpp @@ -27,17 +27,20 @@ namespace zmq { class msg_t; + class session_base_t; class plain_mechanism_t : public mechanism_t { public: - plain_mechanism_t (const options_t &options_); + plain_mechanism_t (session_base_t *session_, + const options_t &options_); virtual ~plain_mechanism_t (); // mechanism implementation virtual int next_handshake_message (msg_t *msg_); virtual int process_handshake_message (msg_t *msg_); + virtual int zap_msg_available (); virtual bool is_handshake_complete () const; private: @@ -51,9 +54,11 @@ namespace zmq waiting_for_initiate, sending_ready, waiting_for_ready, + waiting_for_zap_reply, ready }; + session_base_t * const session; state_t state; int hello_command (msg_t *msg_) const; @@ -66,6 +71,8 @@ namespace zmq int process_ready_command (msg_t *msg_); int process_initiate_command (msg_t *msg_); + int receive_and_process_zap_reply (); + int parse_property_list (const unsigned char *ptr, size_t length); }; diff --git a/src/session_base.cpp b/src/session_base.cpp index 3baa93e4..df8573b4 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -28,6 +28,7 @@ #include "pgm_receiver.hpp" #include "address.hpp" +#include "ctx.hpp" #include "req.hpp" #include "dealer.hpp" #include "rep.hpp" @@ -105,6 +106,7 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_, io_object_t (io_thread_), connect (connect_), pipe (NULL), + zap_pipe (NULL), incomplete_in (false), pending (false), engine (NULL), @@ -118,6 +120,7 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_, zmq::session_base_t::~session_base_t () { zmq_assert (!pipe); + zmq_assert (!zap_pipe); // If there's still a pending linger timer, remove it. if (has_linger_timer) { @@ -165,6 +168,39 @@ int zmq::session_base_t::push_msg (msg_t *msg_) return -1; } +int zmq::session_base_t::read_zap_msg (msg_t *msg_) +{ + if (zap_pipe == NULL) { + errno = ENOTCONN; + return -1; + } + + if (!zap_pipe->read (msg_)) { + errno = EAGAIN; + return -1; + } + + return 0; +} + +int zmq::session_base_t::write_zap_msg (msg_t *msg_) +{ + if (zap_pipe == NULL) { + errno = ENOTCONN; + return -1; + } + + const bool ok = zap_pipe->write (msg_); + zmq_assert (ok); + + if ((msg_->flags () & msg_t::more) == 0) + zap_pipe->flush (); + + const int rc = msg_->init (); + errno_assert (rc == 0); + return 0; +} + void zmq::session_base_t::reset () { } @@ -200,11 +236,17 @@ void zmq::session_base_t::clean_pipes () void zmq::session_base_t::pipe_terminated (pipe_t *pipe_) { // Drop the reference to the deallocated pipe if required. - zmq_assert (pipe == pipe_ || terminating_pipes.count (pipe_) == 1); + zmq_assert (pipe_ == pipe + || pipe_ == zap_pipe + || terminating_pipes.count (pipe_) == 1); - if (pipe == pipe_) + if (pipe_ == pipe) // If this is our current pipe, remove it pipe = NULL; + else + if (pipe_ == zap_pipe) { + zap_pipe = NULL; + } else // Remove the pipe from the detached pipes set terminating_pipes.erase (pipe_); @@ -220,22 +262,27 @@ void zmq::session_base_t::pipe_terminated (pipe_t *pipe_) // If we are waiting for pending messages to be sent, at this point // we are sure that there will be no more messages and we can proceed // with termination safely. - if (pending && !pipe && terminating_pipes.empty ()) + if (pending && !pipe && !zap_pipe && terminating_pipes.empty ()) proceed_with_term (); } void zmq::session_base_t::read_activated (pipe_t *pipe_) { // Skip activating if we're detaching this pipe - if (pipe != pipe_) { + if (unlikely(pipe_ != pipe && pipe_ != zap_pipe)) { zmq_assert (terminating_pipes.count (pipe_) == 1); return; } - if (likely (engine != NULL)) + if (unlikely (engine == NULL)) { + pipe->check_read (); + return; + } + + if (likely (pipe_ == pipe)) engine->activate_out (); else - pipe->check_read (); + engine->zap_msg_available (); } void zmq::session_base_t::write_activated (pipe_t *pipe_) @@ -268,6 +315,50 @@ void zmq::session_base_t::process_plug () start_connecting (false); } +int zmq::session_base_t::zap_connect () +{ + zmq_assert (zap_pipe == NULL); + + endpoint_t peer = find_endpoint ("inproc://zeromq.zap.01"); + if (peer.socket == NULL) { + errno = ECONNREFUSED; + return -1; + } + if (peer.options.type != ZMQ_REP + && peer.options.type != ZMQ_ROUTER) { + errno = ECONNREFUSED; + return -1; + } + + // Create a bi-directional pipe that will connect + // session with zap socket. + object_t *parents [2] = {this, peer.socket}; + pipe_t *new_pipes [2] = {NULL, NULL}; + int hwms [2] = {0, 0}; + bool delays [2] = {false, false}; + int rc = pipepair (parents, new_pipes, hwms, delays); + errno_assert (rc == 0); + + // Attach local end of the pipe to this socket object. + zap_pipe = new_pipes [0]; + zap_pipe->set_event_sink (this); + + send_bind (peer.socket, new_pipes [1], false); + + // Send empty identity if required by the peer. + if (peer.options.recv_identity) { + msg_t id; + rc = id.init (); + errno_assert (rc == 0); + id.set_flags (msg_t::identity); + bool ok = zap_pipe->write (&id); + zmq_assert (ok); + zap_pipe->flush (); + } + + return 0; +} + void zmq::session_base_t::process_attach (i_engine *engine_) { zmq_assert (engine_ != NULL); @@ -312,6 +403,9 @@ void zmq::session_base_t::detach () // Just in case there's only a delimiter in the pipe. if (pipe) pipe->check_read (); + + if (zap_pipe) + zap_pipe->check_read (); } void zmq::session_base_t::process_term (int linger_) @@ -321,30 +415,35 @@ void zmq::session_base_t::process_term (int linger_) // If the termination of the pipe happens before the term command is // delivered there's nothing much to do. We can proceed with the // stadard termination immediately. - if (!pipe) { + if (!pipe && !zap_pipe) { proceed_with_term (); return; } pending = true; - // If there's finite linger value, delay the termination. - // If linger is infinite (negative) we don't even have to set - // the timer. - if (linger_ > 0) { - zmq_assert (!has_linger_timer); - add_timer (linger_, linger_timer_id); - has_linger_timer = true; + if (pipe != NULL) { + // If there's finite linger value, delay the termination. + // If linger is infinite (negative) we don't even have to set + // the timer. + if (linger_ > 0) { + zmq_assert (!has_linger_timer); + add_timer (linger_, linger_timer_id); + has_linger_timer = true; + } + + // Start pipe termination process. Delay the termination till all messages + // are processed in case the linger time is non-zero. + pipe->terminate (linger_ != 0); + + // TODO: Should this go into pipe_t::terminate ? + // In case there's no engine and there's only delimiter in the + // pipe it wouldn't be ever read. Thus we check for it explicitly. + pipe->check_read (); } - // Start pipe termination process. Delay the termination till all messages - // are processed in case the linger time is non-zero. - pipe->terminate (linger_ != 0); - - // TODO: Should this go into pipe_t::terminate ? - // In case there's no engine and there's only delimiter in the - // pipe it wouldn't be ever read. Thus we check for it explicitly. - pipe->check_read (); + if (zap_pipe != NULL) + zap_pipe->terminate (false); } void zmq::session_base_t::proceed_with_term () diff --git a/src/session_base.hpp b/src/session_base.hpp index 9f2a445e..2ef7dc50 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -67,11 +67,23 @@ namespace zmq // The function takes ownership of the message. int push_msg (msg_t *msg_); + int zap_connect (); + // Fetches a message. Returns 0 if successful; -1 otherwise. // The caller is responsible for freeing the message when no // longer used. int pull_msg (msg_t *msg_); + // Receives message from ZAP socket. + // Returns 0 on success; -1 otherwise. + // The caller is responsible for freeing the message. + int read_zap_msg (msg_t *msg_); + + // Sends message to ZAP socket. + // Returns 0 on success; -1 otherwise. + // The function takes ownership of the message. + int write_zap_msg (msg_t *msg_); + socket_base_t *get_socket (); protected: @@ -109,6 +121,9 @@ namespace zmq // Pipe connecting the session to its socket. zmq::pipe_t *pipe; + // Pipe used to exchange messages with ZAP socket. + zmq::pipe_t *zap_pipe; + // This set is added to with pipes we are disconnecting, but haven't yet completed std::set terminating_pipes; diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 82d2b299..61533897 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -70,7 +70,6 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, read_msg (&stream_engine_t::read_identity), write_msg (&stream_engine_t::write_identity), io_error (false), - congested (false), subscription_required (false), mechanism (NULL), input_paused (false), @@ -222,7 +221,7 @@ void zmq::stream_engine_t::in_event () zmq_assert (decoder); // If there has been an I/O error, stop polling. - if (congested) { + if (input_paused) { rm_fd (handle); io_error = true; return; @@ -270,7 +269,7 @@ void zmq::stream_engine_t::in_event () error (); return; } - congested = true; + input_paused = true; reset_pollin (handle); } @@ -309,6 +308,7 @@ void zmq::stream_engine_t::out_event () // If there is no data to send, stop polling for output. if (outsize == 0) { + output_paused = true; reset_pollout (handle); return; } @@ -350,7 +350,10 @@ void zmq::stream_engine_t::activate_out () if (unlikely (io_error)) return; - set_pollout (handle); + if (likely (output_paused)) { + set_pollout (handle); + output_paused = false; + } // Speculative write: The assumption is that at the moment new message // was sent by the user the socket is probably available for writing. @@ -361,7 +364,7 @@ void zmq::stream_engine_t::activate_out () void zmq::stream_engine_t::activate_in () { - zmq_assert (congested); + zmq_assert (input_paused); zmq_assert (session != NULL); zmq_assert (decoder != NULL); @@ -393,7 +396,7 @@ void zmq::stream_engine_t::activate_in () if (rc == -1 || io_error) error (); else { - congested = false; + input_paused = false; set_pollin (handle); session->flush (); @@ -533,7 +536,7 @@ bool zmq::stream_engine_t::handshake () } else if (memcmp (greeting_recv + 12, "PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) { - mechanism = new (std::nothrow) plain_mechanism_t (options); + mechanism = new (std::nothrow) plain_mechanism_t (session, options); alloc_assert (mechanism); } else { @@ -596,15 +599,8 @@ int zmq::stream_engine_t::next_handshake_message (msg_t *msg_) if (rc == 0) { if (mechanism->is_handshake_complete ()) mechanism_ready (); - if (input_paused) { + if (input_paused) activate_in (); - input_paused = false; - } - } - else - if (rc == -1) { - zmq_assert (errno == EAGAIN); - output_paused = true; } return rc; @@ -618,18 +614,28 @@ int zmq::stream_engine_t::process_handshake_message (msg_t *msg_) if (rc == 0) { if (mechanism->is_handshake_complete ()) mechanism_ready (); - if (output_paused) { + if (output_paused) activate_out (); - output_paused = false; - } } - else - if (rc == -1 && errno == EAGAIN) - input_paused = true; return rc; } +void zmq::stream_engine_t::zap_msg_available () +{ + zmq_assert (mechanism != NULL); + + const int rc = mechanism->zap_msg_available (); + if (rc == -1) { + error (); + return; + } + if (input_paused) + activate_in (); + if (output_paused) + activate_out (); +} + void zmq::stream_engine_t::mechanism_ready () { if (options.recv_identity) { diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index ee5e966d..0ef0ffd4 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -62,6 +62,7 @@ namespace zmq void terminate (); void activate_in (); void activate_out (); + void zap_msg_available (); // i_poll_events interface implementation. void in_event (); @@ -168,10 +169,6 @@ namespace zmq bool io_error; - // True iff the session could not accept more - // messages due to flow control. - bool congested; - // Indicates whether the engine is to inject a phony // subscription message into the incomming stream. // Needed to support old peers. @@ -179,7 +176,10 @@ namespace zmq mechanism_t *mechanism; + // True iff the engine couldn't consume the last decoded message. bool input_paused; + + // True iff the engine doesn't have any message to encode. bool output_paused; // Socket diff --git a/tests/test_security.cpp b/tests/test_security.cpp index 40e86cde..f3e4e805 100644 --- a/tests/test_security.cpp +++ b/tests/test_security.cpp @@ -17,8 +17,124 @@ along with this program. If not, see . */ +#include +#include #include "testutil.hpp" +static bool +authenticate (const unsigned char *data, size_t data_length) +{ + const char *username = "admin"; + const size_t username_length = strlen (username); + const char *password = "password"; + const size_t password_length = strlen (password); + + if (data_length != 1 + username_length + 1 + password_length) + return false; + if (data [0] != username_length) + return false; + if (memcmp (data + 1, username, username_length)) + return false; + if (data [1 + username_length] != password_length) + return false; + if (memcmp (data + 1 + username_length + 1, password, password_length)) + return false; + return true; +} + +static void * +zap_handler (void *zap) +{ + int rc, more; + size_t optlen; + zmq_msg_t version, seqno, domain, mechanism, credentials; + zmq_msg_t status_code, status_text, user_id; + + // Version + rc = zmq_msg_init (&version); + assert (rc == 0); + rc = zmq_msg_recv (&version, zap, 0); + assert (rc == 3 && memcmp (zmq_msg_data (&version), "1.0", 3) == 0); + optlen = sizeof more; + rc = zmq_getsockopt (zap, ZMQ_RCVMORE, &more, &optlen); + assert (rc == 0 && more == 1); + + // Sequence number + rc = zmq_msg_init (&seqno); + assert (rc == 0); + rc = zmq_msg_recv (&seqno, zap, 0); + assert (rc != -1); + optlen = sizeof more; + rc = zmq_getsockopt (zap, ZMQ_RCVMORE, &more, &optlen); + assert (rc == 0 && more == 1); + + // Domain + rc = zmq_msg_init (&domain); + assert (rc == 0); + rc = zmq_msg_recv (&domain, zap, 0); + assert (rc != -1); + optlen = sizeof more; + rc = zmq_getsockopt (zap, ZMQ_RCVMORE, &more, &optlen); + assert (rc == 0 && more == 1); + + // Mechanism + rc = zmq_msg_init (&mechanism); + assert (rc == 0); + rc = zmq_msg_recv (&mechanism, zap, 0); + assert (rc == 5 && memcmp (zmq_msg_data (&mechanism), "PLAIN", 5) == 0); + optlen = sizeof more; + rc = zmq_getsockopt (zap, ZMQ_RCVMORE, &more, &optlen); + assert (rc == 0 && more == 1); + + // Credentials + rc = zmq_msg_init (&credentials); + assert (rc == 0); + rc = zmq_msg_recv (&credentials, zap, 0); + optlen = sizeof more; + rc = zmq_getsockopt (zap, ZMQ_RCVMORE, &more, &optlen); + assert (rc == 0 && more == 0); + + const bool auth_ok = + authenticate ((unsigned char *) zmq_msg_data (&credentials), + zmq_msg_size (&credentials)); + + rc = zmq_msg_send (&version, zap, ZMQ_SNDMORE); + assert (rc == 3); + + rc = zmq_msg_send (&seqno, zap, ZMQ_SNDMORE); + assert (rc != -1); + + rc = zmq_msg_init_size (&status_code, 3); + assert (rc == 0); + memcpy (zmq_msg_data (&status_code), auth_ok? "200": "400", 3); + rc = zmq_msg_send (&status_code, zap, ZMQ_SNDMORE); + assert (rc == 3); + + rc = zmq_msg_init (&status_text); + assert (rc == 0); + rc = zmq_msg_send (&status_text, zap, ZMQ_SNDMORE); + assert (rc == 0); + + rc = zmq_msg_init (&user_id); + assert (rc == 0); + rc = zmq_msg_send (&user_id, zap, 0); + assert (rc == 0); + + rc = zmq_msg_close (&domain); + assert (rc == 0); + + rc = zmq_msg_close (&mechanism); + assert (rc == 0); + + rc = zmq_msg_close (&credentials); + assert (rc == 0); + + rc = zmq_close (zap); + assert (rc == 0); + + return NULL; +} + int main (void) { void *ctx = zmq_ctx_new (); @@ -122,6 +238,18 @@ int main (void) assert (rc == 0); assert (as_server == 1); + // Create and bind ZAP socket + void *zap = zmq_socket (ctx, ZMQ_REP); + assert (zap); + + rc = zmq_bind (zap, "inproc://zeromq.zap.01"); + assert (rc == 0); + + // Spawn ZAP handler + pthread_t zap_thread; + rc = pthread_create (&zap_thread, NULL, &zap_handler, zap); + assert (rc == 0); + rc = zmq_bind (server, "tcp://*:9998"); assert (rc == 0); rc = zmq_connect (client, "tcp://localhost:9998"); @@ -133,6 +261,9 @@ int main (void) assert (rc == 0); rc = zmq_close (server); assert (rc == 0); + + // Wait until ZAP handler terminates. + pthread_join (zap_thread, NULL); // Check PLAIN security -- two servers trying to talk to each other server = zmq_socket (ctx, ZMQ_DEALER);