mirror of
https://github.com/zeromq/libzmq.git
synced 2025-02-24 07:21:32 +01:00
Merge pull request #528 from guidog/master
Changed message structure for event notifications
This commit is contained in:
commit
ba2dda407d
@ -30,6 +30,7 @@ extern "C" {
|
|||||||
#if !defined _WIN32_WCE
|
#if !defined _WIN32_WCE
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
#endif
|
#endif
|
||||||
|
#include <stdint.h>
|
||||||
#include <stddef.h>
|
#include <stddef.h>
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
#if defined _WIN32
|
#if defined _WIN32
|
||||||
@ -296,9 +297,8 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
|
|||||||
|
|
||||||
/* Socket event data */
|
/* Socket event data */
|
||||||
typedef struct {
|
typedef struct {
|
||||||
unsigned int event; // id of the event as bitfield
|
uint16_t event; // id of the event as bitfield
|
||||||
char *addr; // endpoint affected as c string
|
int32_t value ; // value is either error code, fd or reconnect interval
|
||||||
int value ; // value is either error code, fd or reconnect interval
|
|
||||||
} zmq_event_t;
|
} zmq_event_t;
|
||||||
|
|
||||||
ZMQ_EXPORT void *zmq_socket (void *, int type);
|
ZMQ_EXPORT void *zmq_socket (void *, int type);
|
||||||
|
@ -1091,10 +1091,8 @@ void zmq::socket_base_t::event_connected (std::string &addr_, int fd_)
|
|||||||
if (monitor_events & ZMQ_EVENT_CONNECTED) {
|
if (monitor_events & ZMQ_EVENT_CONNECTED) {
|
||||||
zmq_event_t event;
|
zmq_event_t event;
|
||||||
event.event = ZMQ_EVENT_CONNECTED;
|
event.event = ZMQ_EVENT_CONNECTED;
|
||||||
event.addr = (char *) malloc (addr_.size () + 1);
|
|
||||||
copy_monitor_address (event.addr, addr_);
|
|
||||||
event.value = fd_;
|
event.value = fd_;
|
||||||
monitor_event (event);
|
monitor_event (event, addr_);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1103,10 +1101,8 @@ void zmq::socket_base_t::event_connect_delayed (std::string &addr_, int err_)
|
|||||||
if (monitor_events & ZMQ_EVENT_CONNECT_DELAYED) {
|
if (monitor_events & ZMQ_EVENT_CONNECT_DELAYED) {
|
||||||
zmq_event_t event;
|
zmq_event_t event;
|
||||||
event.event = ZMQ_EVENT_CONNECT_DELAYED;
|
event.event = ZMQ_EVENT_CONNECT_DELAYED;
|
||||||
event.addr = (char *) malloc (addr_.size () + 1);
|
|
||||||
copy_monitor_address (event.addr, addr_);
|
|
||||||
event.value = err_;
|
event.value = err_;
|
||||||
monitor_event (event);
|
monitor_event (event, addr_);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1115,10 +1111,8 @@ void zmq::socket_base_t::event_connect_retried (std::string &addr_, int interval
|
|||||||
if (monitor_events & ZMQ_EVENT_CONNECT_RETRIED) {
|
if (monitor_events & ZMQ_EVENT_CONNECT_RETRIED) {
|
||||||
zmq_event_t event;
|
zmq_event_t event;
|
||||||
event.event = ZMQ_EVENT_CONNECT_RETRIED;
|
event.event = ZMQ_EVENT_CONNECT_RETRIED;
|
||||||
event.addr = (char *) malloc (addr_.size () + 1);
|
|
||||||
copy_monitor_address (event.addr, addr_);
|
|
||||||
event.value = interval_;
|
event.value = interval_;
|
||||||
monitor_event (event);
|
monitor_event (event, addr_);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1127,10 +1121,8 @@ void zmq::socket_base_t::event_listening (std::string &addr_, int fd_)
|
|||||||
if (monitor_events & ZMQ_EVENT_LISTENING) {
|
if (monitor_events & ZMQ_EVENT_LISTENING) {
|
||||||
zmq_event_t event;
|
zmq_event_t event;
|
||||||
event.event = ZMQ_EVENT_LISTENING;
|
event.event = ZMQ_EVENT_LISTENING;
|
||||||
event.addr = (char *) malloc (addr_.size () + 1);
|
|
||||||
copy_monitor_address (event.addr, addr_);
|
|
||||||
event.value = fd_;
|
event.value = fd_;
|
||||||
monitor_event (event);
|
monitor_event (event, addr_);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1139,10 +1131,8 @@ void zmq::socket_base_t::event_bind_failed (std::string &addr_, int err_)
|
|||||||
if (monitor_events & ZMQ_EVENT_BIND_FAILED) {
|
if (monitor_events & ZMQ_EVENT_BIND_FAILED) {
|
||||||
zmq_event_t event;
|
zmq_event_t event;
|
||||||
event.event = ZMQ_EVENT_BIND_FAILED;
|
event.event = ZMQ_EVENT_BIND_FAILED;
|
||||||
event.addr = (char *) malloc (addr_.size () + 1);
|
|
||||||
copy_monitor_address (event.addr, addr_);
|
|
||||||
event.value = err_;
|
event.value = err_;
|
||||||
monitor_event (event);
|
monitor_event (event, addr_);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1151,10 +1141,8 @@ void zmq::socket_base_t::event_accepted (std::string &addr_, int fd_)
|
|||||||
if (monitor_events & ZMQ_EVENT_ACCEPTED) {
|
if (monitor_events & ZMQ_EVENT_ACCEPTED) {
|
||||||
zmq_event_t event;
|
zmq_event_t event;
|
||||||
event.event = ZMQ_EVENT_ACCEPTED;
|
event.event = ZMQ_EVENT_ACCEPTED;
|
||||||
event.addr = (char *) malloc (addr_.size () + 1);
|
|
||||||
copy_monitor_address (event.addr, addr_);
|
|
||||||
event.value = fd_;
|
event.value = fd_;
|
||||||
monitor_event (event);
|
monitor_event (event, addr_);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1163,10 +1151,8 @@ void zmq::socket_base_t::event_accept_failed (std::string &addr_, int err_)
|
|||||||
if (monitor_events & ZMQ_EVENT_ACCEPT_FAILED) {
|
if (monitor_events & ZMQ_EVENT_ACCEPT_FAILED) {
|
||||||
zmq_event_t event;
|
zmq_event_t event;
|
||||||
event.event = ZMQ_EVENT_ACCEPT_FAILED;
|
event.event = ZMQ_EVENT_ACCEPT_FAILED;
|
||||||
event.addr = (char *) malloc (addr_.size () + 1);
|
|
||||||
copy_monitor_address (event.addr, addr_);
|
|
||||||
event.value= err_;
|
event.value= err_;
|
||||||
monitor_event (event);
|
monitor_event (event, addr_);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1175,10 +1161,8 @@ void zmq::socket_base_t::event_closed (std::string &addr_, int fd_)
|
|||||||
if (monitor_events & ZMQ_EVENT_CLOSED) {
|
if (monitor_events & ZMQ_EVENT_CLOSED) {
|
||||||
zmq_event_t event;
|
zmq_event_t event;
|
||||||
event.event = ZMQ_EVENT_CLOSED;
|
event.event = ZMQ_EVENT_CLOSED;
|
||||||
event.addr = (char *) malloc (addr_.size () + 1);
|
|
||||||
copy_monitor_address (event.addr, addr_);
|
|
||||||
event.value = fd_;
|
event.value = fd_;
|
||||||
monitor_event (event);
|
monitor_event (event, addr_);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1187,10 +1171,8 @@ void zmq::socket_base_t::event_close_failed (std::string &addr_, int err_)
|
|||||||
if (monitor_events & ZMQ_EVENT_CLOSE_FAILED) {
|
if (monitor_events & ZMQ_EVENT_CLOSE_FAILED) {
|
||||||
zmq_event_t event;
|
zmq_event_t event;
|
||||||
event.event = ZMQ_EVENT_CLOSE_FAILED;
|
event.event = ZMQ_EVENT_CLOSE_FAILED;
|
||||||
event.addr = (char *) malloc (addr_.size () + 1);
|
|
||||||
copy_monitor_address (event.addr, addr_);
|
|
||||||
event.value = err_;
|
event.value = err_;
|
||||||
monitor_event (event);
|
monitor_event (event, addr_);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1199,30 +1181,29 @@ void zmq::socket_base_t::event_disconnected (std::string &addr_, int fd_)
|
|||||||
if (monitor_events & ZMQ_EVENT_DISCONNECTED) {
|
if (monitor_events & ZMQ_EVENT_DISCONNECTED) {
|
||||||
zmq_event_t event;
|
zmq_event_t event;
|
||||||
event.event = ZMQ_EVENT_DISCONNECTED;
|
event.event = ZMQ_EVENT_DISCONNECTED;
|
||||||
event.addr = (char *) malloc (addr_.size () + 1);
|
|
||||||
copy_monitor_address (event.addr, addr_);
|
|
||||||
event.value = fd_;
|
event.value = fd_;
|
||||||
monitor_event (event);
|
monitor_event (event, addr_);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::socket_base_t::copy_monitor_address (char *dest_, std::string &src_)
|
void zmq::socket_base_t::monitor_event (zmq_event_t event_, const std::string& addr_)
|
||||||
{
|
|
||||||
alloc_assert (dest_);
|
|
||||||
dest_[src_.size ()] = 0;
|
|
||||||
memcpy (dest_, src_.c_str (), src_.size ());
|
|
||||||
}
|
|
||||||
|
|
||||||
void zmq::socket_base_t::monitor_event (zmq_event_t event_)
|
|
||||||
{
|
{
|
||||||
if (monitor_socket) {
|
if (monitor_socket) {
|
||||||
|
const uint16_t eid = (uint16_t)event_.event ;
|
||||||
|
const uint32_t value = (uint32_t)event_.value ;
|
||||||
|
// prepare and send first message frame
|
||||||
|
// containing event id and value
|
||||||
zmq_msg_t msg;
|
zmq_msg_t msg;
|
||||||
void *event_data = malloc (sizeof (event_));
|
zmq_msg_init_size (&msg, sizeof(eid) + sizeof(value));
|
||||||
alloc_assert (event_data);
|
char* data1 = (char*)zmq_msg_data(&msg);
|
||||||
memcpy (event_data, &event_, sizeof (event_));
|
memcpy (data1, &eid, sizeof(eid));
|
||||||
zmq_msg_init_data (&msg, event_data, sizeof (event_), zmq_free_event, NULL);
|
memcpy (data1+sizeof(eid), &value, sizeof(value));
|
||||||
|
zmq_sendmsg (monitor_socket, &msg, ZMQ_SNDMORE);
|
||||||
|
// prepare and send second message frame
|
||||||
|
// containing the address (endpoint)
|
||||||
|
zmq_msg_init_size (&msg, addr_.size());
|
||||||
|
memcpy(zmq_msg_data(&msg), addr_.c_str(), addr_.size());
|
||||||
zmq_sendmsg (monitor_socket, &msg, 0);
|
zmq_sendmsg (monitor_socket, &msg, 0);
|
||||||
zmq_msg_close (&msg);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -154,10 +154,7 @@ namespace zmq
|
|||||||
void process_destroy ();
|
void process_destroy ();
|
||||||
|
|
||||||
// Socket event data dispath
|
// Socket event data dispath
|
||||||
void monitor_event (zmq_event_t data_);
|
void monitor_event (zmq_event_t data_, const std::string& addr_);
|
||||||
|
|
||||||
// Copy monitor specific event endpoints to event messages
|
|
||||||
void copy_monitor_address (char *dest_, std::string &src_);
|
|
||||||
|
|
||||||
// Monitor socket cleanup
|
// Monitor socket cleanup
|
||||||
void stop_monitor ();
|
void stop_monitor ();
|
||||||
|
10
src/zmq.cpp
10
src/zmq.cpp
@ -999,13 +999,3 @@ int zmq_device (int /* type */, void *frontend_, void *backend_)
|
|||||||
(zmq::socket_base_t*) frontend_,
|
(zmq::socket_base_t*) frontend_,
|
||||||
(zmq::socket_base_t*) backend_, NULL);
|
(zmq::socket_base_t*) backend_, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Callback to free socket event data
|
|
||||||
|
|
||||||
void zmq_free_event (void *event_data, void * /* hint */)
|
|
||||||
{
|
|
||||||
const zmq_event_t *event = (zmq_event_t *) event_data;
|
|
||||||
|
|
||||||
free (event->addr);
|
|
||||||
free (event_data);
|
|
||||||
}
|
|
||||||
|
@ -19,6 +19,7 @@
|
|||||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include <string>
|
||||||
#include "../include/zmq.h"
|
#include "../include/zmq.h"
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
@ -31,12 +32,40 @@ static int req2_socket_events;
|
|||||||
// REP socket events handled
|
// REP socket events handled
|
||||||
static int rep_socket_events;
|
static int rep_socket_events;
|
||||||
|
|
||||||
const char *addr;
|
std::string addr ;
|
||||||
|
|
||||||
|
static bool read_msg(void* s, zmq_event_t& event, std::string& ep)
|
||||||
|
{
|
||||||
|
int rc ;
|
||||||
|
zmq_msg_t msg1; // binary part
|
||||||
|
zmq_msg_init (&msg1);
|
||||||
|
zmq_msg_t msg2; // address part
|
||||||
|
zmq_msg_init (&msg2);
|
||||||
|
rc = zmq_msg_recv (&msg1, s, 0);
|
||||||
|
if (rc == -1 && zmq_errno() == ETERM)
|
||||||
|
return true ;
|
||||||
|
assert (rc != -1);
|
||||||
|
assert (zmq_msg_more(&msg1) != 0);
|
||||||
|
rc = zmq_msg_recv (&msg2, s, 0);
|
||||||
|
if (rc == -1 && zmq_errno() == ETERM)
|
||||||
|
return true;
|
||||||
|
assert (rc != -1);
|
||||||
|
assert (zmq_msg_more(&msg2) == 0);
|
||||||
|
// copy binary data to event struct
|
||||||
|
const char* data = (char*)zmq_msg_data(&msg1);
|
||||||
|
memcpy(&event.event, data, sizeof(event.event));
|
||||||
|
memcpy(&event.value, data+sizeof(event.event), sizeof(event.value));
|
||||||
|
// copy address part
|
||||||
|
ep = std::string((char*)zmq_msg_data(&msg2), zmq_msg_size(&msg2));
|
||||||
|
return false ;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// REQ socket monitor thread
|
// REQ socket monitor thread
|
||||||
static void *req_socket_monitor (void *ctx)
|
static void *req_socket_monitor (void *ctx)
|
||||||
{
|
{
|
||||||
zmq_event_t event;
|
zmq_event_t event;
|
||||||
|
std::string ep ;
|
||||||
int rc;
|
int rc;
|
||||||
|
|
||||||
void *s = zmq_socket (ctx, ZMQ_PAIR);
|
void *s = zmq_socket (ctx, ZMQ_PAIR);
|
||||||
@ -44,16 +73,8 @@ static void *req_socket_monitor (void *ctx)
|
|||||||
|
|
||||||
rc = zmq_connect (s, "inproc://monitor.req");
|
rc = zmq_connect (s, "inproc://monitor.req");
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
while (true) {
|
while (!read_msg(s, event, ep)) {
|
||||||
zmq_msg_t msg;
|
assert (ep == addr);
|
||||||
zmq_msg_init (&msg);
|
|
||||||
rc = zmq_msg_recv (&msg, s, 0);
|
|
||||||
if (rc == -1 && zmq_errno() == ETERM)
|
|
||||||
break;
|
|
||||||
assert (rc != -1);
|
|
||||||
|
|
||||||
memcpy (&event, zmq_msg_data (&msg), sizeof (event));
|
|
||||||
assert (!strcmp (event.addr, addr));
|
|
||||||
switch (event.event) {
|
switch (event.event) {
|
||||||
case ZMQ_EVENT_CONNECTED:
|
case ZMQ_EVENT_CONNECTED:
|
||||||
assert (event.value > 0);
|
assert (event.value > 0);
|
||||||
@ -86,6 +107,7 @@ static void *req_socket_monitor (void *ctx)
|
|||||||
static void *req2_socket_monitor (void *ctx)
|
static void *req2_socket_monitor (void *ctx)
|
||||||
{
|
{
|
||||||
zmq_event_t event;
|
zmq_event_t event;
|
||||||
|
std::string ep ;
|
||||||
int rc;
|
int rc;
|
||||||
|
|
||||||
void *s = zmq_socket (ctx, ZMQ_PAIR);
|
void *s = zmq_socket (ctx, ZMQ_PAIR);
|
||||||
@ -93,16 +115,8 @@ static void *req2_socket_monitor (void *ctx)
|
|||||||
|
|
||||||
rc = zmq_connect (s, "inproc://monitor.req2");
|
rc = zmq_connect (s, "inproc://monitor.req2");
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
while (true) {
|
while (!read_msg(s, event, ep)) {
|
||||||
zmq_msg_t msg;
|
assert (ep == addr);
|
||||||
zmq_msg_init (&msg);
|
|
||||||
rc = zmq_msg_recv (&msg, s, 0);
|
|
||||||
if (rc == -1 && zmq_errno() == ETERM)
|
|
||||||
break;
|
|
||||||
assert (rc != -1);
|
|
||||||
|
|
||||||
memcpy (&event, zmq_msg_data (&msg), sizeof (event));
|
|
||||||
assert (!strcmp (event.addr, addr));
|
|
||||||
switch (event.event) {
|
switch (event.event) {
|
||||||
case ZMQ_EVENT_CONNECTED:
|
case ZMQ_EVENT_CONNECTED:
|
||||||
assert (event.value > 0);
|
assert (event.value > 0);
|
||||||
@ -122,6 +136,7 @@ static void *req2_socket_monitor (void *ctx)
|
|||||||
static void *rep_socket_monitor (void *ctx)
|
static void *rep_socket_monitor (void *ctx)
|
||||||
{
|
{
|
||||||
zmq_event_t event;
|
zmq_event_t event;
|
||||||
|
std::string ep ;
|
||||||
int rc;
|
int rc;
|
||||||
|
|
||||||
void *s = zmq_socket (ctx, ZMQ_PAIR);
|
void *s = zmq_socket (ctx, ZMQ_PAIR);
|
||||||
@ -129,16 +144,8 @@ static void *rep_socket_monitor (void *ctx)
|
|||||||
|
|
||||||
rc = zmq_connect (s, "inproc://monitor.rep");
|
rc = zmq_connect (s, "inproc://monitor.rep");
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
while (true) {
|
while (!read_msg(s, event, ep)) {
|
||||||
zmq_msg_t msg;
|
assert (ep == addr);
|
||||||
zmq_msg_init (&msg);
|
|
||||||
rc = zmq_msg_recv (&msg, s, 0);
|
|
||||||
if (rc == -1 && zmq_errno() == ETERM)
|
|
||||||
break;
|
|
||||||
assert (rc != -1);
|
|
||||||
|
|
||||||
memcpy (&event, zmq_msg_data (&msg), sizeof (event));
|
|
||||||
assert (!strcmp (event.addr, addr));
|
|
||||||
switch (event.event) {
|
switch (event.event) {
|
||||||
case ZMQ_EVENT_LISTENING:
|
case ZMQ_EVENT_LISTENING:
|
||||||
assert (event.value > 0);
|
assert (event.value > 0);
|
||||||
@ -161,7 +168,6 @@ static void *rep_socket_monitor (void *ctx)
|
|||||||
rep_socket_events |= ZMQ_EVENT_DISCONNECTED;
|
rep_socket_events |= ZMQ_EVENT_DISCONNECTED;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
zmq_msg_close (&msg);
|
|
||||||
}
|
}
|
||||||
zmq_close (s);
|
zmq_close (s);
|
||||||
return NULL;
|
return NULL;
|
||||||
@ -186,7 +192,7 @@ int main (void)
|
|||||||
assert (rep);
|
assert (rep);
|
||||||
|
|
||||||
// Assert supported protocols
|
// Assert supported protocols
|
||||||
rc = zmq_socket_monitor (rep, addr, 0);
|
rc = zmq_socket_monitor (rep, addr.c_str(), 0);
|
||||||
assert (rc == -1);
|
assert (rc == -1);
|
||||||
assert (zmq_errno() == EPROTONOSUPPORT);
|
assert (zmq_errno() == EPROTONOSUPPORT);
|
||||||
|
|
||||||
@ -200,7 +206,7 @@ int main (void)
|
|||||||
rc = pthread_create (&threads [0], NULL, rep_socket_monitor, ctx);
|
rc = pthread_create (&threads [0], NULL, rep_socket_monitor, ctx);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
rc = zmq_bind (rep, addr);
|
rc = zmq_bind (rep, addr.c_str());
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
// REQ socket
|
// REQ socket
|
||||||
@ -213,7 +219,7 @@ int main (void)
|
|||||||
rc = pthread_create (&threads [1], NULL, req_socket_monitor, ctx);
|
rc = pthread_create (&threads [1], NULL, req_socket_monitor, ctx);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
rc = zmq_connect (req, addr);
|
rc = zmq_connect (req, addr.c_str());
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
bounce (rep, req);
|
bounce (rep, req);
|
||||||
@ -228,7 +234,7 @@ int main (void)
|
|||||||
rc = pthread_create (&threads [2], NULL, req2_socket_monitor, ctx);
|
rc = pthread_create (&threads [2], NULL, req2_socket_monitor, ctx);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
rc = zmq_connect (req2, addr);
|
rc = zmq_connect (req2, addr.c_str());
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
// Close the REP socket
|
// Close the REP socket
|
||||||
|
Loading…
x
Reference in New Issue
Block a user