diff --git a/include/zmq.h b/include/zmq.h index ac34b167..0b53af67 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -239,6 +239,8 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int property, int optval); ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property); ZMQ_EXPORT int zmq_msg_set_routing_id (zmq_msg_t *msg, uint32_t routing_id); ZMQ_EXPORT uint32_t zmq_msg_routing_id (zmq_msg_t *msg); +ZMQ_EXPORT int zmq_msg_set_group (zmq_msg_t *msg, const char *group); +ZMQ_EXPORT const char *zmq_msg_group (zmq_msg_t *msg); /******************************************************************************/ @@ -359,7 +361,7 @@ ZMQ_EXPORT uint32_t zmq_msg_routing_id (zmq_msg_t *msg); #define ZMQ_GSSAPI 3 /* RADIO-DISH protocol */ -#define ZMQ_GROUP_MAX_LENGTH 255 +#define ZMQ_GROUP_MAX_LENGTH 15 /* Deprecated options and aliases */ #define ZMQ_TCP_ACCEPT_FILTER 38 diff --git a/src/msg.cpp b/src/msg.cpp index 37379fcb..013d30b3 100644 --- a/src/msg.cpp +++ b/src/msg.cpp @@ -84,6 +84,7 @@ int zmq::msg_t::init () u.vsm.type = type_vsm; u.vsm.flags = 0; u.vsm.size = 0; + u.vsm.group[0] = '\0'; u.vsm.routing_id = 0; u.vsm.fd = retired_fd; return 0; @@ -96,6 +97,7 @@ int zmq::msg_t::init_size (size_t size_) u.vsm.type = type_vsm; u.vsm.flags = 0; u.vsm.size = (unsigned char) size_; + u.vsm.group[0] = '\0'; u.vsm.routing_id = 0; u.vsm.fd = retired_fd; } @@ -103,6 +105,7 @@ int zmq::msg_t::init_size (size_t size_) u.lmsg.metadata = NULL; u.lmsg.type = type_lmsg; u.lmsg.flags = 0; + u.lmsg.group[0] = '\0'; u.lmsg.routing_id = 0; u.lmsg.fd = retired_fd; u.lmsg.content = NULL; @@ -131,6 +134,7 @@ int zmq::msg_t::init_external_storage(content_t* content_, void* data_, size_t s u.zclmsg.metadata = NULL; u.zclmsg.type = type_zclmsg; u.zclmsg.flags = 0; + u.zclmsg.group[0] = '\0'; u.zclmsg.routing_id = 0; u.zclmsg.fd = retired_fd; @@ -158,6 +162,7 @@ int zmq::msg_t::init_data (void *data_, size_t size_, u.cmsg.flags = 0; u.cmsg.data = data_; u.cmsg.size = size_; + u.cmsg.group[0] = '\0'; u.cmsg.routing_id = 0; u.cmsg.fd = retired_fd; } @@ -165,6 +170,7 @@ int zmq::msg_t::init_data (void *data_, size_t size_, u.lmsg.metadata = NULL; u.lmsg.type = type_lmsg; u.lmsg.flags = 0; + u.lmsg.group[0] = '\0'; u.lmsg.routing_id = 0; u.lmsg.fd = retired_fd; u.lmsg.content = (content_t*) malloc (sizeof (content_t)); @@ -188,6 +194,7 @@ int zmq::msg_t::init_delimiter () u.delimiter.metadata = NULL; u.delimiter.type = type_delimiter; u.delimiter.flags = 0; + u.delimiter.group[0] = '\0'; u.delimiter.routing_id = 0; u.delimiter.fd = retired_fd; return 0; @@ -519,6 +526,24 @@ int zmq::msg_t::reset_routing_id () return 0; } +const char * zmq::msg_t::group () +{ + return u.base.group; +} + +int zmq::msg_t::set_group (const char * group_) +{ + if (strlen (group_) > ZMQ_GROUP_MAX_LENGTH) + { + errno = EINVAL; + return -1; + } + + strcpy (u.base.group, group_); + + return 0; +} + zmq::atomic_counter_t *zmq::msg_t::refcnt() { switch(u.base.type) diff --git a/src/msg.hpp b/src/msg.hpp index 4a14ad82..3cad5ac0 100644 --- a/src/msg.hpp +++ b/src/msg.hpp @@ -117,6 +117,8 @@ namespace zmq uint32_t get_routing_id (); int set_routing_id (uint32_t routing_id_); int reset_routing_id (); + const char * group (); + int set_group (const char* group_); // After calling this function you can copy the message in POD-style // refs_ times. No need to call copy. @@ -131,8 +133,9 @@ namespace zmq enum { msg_t_size = 64 }; enum { max_vsm_size = msg_t_size - (sizeof (metadata_t *) + 3 + + 16 + sizeof (uint32_t) + - sizeof (fd_t)) }; + sizeof (fd_t))}; private: zmq::atomic_counter_t* refcnt(); @@ -165,10 +168,12 @@ namespace zmq metadata_t *metadata; unsigned char unused [msg_t_size - (sizeof (metadata_t *) + 2 + + 16 + sizeof (uint32_t) + sizeof (fd_t))]; unsigned char type; unsigned char flags; + char group [16]; uint32_t routing_id; fd_t fd; } base; @@ -178,6 +183,7 @@ namespace zmq unsigned char size; unsigned char type; unsigned char flags; + char group [16]; uint32_t routing_id; fd_t fd; } vsm; @@ -187,10 +193,12 @@ namespace zmq unsigned char unused [msg_t_size - (sizeof (metadata_t *) + sizeof (content_t*) + 2 + + 16 + sizeof (uint32_t) + sizeof (fd_t))]; unsigned char type; unsigned char flags; + char group [16]; uint32_t routing_id; fd_t fd; } lmsg; @@ -200,10 +208,12 @@ namespace zmq unsigned char unused [msg_t_size - (sizeof (metadata_t *) + sizeof (content_t*) + 2 + + 16 + sizeof (uint32_t) + sizeof (fd_t))]; unsigned char type; unsigned char flags; + char group [16]; uint32_t routing_id; fd_t fd; } zclmsg; @@ -215,10 +225,12 @@ namespace zmq sizeof (void*) + sizeof (size_t) + 2 + + 16 + sizeof (uint32_t) + sizeof (fd_t))]; unsigned char type; unsigned char flags; + char group [16]; uint32_t routing_id; fd_t fd; } cmsg; @@ -226,10 +238,12 @@ namespace zmq metadata_t *metadata; unsigned char unused [msg_t_size - (sizeof (metadata_t *) + 2 + + 16 + sizeof (uint32_t) + sizeof (fd_t))]; unsigned char type; unsigned char flags; + char group [16]; uint32_t routing_id; fd_t fd; } delimiter; diff --git a/src/radio.cpp b/src/radio.cpp index 14441d6a..ee9e0697 100644 --- a/src/radio.cpp +++ b/src/radio.cpp @@ -115,33 +115,10 @@ int zmq::radio_t::xsend (msg_t *msg_) return -1; } - size_t size = msg_->size (); - char *group = (char*) msg_->data(); - - // Maximum allowed group length is 255 - if (size > ZMQ_GROUP_MAX_LENGTH) - size = ZMQ_GROUP_MAX_LENGTH; - - // Check if NULL terminated - bool terminated = false; - - for (size_t index = 0; index < size; index++) { - if (group[index] == '\0') { - terminated = true; - break; - } - } - - if (!terminated) { - // User didn't include a group in the message - errno = EINVAL; - return -1; - } - dist.unmatch (); std::pair range = - subscriptions.equal_range (std::string(group)); + subscriptions.equal_range (std::string(msg_->group ())); for (subscriptions_t::iterator it = range.first; it != range.second; ++it) { dist.match (it-> second); diff --git a/src/zmq.cpp b/src/zmq.cpp index 4d508496..a041538d 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -695,6 +695,16 @@ uint32_t zmq_msg_routing_id (zmq_msg_t *msg_) return ((zmq::msg_t *) msg_)->get_routing_id (); } +int zmq_msg_set_group (zmq_msg_t *msg_, const char *group_) +{ + return ((zmq::msg_t *) msg_)->set_group (group_); +} + +const char *zmq_msg_group (zmq_msg_t *msg_) +{ + return ((zmq::msg_t *) msg_)->group (); +} + // Get message metadata string const char *zmq_msg_gets (zmq_msg_t *msg_, const char *property_) diff --git a/tests/test_radio_dish.cpp b/tests/test_radio_dish.cpp index 80db94ad..43618093 100644 --- a/tests/test_radio_dish.cpp +++ b/tests/test_radio_dish.cpp @@ -29,6 +29,57 @@ #include "testutil.hpp" +int msg_send (zmq_msg_t *msg_, void *s_, const char* group_, const char* body_) +{ + int rc = zmq_msg_init_size (msg_, strlen (body_)); + if (rc != 0) + return rc; + + memcpy (zmq_msg_data (msg_), body_, strlen (body_)); + + rc = zmq_msg_set_group (msg_, group_); + if (rc != 0) { + zmq_msg_close (msg_); + return rc; + } + + rc = zmq_msg_send (msg_, s_, 0); + + zmq_msg_close (msg_); + + return rc; +} + +int msg_recv_cmp (zmq_msg_t *msg_, void *s_, const char* group_, const char* body_) +{ + int rc = zmq_msg_init (msg_); + if (rc != 0) + return -1; + + int recv_rc = zmq_msg_recv (msg_, s_, 0); + if (recv_rc == -1) + return -1; + + if (strcmp (zmq_msg_group (msg_), group_) != 0) + { + zmq_msg_close (msg_); + return -1; + } + + char * body = (char*) malloc (sizeof(char) * (zmq_msg_size (msg_) + 1)); + memcpy (body, zmq_msg_data (msg_), zmq_msg_size (msg_)); + body [zmq_msg_size (msg_)] = '\0'; + + if (strcmp (body, body_) != 0) + { + zmq_msg_close (msg_); + return -1; + } + + zmq_msg_close (msg_); + return recv_rc; +} + int main (void) { setup_test_environment (); @@ -42,7 +93,7 @@ int main (void) assert (rc == 0); // Leaving a group which we didn't join - rc = zmq_leave (dish, "World"); + rc = zmq_leave (dish, "Movies"); assert (rc == -1); // Joining too long group @@ -54,11 +105,11 @@ int main (void) assert (rc == -1); // Joining - rc = zmq_join (dish, "World"); + rc = zmq_join (dish, "Movies"); assert (rc == 0); // Duplicate Joining - rc = zmq_join (dish, "World"); + rc = zmq_join (dish, "Movies"); assert (rc == -1); // Connecting @@ -67,53 +118,51 @@ int main (void) zmq_sleep (1); - // This is not going to be sent as dish only subscribe to "World" - rc = zmq_send (radio, "Hello\0Message", 13, 0); - assert (rc == 13); + zmq_msg_t msg; + + // This is not going to be sent as dish only subscribe to "Movies" + rc = msg_send (&msg, radio, "TV", "Friends"); + assert (rc == 7); // This is going to be sent to the dish - rc = zmq_send (radio, "World\0Message", 13, 0); - assert (rc == 13); + rc = msg_send (&msg, radio, "Movies", "Godfather"); + assert (rc == 9); - char* data = (char*) malloc (sizeof(char) * 13); - - rc = zmq_recv (dish, data, 13, 0); - assert (rc == 13); - assert (strcmp (data, "World") == 0); + // Check the correct message arrived + rc = msg_recv_cmp (&msg, dish, "Movies", "Godfather"); + assert (rc == 9); // Join group during connection optvallen - rc = zmq_join (dish, "Hello"); + rc = zmq_join (dish, "TV"); assert (rc == 0); zmq_sleep (1); // This should arrive now as we joined the group - rc = zmq_send (radio, "Hello\0Message", 13, 0); - assert (rc == 13); + rc = msg_send (&msg, radio, "TV", "Friends"); + assert (rc == 7); - rc = zmq_recv (dish, data, 13, 0); - assert (rc == 13); - assert (strcmp (data, "Hello") == 0); + // Check the correct message arrived + rc = msg_recv_cmp (&msg, dish, "TV", "Friends"); + assert (rc == 7); - // Leaving group - rc = zmq_leave (dish, "Hello"); + // Leaving groupr + rc = zmq_leave (dish, "TV"); assert (rc == 0); zmq_sleep (1); - // This is not going to be sent as dish only subscribe to "World" - rc = zmq_send (radio, "Hello\0Message", 13, 0); - assert (rc == 13); + // This is not going to be sent as dish only subscribe to "Movies" + rc = msg_send (&msg, radio, "TV", "Friends"); + assert (rc == 7); // This is going to be sent to the dish - rc = zmq_send (radio, "World\0Message", 13, 0); - assert (rc == 13); + rc = msg_send (&msg, radio, "Movies", "Godfather"); + assert (rc == 9); - rc = zmq_recv (dish, data, 13, 0); - assert (rc == 13); - assert (strcmp (data, "World") == 0); - - free (data); + // Check the correct message arrived + rc = msg_recv_cmp (&msg, dish, "Movies", "Godfather"); + assert (rc == 9); rc = zmq_close (dish); assert (rc == 0);