From 9dc248f6abeb7461b9e85bb5d0106ad86bab089f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lourens=20Naud=C3=A9?= Date: Sat, 4 Aug 2012 11:41:33 +0100 Subject: [PATCH] Fix invalid address metadata for ZMQ_EVENT_DISCONNECTED --- src/ipc_connecter.cpp | 3 +-- src/ipc_listener.cpp | 4 ++-- src/session_base.cpp | 7 ------- src/session_base.hpp | 1 - src/stream_engine.cpp | 10 +++------- src/stream_engine.hpp | 2 +- src/tcp_connecter.cpp | 2 +- src/tcp_listener.cpp | 2 +- tests/test_monitor.cpp | 4 ++++ 9 files changed, 13 insertions(+), 22 deletions(-) diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp index 09a93fee..a6b47d33 100644 --- a/src/ipc_connecter.cpp +++ b/src/ipc_connecter.cpp @@ -111,9 +111,8 @@ void zmq::ipc_connecter_t::out_event () add_reconnect_timer(); return; } - // Create the engine object for this connection. - stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options); + stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint); alloc_assert (engine); // Attach the engine to the corresponding session object. diff --git a/src/ipc_listener.cpp b/src/ipc_listener.cpp index d21f9dde..13e8af9c 100644 --- a/src/ipc_listener.cpp +++ b/src/ipc_listener.cpp @@ -81,7 +81,7 @@ void zmq::ipc_listener_t::in_event () } // Create the engine object for this connection. - stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options); + stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint); alloc_assert (engine); // Choose I/O thread to run connecter in. Given that we are already @@ -155,7 +155,7 @@ int zmq::ipc_listener_t::set_address (const char *addr_) if (rc != 0) goto error; - socket->monitor_event (ZMQ_EVENT_LISTENING, addr_, s); + socket->monitor_event (ZMQ_EVENT_LISTENING, endpoint.c_str(), s); return 0; error: diff --git a/src/session_base.cpp b/src/session_base.cpp index d4e9855c..26d81c49 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -279,13 +279,6 @@ void zmq::session_base_t::hiccuped (pipe_t *pipe_) zmq_assert (false); } -int zmq::session_base_t::get_address (std::string &addr_) -{ - if (addr) - return addr->to_string (addr_); - return -1; -} - void zmq::session_base_t::monitor_event (int event_, ...) { va_list args; diff --git a/src/session_base.hpp b/src/session_base.hpp index fa765ba3..db11c884 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -66,7 +66,6 @@ namespace zmq void hiccuped (zmq::pipe_t *pipe_); void terminated (zmq::pipe_t *pipe_); - int get_address (std::string &addr_); void monitor_event (int event_, ...); protected: diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index d9b91cf7..07489a7d 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -42,7 +42,7 @@ #include "err.hpp" #include "ip.hpp" -zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) : +zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, const std::string &endpoint_) : s (fd_), inpos (NULL), insize (0), @@ -53,11 +53,11 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) : encoder (out_batch_size), session (NULL), options (options_), - plugged (false) + plugged (false), + endpoint (endpoint_) { // Put the socket into non-blocking mode. unblock_socket (s); - // Set the socket buffer limits for the underlying socket. if (options.sndbuf) { int rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF, @@ -116,14 +116,11 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, decoder.set_session (session_); session = session_; - session->get_address (endpoint); - // Connect to I/O threads poller object. io_object_t::plug (io_thread_); handle = add_fd (s); set_pollin (handle); set_pollout (handle); - // Flush all the data that may have been already received downstream. in_event (); } @@ -143,7 +140,6 @@ void zmq::stream_engine_t::unplug () encoder.set_session (NULL); decoder.set_session (NULL); session = NULL; - endpoint.clear(); } void zmq::stream_engine_t::terminate () diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index 02d00d24..f4d15b6d 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -45,7 +45,7 @@ namespace zmq { public: - stream_engine_t (fd_t fd_, const options_t &options_); + stream_engine_t (fd_t fd_, const options_t &options_, const std::string &endpoint); ~stream_engine_t (); // i_engine interface implementation. diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index 9c75f288..e3aac63e 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -126,7 +126,7 @@ void zmq::tcp_connecter_t::out_event () tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl); // Create the engine object for this connection. - stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options); + stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint); alloc_assert (engine); // Attach the engine to the corresponding session object. diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index 1c4646c0..7eb5ea96 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -93,7 +93,7 @@ void zmq::tcp_listener_t::in_event () tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl); // Create the engine object for this connection. - stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options); + stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint); alloc_assert (engine); // Choose I/O thread to run connecter in. Given that we are already diff --git a/tests/test_monitor.cpp b/tests/test_monitor.cpp index 4c6cc1c7..a4cc02f1 100644 --- a/tests/test_monitor.cpp +++ b/tests/test_monitor.cpp @@ -21,6 +21,7 @@ #include #include +#include "testutil.hpp" #include "../include/zmq.h" #include "../include/zmq_utils.h" @@ -98,6 +99,8 @@ int main (int argc, char *argv []) rc = zmq_connect (req, "tcp://127.0.0.1:5560"); assert (rc == 0); + bounce (rep, req); + // Allow a window for socket events as connect can be async zmq_sleep (1); @@ -120,6 +123,7 @@ int main (int argc, char *argv []) assert (events & ZMQ_EVENT_ACCEPTED); assert (events & ZMQ_EVENT_CONNECTED); assert (events & ZMQ_EVENT_CLOSED); + assert (events & ZMQ_EVENT_DISCONNECTED); return 0 ; }