Fix invalid address metadata for ZMQ_EVENT_DISCONNECTED

This commit is contained in:
Lourens Naudé
2012-08-04 11:41:33 +01:00
parent 1f22954762
commit 9dc248f6ab
9 changed files with 13 additions and 22 deletions

View File

@@ -111,9 +111,8 @@ void zmq::ipc_connecter_t::out_event ()
add_reconnect_timer(); add_reconnect_timer();
return; return;
} }
// Create the engine object for this connection. // Create the engine object for this connection.
stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options); stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint);
alloc_assert (engine); alloc_assert (engine);
// Attach the engine to the corresponding session object. // Attach the engine to the corresponding session object.

View File

@@ -81,7 +81,7 @@ void zmq::ipc_listener_t::in_event ()
} }
// Create the engine object for this connection. // Create the engine object for this connection.
stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options); stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint);
alloc_assert (engine); alloc_assert (engine);
// Choose I/O thread to run connecter in. Given that we are already // Choose I/O thread to run connecter in. Given that we are already
@@ -155,7 +155,7 @@ int zmq::ipc_listener_t::set_address (const char *addr_)
if (rc != 0) if (rc != 0)
goto error; goto error;
socket->monitor_event (ZMQ_EVENT_LISTENING, addr_, s); socket->monitor_event (ZMQ_EVENT_LISTENING, endpoint.c_str(), s);
return 0; return 0;
error: error:

View File

@@ -279,13 +279,6 @@ void zmq::session_base_t::hiccuped (pipe_t *pipe_)
zmq_assert (false); zmq_assert (false);
} }
int zmq::session_base_t::get_address (std::string &addr_)
{
if (addr)
return addr->to_string (addr_);
return -1;
}
void zmq::session_base_t::monitor_event (int event_, ...) void zmq::session_base_t::monitor_event (int event_, ...)
{ {
va_list args; va_list args;

View File

@@ -66,7 +66,6 @@ namespace zmq
void hiccuped (zmq::pipe_t *pipe_); void hiccuped (zmq::pipe_t *pipe_);
void terminated (zmq::pipe_t *pipe_); void terminated (zmq::pipe_t *pipe_);
int get_address (std::string &addr_);
void monitor_event (int event_, ...); void monitor_event (int event_, ...);
protected: protected:

View File

@@ -42,7 +42,7 @@
#include "err.hpp" #include "err.hpp"
#include "ip.hpp" #include "ip.hpp"
zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) : zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, const std::string &endpoint_) :
s (fd_), s (fd_),
inpos (NULL), inpos (NULL),
insize (0), insize (0),
@@ -53,11 +53,11 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) :
encoder (out_batch_size), encoder (out_batch_size),
session (NULL), session (NULL),
options (options_), options (options_),
plugged (false) plugged (false),
endpoint (endpoint_)
{ {
// Put the socket into non-blocking mode. // Put the socket into non-blocking mode.
unblock_socket (s); unblock_socket (s);
// Set the socket buffer limits for the underlying socket. // Set the socket buffer limits for the underlying socket.
if (options.sndbuf) { if (options.sndbuf) {
int rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF, int rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF,
@@ -116,14 +116,11 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
decoder.set_session (session_); decoder.set_session (session_);
session = session_; session = session_;
session->get_address (endpoint);
// Connect to I/O threads poller object. // Connect to I/O threads poller object.
io_object_t::plug (io_thread_); io_object_t::plug (io_thread_);
handle = add_fd (s); handle = add_fd (s);
set_pollin (handle); set_pollin (handle);
set_pollout (handle); set_pollout (handle);
// Flush all the data that may have been already received downstream. // Flush all the data that may have been already received downstream.
in_event (); in_event ();
} }
@@ -143,7 +140,6 @@ void zmq::stream_engine_t::unplug ()
encoder.set_session (NULL); encoder.set_session (NULL);
decoder.set_session (NULL); decoder.set_session (NULL);
session = NULL; session = NULL;
endpoint.clear();
} }
void zmq::stream_engine_t::terminate () void zmq::stream_engine_t::terminate ()

View File

@@ -45,7 +45,7 @@ namespace zmq
{ {
public: public:
stream_engine_t (fd_t fd_, const options_t &options_); stream_engine_t (fd_t fd_, const options_t &options_, const std::string &endpoint);
~stream_engine_t (); ~stream_engine_t ();
// i_engine interface implementation. // i_engine interface implementation.

View File

@@ -126,7 +126,7 @@ void zmq::tcp_connecter_t::out_event ()
tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl); tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
// Create the engine object for this connection. // Create the engine object for this connection.
stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options); stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint);
alloc_assert (engine); alloc_assert (engine);
// Attach the engine to the corresponding session object. // Attach the engine to the corresponding session object.

View File

@@ -93,7 +93,7 @@ void zmq::tcp_listener_t::in_event ()
tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl); tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
// Create the engine object for this connection. // Create the engine object for this connection.
stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options); stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint);
alloc_assert (engine); alloc_assert (engine);
// Choose I/O thread to run connecter in. Given that we are already // Choose I/O thread to run connecter in. Given that we are already

View File

@@ -21,6 +21,7 @@
#include <assert.h> #include <assert.h>
#include <string.h> #include <string.h>
#include "testutil.hpp"
#include "../include/zmq.h" #include "../include/zmq.h"
#include "../include/zmq_utils.h" #include "../include/zmq_utils.h"
@@ -98,6 +99,8 @@ int main (int argc, char *argv [])
rc = zmq_connect (req, "tcp://127.0.0.1:5560"); rc = zmq_connect (req, "tcp://127.0.0.1:5560");
assert (rc == 0); assert (rc == 0);
bounce (rep, req);
// Allow a window for socket events as connect can be async // Allow a window for socket events as connect can be async
zmq_sleep (1); zmq_sleep (1);
@@ -120,6 +123,7 @@ int main (int argc, char *argv [])
assert (events & ZMQ_EVENT_ACCEPTED); assert (events & ZMQ_EVENT_ACCEPTED);
assert (events & ZMQ_EVENT_CONNECTED); assert (events & ZMQ_EVENT_CONNECTED);
assert (events & ZMQ_EVENT_CLOSED); assert (events & ZMQ_EVENT_CLOSED);
assert (events & ZMQ_EVENT_DISCONNECTED);
return 0 ; return 0 ;
} }