Changed message structure for event notifications.

This commit is contained in:
Guido Goldstein
2013-03-08 13:48:18 +01:00
parent 21eeb03b6c
commit b0b8ab27c5
5 changed files with 70 additions and 96 deletions

View File

@@ -1091,10 +1091,8 @@ void zmq::socket_base_t::event_connected (std::string &addr_, int fd_)
if (monitor_events & ZMQ_EVENT_CONNECTED) {
zmq_event_t event;
event.event = ZMQ_EVENT_CONNECTED;
event.addr = (char *) malloc (addr_.size () + 1);
copy_monitor_address (event.addr, addr_);
event.value = fd_;
monitor_event (event);
monitor_event (event, addr_);
}
}
@@ -1103,10 +1101,8 @@ void zmq::socket_base_t::event_connect_delayed (std::string &addr_, int err_)
if (monitor_events & ZMQ_EVENT_CONNECT_DELAYED) {
zmq_event_t event;
event.event = ZMQ_EVENT_CONNECT_DELAYED;
event.addr = (char *) malloc (addr_.size () + 1);
copy_monitor_address (event.addr, addr_);
event.value = err_;
monitor_event (event);
monitor_event (event, addr_);
}
}
@@ -1115,10 +1111,8 @@ void zmq::socket_base_t::event_connect_retried (std::string &addr_, int interval
if (monitor_events & ZMQ_EVENT_CONNECT_RETRIED) {
zmq_event_t event;
event.event = ZMQ_EVENT_CONNECT_RETRIED;
event.addr = (char *) malloc (addr_.size () + 1);
copy_monitor_address (event.addr, addr_);
event.value = interval_;
monitor_event (event);
monitor_event (event, addr_);
}
}
@@ -1127,10 +1121,8 @@ void zmq::socket_base_t::event_listening (std::string &addr_, int fd_)
if (monitor_events & ZMQ_EVENT_LISTENING) {
zmq_event_t event;
event.event = ZMQ_EVENT_LISTENING;
event.addr = (char *) malloc (addr_.size () + 1);
copy_monitor_address (event.addr, addr_);
event.value = fd_;
monitor_event (event);
monitor_event (event, addr_);
}
}
@@ -1139,10 +1131,8 @@ void zmq::socket_base_t::event_bind_failed (std::string &addr_, int err_)
if (monitor_events & ZMQ_EVENT_BIND_FAILED) {
zmq_event_t event;
event.event = ZMQ_EVENT_BIND_FAILED;
event.addr = (char *) malloc (addr_.size () + 1);
copy_monitor_address (event.addr, addr_);
event.value = err_;
monitor_event (event);
monitor_event (event, addr_);
}
}
@@ -1151,10 +1141,8 @@ void zmq::socket_base_t::event_accepted (std::string &addr_, int fd_)
if (monitor_events & ZMQ_EVENT_ACCEPTED) {
zmq_event_t event;
event.event = ZMQ_EVENT_ACCEPTED;
event.addr = (char *) malloc (addr_.size () + 1);
copy_monitor_address (event.addr, addr_);
event.value = fd_;
monitor_event (event);
monitor_event (event, addr_);
}
}
@@ -1163,10 +1151,8 @@ void zmq::socket_base_t::event_accept_failed (std::string &addr_, int err_)
if (monitor_events & ZMQ_EVENT_ACCEPT_FAILED) {
zmq_event_t event;
event.event = ZMQ_EVENT_ACCEPT_FAILED;
event.addr = (char *) malloc (addr_.size () + 1);
copy_monitor_address (event.addr, addr_);
event.value= err_;
monitor_event (event);
monitor_event (event, addr_);
}
}
@@ -1175,10 +1161,8 @@ void zmq::socket_base_t::event_closed (std::string &addr_, int fd_)
if (monitor_events & ZMQ_EVENT_CLOSED) {
zmq_event_t event;
event.event = ZMQ_EVENT_CLOSED;
event.addr = (char *) malloc (addr_.size () + 1);
copy_monitor_address (event.addr, addr_);
event.value = fd_;
monitor_event (event);
monitor_event (event, addr_);
}
}
@@ -1187,10 +1171,8 @@ void zmq::socket_base_t::event_close_failed (std::string &addr_, int err_)
if (monitor_events & ZMQ_EVENT_CLOSE_FAILED) {
zmq_event_t event;
event.event = ZMQ_EVENT_CLOSE_FAILED;
event.addr = (char *) malloc (addr_.size () + 1);
copy_monitor_address (event.addr, addr_);
event.value = err_;
monitor_event (event);
monitor_event (event, addr_);
}
}
@@ -1199,30 +1181,29 @@ void zmq::socket_base_t::event_disconnected (std::string &addr_, int fd_)
if (monitor_events & ZMQ_EVENT_DISCONNECTED) {
zmq_event_t event;
event.event = ZMQ_EVENT_DISCONNECTED;
event.addr = (char *) malloc (addr_.size () + 1);
copy_monitor_address (event.addr, addr_);
event.value = fd_;
monitor_event (event);
monitor_event (event, addr_);
}
}
void zmq::socket_base_t::copy_monitor_address (char *dest_, std::string &src_)
{
alloc_assert (dest_);
dest_[src_.size ()] = 0;
memcpy (dest_, src_.c_str (), src_.size ());
}
void zmq::socket_base_t::monitor_event (zmq_event_t event_)
void zmq::socket_base_t::monitor_event (zmq_event_t event_, const std::string& addr_)
{
if (monitor_socket) {
const uint16_t eid = (uint16_t)event_.event ;
const uint32_t value = (uint32_t)event_.value ;
// prepare and send first message frame
// containing event id and value
zmq_msg_t msg;
void *event_data = malloc (sizeof (event_));
alloc_assert (event_data);
memcpy (event_data, &event_, sizeof (event_));
zmq_msg_init_data (&msg, event_data, sizeof (event_), zmq_free_event, NULL);
zmq_msg_init_size (&msg, sizeof(eid) + sizeof(value));
char* data1 = (char*)zmq_msg_data(&msg);
memcpy (data1, &eid, sizeof(eid));
memcpy (data1+sizeof(eid), &value, sizeof(value));
zmq_sendmsg (monitor_socket, &msg, ZMQ_SNDMORE);
// prepare and send second message frame
// containing the address (endpoint)
zmq_msg_init_size (&msg, addr_.size());
memcpy(zmq_msg_data(&msg), addr_.c_str(), addr_.size());
zmq_sendmsg (monitor_socket, &msg, 0);
zmq_msg_close (&msg);
}
}