diff --git a/include/zmq.h b/include/zmq.h index 121eef4d..59501c7c 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -30,6 +30,7 @@ extern "C" { #if !defined _WIN32_WCE #include #endif +#include #include #include #if defined _WIN32 @@ -296,9 +297,8 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); /* Socket event data */ typedef struct { - unsigned int event; // id of the event as bitfield - char *addr; // endpoint affected as c string - int value ; // value is either error code, fd or reconnect interval + uint16_t event; // id of the event as bitfield + int32_t value ; // value is either error code, fd or reconnect interval } zmq_event_t; ZMQ_EXPORT void *zmq_socket (void *, int type); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 8f044be1..8092b088 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -1091,10 +1091,8 @@ void zmq::socket_base_t::event_connected (std::string &addr_, int fd_) if (monitor_events & ZMQ_EVENT_CONNECTED) { zmq_event_t event; event.event = ZMQ_EVENT_CONNECTED; - event.addr = (char *) malloc (addr_.size () + 1); - copy_monitor_address (event.addr, addr_); event.value = fd_; - monitor_event (event); + monitor_event (event, addr_); } } @@ -1103,10 +1101,8 @@ void zmq::socket_base_t::event_connect_delayed (std::string &addr_, int err_) if (monitor_events & ZMQ_EVENT_CONNECT_DELAYED) { zmq_event_t event; event.event = ZMQ_EVENT_CONNECT_DELAYED; - event.addr = (char *) malloc (addr_.size () + 1); - copy_monitor_address (event.addr, addr_); event.value = err_; - monitor_event (event); + monitor_event (event, addr_); } } @@ -1115,10 +1111,8 @@ void zmq::socket_base_t::event_connect_retried (std::string &addr_, int interval if (monitor_events & ZMQ_EVENT_CONNECT_RETRIED) { zmq_event_t event; event.event = ZMQ_EVENT_CONNECT_RETRIED; - event.addr = (char *) malloc (addr_.size () + 1); - copy_monitor_address (event.addr, addr_); event.value = interval_; - monitor_event (event); + monitor_event (event, addr_); } } @@ -1127,10 +1121,8 @@ void zmq::socket_base_t::event_listening (std::string &addr_, int fd_) if (monitor_events & ZMQ_EVENT_LISTENING) { zmq_event_t event; event.event = ZMQ_EVENT_LISTENING; - event.addr = (char *) malloc (addr_.size () + 1); - copy_monitor_address (event.addr, addr_); event.value = fd_; - monitor_event (event); + monitor_event (event, addr_); } } @@ -1139,10 +1131,8 @@ void zmq::socket_base_t::event_bind_failed (std::string &addr_, int err_) if (monitor_events & ZMQ_EVENT_BIND_FAILED) { zmq_event_t event; event.event = ZMQ_EVENT_BIND_FAILED; - event.addr = (char *) malloc (addr_.size () + 1); - copy_monitor_address (event.addr, addr_); event.value = err_; - monitor_event (event); + monitor_event (event, addr_); } } @@ -1151,10 +1141,8 @@ void zmq::socket_base_t::event_accepted (std::string &addr_, int fd_) if (monitor_events & ZMQ_EVENT_ACCEPTED) { zmq_event_t event; event.event = ZMQ_EVENT_ACCEPTED; - event.addr = (char *) malloc (addr_.size () + 1); - copy_monitor_address (event.addr, addr_); event.value = fd_; - monitor_event (event); + monitor_event (event, addr_); } } @@ -1163,10 +1151,8 @@ void zmq::socket_base_t::event_accept_failed (std::string &addr_, int err_) if (monitor_events & ZMQ_EVENT_ACCEPT_FAILED) { zmq_event_t event; event.event = ZMQ_EVENT_ACCEPT_FAILED; - event.addr = (char *) malloc (addr_.size () + 1); - copy_monitor_address (event.addr, addr_); event.value= err_; - monitor_event (event); + monitor_event (event, addr_); } } @@ -1175,10 +1161,8 @@ void zmq::socket_base_t::event_closed (std::string &addr_, int fd_) if (monitor_events & ZMQ_EVENT_CLOSED) { zmq_event_t event; event.event = ZMQ_EVENT_CLOSED; - event.addr = (char *) malloc (addr_.size () + 1); - copy_monitor_address (event.addr, addr_); event.value = fd_; - monitor_event (event); + monitor_event (event, addr_); } } @@ -1187,10 +1171,8 @@ void zmq::socket_base_t::event_close_failed (std::string &addr_, int err_) if (monitor_events & ZMQ_EVENT_CLOSE_FAILED) { zmq_event_t event; event.event = ZMQ_EVENT_CLOSE_FAILED; - event.addr = (char *) malloc (addr_.size () + 1); - copy_monitor_address (event.addr, addr_); event.value = err_; - monitor_event (event); + monitor_event (event, addr_); } } @@ -1199,30 +1181,29 @@ void zmq::socket_base_t::event_disconnected (std::string &addr_, int fd_) if (monitor_events & ZMQ_EVENT_DISCONNECTED) { zmq_event_t event; event.event = ZMQ_EVENT_DISCONNECTED; - event.addr = (char *) malloc (addr_.size () + 1); - copy_monitor_address (event.addr, addr_); event.value = fd_; - monitor_event (event); + monitor_event (event, addr_); } } -void zmq::socket_base_t::copy_monitor_address (char *dest_, std::string &src_) -{ - alloc_assert (dest_); - dest_[src_.size ()] = 0; - memcpy (dest_, src_.c_str (), src_.size ()); -} - -void zmq::socket_base_t::monitor_event (zmq_event_t event_) +void zmq::socket_base_t::monitor_event (zmq_event_t event_, const std::string& addr_) { if (monitor_socket) { + const uint16_t eid = (uint16_t)event_.event ; + const uint32_t value = (uint32_t)event_.value ; + // prepare and send first message frame + // containing event id and value zmq_msg_t msg; - void *event_data = malloc (sizeof (event_)); - alloc_assert (event_data); - memcpy (event_data, &event_, sizeof (event_)); - zmq_msg_init_data (&msg, event_data, sizeof (event_), zmq_free_event, NULL); + zmq_msg_init_size (&msg, sizeof(eid) + sizeof(value)); + char* data1 = (char*)zmq_msg_data(&msg); + memcpy (data1, &eid, sizeof(eid)); + memcpy (data1+sizeof(eid), &value, sizeof(value)); + zmq_sendmsg (monitor_socket, &msg, ZMQ_SNDMORE); + // prepare and send second message frame + // containing the address (endpoint) + zmq_msg_init_size (&msg, addr_.size()); + memcpy(zmq_msg_data(&msg), addr_.c_str(), addr_.size()); zmq_sendmsg (monitor_socket, &msg, 0); - zmq_msg_close (&msg); } } diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 5192bd70..4e0f9f92 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -154,10 +154,7 @@ namespace zmq void process_destroy (); // Socket event data dispath - void monitor_event (zmq_event_t data_); - - // Copy monitor specific event endpoints to event messages - void copy_monitor_address (char *dest_, std::string &src_); + void monitor_event (zmq_event_t data_, const std::string& addr_); // Monitor socket cleanup void stop_monitor (); diff --git a/src/zmq.cpp b/src/zmq.cpp index 5f2264b8..b331abcb 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -999,13 +999,3 @@ int zmq_device (int /* type */, void *frontend_, void *backend_) (zmq::socket_base_t*) frontend_, (zmq::socket_base_t*) backend_, NULL); } - -// Callback to free socket event data - -void zmq_free_event (void *event_data, void * /* hint */) -{ - const zmq_event_t *event = (zmq_event_t *) event_data; - - free (event->addr); - free (event_data); -} diff --git a/tests/test_monitor.cpp b/tests/test_monitor.cpp index 3d81f46d..2a79fea3 100644 --- a/tests/test_monitor.cpp +++ b/tests/test_monitor.cpp @@ -19,6 +19,7 @@ along with this program. If not, see . */ +#include #include "../include/zmq.h" #include #include @@ -31,12 +32,40 @@ static int req2_socket_events; // REP socket events handled static int rep_socket_events; -const char *addr; +std::string addr ; + +static bool read_msg(void* s, zmq_event_t& event, std::string& ep) +{ + int rc ; + zmq_msg_t msg1; // binary part + zmq_msg_init (&msg1); + zmq_msg_t msg2; // address part + zmq_msg_init (&msg2); + rc = zmq_msg_recv (&msg1, s, 0); + if (rc == -1 && zmq_errno() == ETERM) + return true ; + assert (rc != -1); + assert (zmq_msg_more(&msg1) != 0); + rc = zmq_msg_recv (&msg2, s, 0); + if (rc == -1 && zmq_errno() == ETERM) + return true; + assert (rc != -1); + assert (zmq_msg_more(&msg2) == 0); + // copy binary data to event struct + const char* data = (char*)zmq_msg_data(&msg1); + memcpy(&event.event, data, sizeof(event.event)); + memcpy(&event.value, data+sizeof(event.event), sizeof(event.value)); + // copy address part + ep = std::string((char*)zmq_msg_data(&msg2), zmq_msg_size(&msg2)); + return false ; +} + // REQ socket monitor thread static void *req_socket_monitor (void *ctx) { zmq_event_t event; + std::string ep ; int rc; void *s = zmq_socket (ctx, ZMQ_PAIR); @@ -44,16 +73,8 @@ static void *req_socket_monitor (void *ctx) rc = zmq_connect (s, "inproc://monitor.req"); assert (rc == 0); - while (true) { - zmq_msg_t msg; - zmq_msg_init (&msg); - rc = zmq_msg_recv (&msg, s, 0); - if (rc == -1 && zmq_errno() == ETERM) - break; - assert (rc != -1); - - memcpy (&event, zmq_msg_data (&msg), sizeof (event)); - assert (!strcmp (event.addr, addr)); + while (!read_msg(s, event, ep)) { + assert (ep == addr); switch (event.event) { case ZMQ_EVENT_CONNECTED: assert (event.value > 0); @@ -86,6 +107,7 @@ static void *req_socket_monitor (void *ctx) static void *req2_socket_monitor (void *ctx) { zmq_event_t event; + std::string ep ; int rc; void *s = zmq_socket (ctx, ZMQ_PAIR); @@ -93,16 +115,8 @@ static void *req2_socket_monitor (void *ctx) rc = zmq_connect (s, "inproc://monitor.req2"); assert (rc == 0); - while (true) { - zmq_msg_t msg; - zmq_msg_init (&msg); - rc = zmq_msg_recv (&msg, s, 0); - if (rc == -1 && zmq_errno() == ETERM) - break; - assert (rc != -1); - - memcpy (&event, zmq_msg_data (&msg), sizeof (event)); - assert (!strcmp (event.addr, addr)); + while (!read_msg(s, event, ep)) { + assert (ep == addr); switch (event.event) { case ZMQ_EVENT_CONNECTED: assert (event.value > 0); @@ -122,6 +136,7 @@ static void *req2_socket_monitor (void *ctx) static void *rep_socket_monitor (void *ctx) { zmq_event_t event; + std::string ep ; int rc; void *s = zmq_socket (ctx, ZMQ_PAIR); @@ -129,16 +144,8 @@ static void *rep_socket_monitor (void *ctx) rc = zmq_connect (s, "inproc://monitor.rep"); assert (rc == 0); - while (true) { - zmq_msg_t msg; - zmq_msg_init (&msg); - rc = zmq_msg_recv (&msg, s, 0); - if (rc == -1 && zmq_errno() == ETERM) - break; - assert (rc != -1); - - memcpy (&event, zmq_msg_data (&msg), sizeof (event)); - assert (!strcmp (event.addr, addr)); + while (!read_msg(s, event, ep)) { + assert (ep == addr); switch (event.event) { case ZMQ_EVENT_LISTENING: assert (event.value > 0); @@ -161,7 +168,6 @@ static void *rep_socket_monitor (void *ctx) rep_socket_events |= ZMQ_EVENT_DISCONNECTED; break; } - zmq_msg_close (&msg); } zmq_close (s); return NULL; @@ -186,7 +192,7 @@ int main (void) assert (rep); // Assert supported protocols - rc = zmq_socket_monitor (rep, addr, 0); + rc = zmq_socket_monitor (rep, addr.c_str(), 0); assert (rc == -1); assert (zmq_errno() == EPROTONOSUPPORT); @@ -200,7 +206,7 @@ int main (void) rc = pthread_create (&threads [0], NULL, rep_socket_monitor, ctx); assert (rc == 0); - rc = zmq_bind (rep, addr); + rc = zmq_bind (rep, addr.c_str()); assert (rc == 0); // REQ socket @@ -213,7 +219,7 @@ int main (void) rc = pthread_create (&threads [1], NULL, req_socket_monitor, ctx); assert (rc == 0); - rc = zmq_connect (req, addr); + rc = zmq_connect (req, addr.c_str()); assert (rc == 0); bounce (rep, req); @@ -228,7 +234,7 @@ int main (void) rc = pthread_create (&threads [2], NULL, req2_socket_monitor, ctx); assert (rc == 0); - rc = zmq_connect (req2, addr); + rc = zmq_connect (req2, addr.c_str()); assert (rc == 0); // Close the REP socket