diff --git a/Makefile.am b/Makefile.am index 502f6d91..3c322ee9 100644 --- a/Makefile.am +++ b/Makefile.am @@ -26,6 +26,8 @@ src_libzmq_la_SOURCES = \ src/atomic_counter.hpp \ src/atomic_ptr.hpp \ src/blob.hpp \ + src/client.cpp \ + src/client.hpp \ src/clock.cpp \ src/clock.hpp \ src/command.hpp \ @@ -341,7 +343,9 @@ test_apps = \ tests/test_xpub_manual \ tests/test_xpub_welcome_msg \ tests/test_atomics \ - tests/test_server + tests/test_client_server \ + tests/test_server_drop_more \ + tests/test_client_drop_more tests_test_system_SOURCES = tests/test_system.cpp tests_test_system_LDADD = src/libzmq.la @@ -513,8 +517,14 @@ tests_test_xpub_welcome_msg_LDADD = src/libzmq.la tests_test_atomics_SOURCES = tests/test_atomics.cpp tests_test_atomics_LDADD = src/libzmq.la -tests_test_server_SOURCES = tests/test_server.cpp -tests_test_server_LDADD = src/libzmq.la +tests_test_client_server_SOURCES = tests/test_client_server.cpp +tests_test_client_server_LDADD = src/libzmq.la + +tests_test_server_drop_more_SOURCES = tests/test_server_drop_more.cpp +tests_test_server_drop_more_LDADD = src/libzmq.la + +tests_test_client_drop_more_SOURCES = tests/test_client_drop_more.cpp +tests_test_client_drop_more_LDADD = src/libzmq.la if !ON_MINGW if !ON_CYGWIN diff --git a/include/zmq.h b/include/zmq.h index 9503b262..e9cf5933 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -239,6 +239,7 @@ ZMQ_EXPORT uint32_t zmq_msg_get_routing_id(zmq_msg_t *msg); #define ZMQ_XSUB 10 #define ZMQ_STREAM 11 #define ZMQ_SERVER 12 +#define ZMQ_CLIENT 13 /* Deprecated aliases */ #define ZMQ_XREQ ZMQ_DEALER diff --git a/src/client.cpp b/src/client.cpp new file mode 100644 index 00000000..122012d6 --- /dev/null +++ b/src/client.cpp @@ -0,0 +1,102 @@ +/* + Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "client.hpp" +#include "err.hpp" +#include "msg.hpp" + +zmq::client_t::client_t (class ctx_t *parent_, uint32_t tid_, int sid_) : + socket_base_t (parent_, tid_, sid_) +{ + options.type = ZMQ_CLIENT; +} + +zmq::client_t::~client_t () +{ +} + +void zmq::client_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) +{ + // subscribe_to_all_ is unused + (void) subscribe_to_all_; + + zmq_assert (pipe_); + + fq.attach (pipe_); + lb.attach (pipe_); +} + +int zmq::client_t::xsend (msg_t *msg_) +{ + zmq_assert(!(msg_->flags () & msg_t::more)); + + return lb.sendpipe (msg_, NULL); +} + +int zmq::client_t::xrecv (msg_t *msg_) +{ + int rc = fq.recvpipe (msg_, NULL); + + // Drop any messages with more flag + while (rc == 0 && msg_->flags () & msg_t::more) { + + // drop all frames of the current multi-frame message + rc = fq.recvpipe (msg_, NULL); + + while (rc == 0 && msg_->flags () & msg_t::more) + rc = fq.recvpipe (msg_, NULL); + + // get the new message + if (rc == 0) + rc = fq.recvpipe (msg_, NULL); + } + + return rc; +} + +bool zmq::client_t::xhas_in () +{ + return fq.has_in (); +} + +bool zmq::client_t::xhas_out () +{ + return lb.has_out (); +} + +zmq::blob_t zmq::client_t::get_credential () const +{ + return fq.get_credential (); +} + +void zmq::client_t::xread_activated (pipe_t *pipe_) +{ + fq.activated (pipe_); +} + +void zmq::client_t::xwrite_activated (pipe_t *pipe_) +{ + lb.activated (pipe_); +} + +void zmq::client_t::xpipe_terminated (pipe_t *pipe_) +{ + fq.pipe_terminated (pipe_); + lb.pipe_terminated (pipe_); +} \ No newline at end of file diff --git a/src/client.hpp b/src/client.hpp new file mode 100644 index 00000000..8c9e2ad5 --- /dev/null +++ b/src/client.hpp @@ -0,0 +1,71 @@ +/* + Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_CLIENT_HPP_INCLUDED__ +#define __ZMQ_CLIENT_HPP_INCLUDED__ + +#include "socket_base.hpp" +#include "session_base.hpp" +#include "fq.hpp" +#include "lb.hpp" + +namespace zmq +{ + + class ctx_t; + class msg_t; + class pipe_t; + class io_thread_t; + class socket_base_t; + + class client_t : + public socket_base_t + { + public: + + client_t (zmq::ctx_t *parent_, uint32_t tid_, int sid); + ~client_t (); + + protected: + + // Overrides of functions from socket_base_t. + void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); + int xsend (zmq::msg_t *msg_); + int xrecv (zmq::msg_t *msg_); + bool xhas_in (); + bool xhas_out (); + blob_t get_credential () const; + void xread_activated (zmq::pipe_t *pipe_); + void xwrite_activated (zmq::pipe_t *pipe_); + void xpipe_terminated (zmq::pipe_t *pipe_); + + private: + + // Messages are fair-queued from inbound pipes. And load-balanced to + // the outbound pipes. + fq_t fq; + lb_t lb; + + client_t (const client_t &); + const client_t &operator = (const client_t&); + }; + +} + +#endif diff --git a/src/mechanism.cpp b/src/mechanism.cpp index cc32a175..c075179b 100644 --- a/src/mechanism.cpp +++ b/src/mechanism.cpp @@ -64,8 +64,8 @@ const char *zmq::mechanism_t::socket_type_string (int socket_type) const { static const char *names [] = {"PAIR", "PUB", "SUB", "REQ", "REP", "DEALER", "ROUTER", "PULL", "PUSH", - "XPUB", "XSUB", "STREAM", "SERVER"}; - zmq_assert (socket_type >= 0 && socket_type <= 12); + "XPUB", "XSUB", "STREAM", "SERVER", "CLIENT"}; + zmq_assert (socket_type >= 0 && socket_type <= 13); return names [socket_type]; } @@ -160,7 +160,7 @@ bool zmq::mechanism_t::check_socket_type (const std::string& type_) const case ZMQ_REP: return type_ == "REQ" || type_ == "DEALER"; case ZMQ_DEALER: - return type_ == "REP" || type_ == "DEALER" || type_ == "ROUTER" || type_ == "SERVER"; + return type_ == "REP" || type_ == "DEALER" || type_ == "ROUTER"; case ZMQ_ROUTER: return type_ == "REQ" || type_ == "DEALER" || type_ == "ROUTER"; case ZMQ_PUSH: @@ -178,7 +178,9 @@ bool zmq::mechanism_t::check_socket_type (const std::string& type_) const case ZMQ_PAIR: return type_ == "PAIR"; case ZMQ_SERVER: - return type_ == "DEALER"; + return type_ == "CLIENT"; + case ZMQ_CLIENT: + return type_ == "CLIENT" || type_ == "SERVER"; default: break; } diff --git a/src/server.cpp b/src/server.cpp index 549dbe34..e77cd0bd 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -53,14 +53,6 @@ void zmq::server_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) fq.attach (pipe_); } -int zmq::server_t::xsetsockopt (int option_, const void *optval_, - size_t optvallen_) -{ - errno = EINVAL; - return -1; -} - - void zmq::server_t::xpipe_terminated (pipe_t *pipe_) { outpipes_t::iterator it = outpipes.find (pipe_->get_routing_id ()); @@ -88,12 +80,8 @@ void zmq::server_t::xwrite_activated (pipe_t *pipe_) int zmq::server_t::xsend (msg_t *msg_) { - // Server doesn't support multipart - if (msg_->flags () & msg_t::more) { - errno = EINVAL; - return -1; - } - + zmq_assert(!(msg_->flags () & msg_t::more)); + // Find the pipe associated with the routing stored in the message. uint32_t routing_id = msg_->get_routing_id(); outpipes_t::iterator it = outpipes.find (routing_id); @@ -131,19 +119,25 @@ int zmq::server_t::xrecv (msg_t *msg_) pipe_t *pipe = NULL; int rc = fq.recvpipe (msg_, &pipe); + // Drop any messages with more flag + while (rc == 0 && msg_->flags () & msg_t::more) { + + // drop all frames of the current multi-frame message + rc = fq.recvpipe (msg_, NULL); + + while (rc == 0 && msg_->flags () & msg_t::more) + rc = fq.recvpipe (msg_, NULL); + + // get the new message + if (rc == 0) + rc = fq.recvpipe (msg_, &pipe); + } + if (rc != 0) - return -1; + return rc; zmq_assert (pipe != NULL); - if (msg_->flags () & msg_t::more) { - msg_->close(); - msg_->init(); - - errno = EINVAL; - return -1; - } - uint32_t routing_id = pipe->get_routing_id(); msg_->set_routing_id(routing_id); diff --git a/src/server.hpp b/src/server.hpp index 43202da5..2fb642be 100644 --- a/src/server.hpp +++ b/src/server.hpp @@ -45,8 +45,7 @@ namespace zmq ~server_t (); // Overrides of functions from socket_base_t. - void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); - int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); int xsend (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_); bool xhas_in (); diff --git a/src/session_base.cpp b/src/session_base.cpp index 4af341c6..0eb7998e 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -56,6 +56,7 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, case ZMQ_PAIR: case ZMQ_STREAM: case ZMQ_SERVER: + case ZMQ_CLIENT: s = new (std::nothrow) session_base_t (io_thread_, active_, socket_, options_, addr_); break; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index b1eb3b3d..198277a6 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -71,6 +71,7 @@ #include "xsub.hpp" #include "stream.hpp" #include "server.hpp" +#include "client.hpp" bool zmq::socket_base_t::check_tag () { @@ -121,6 +122,9 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_, case ZMQ_SERVER: s = new (std::nothrow) server_t (parent_, tid_, sid_); break; + case ZMQ_CLIENT: + s = new (std::nothrow) client_t (parent_, tid_, sid_); + break; default: errno = EINVAL; return NULL; diff --git a/tests/test_client_drop_more.cpp b/tests/test_client_drop_more.cpp new file mode 100644 index 00000000..47e1b1ae --- /dev/null +++ b/tests/test_client_drop_more.cpp @@ -0,0 +1,106 @@ +/* + Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "testutil.hpp" + +int send_msg(zmq_msg_t* msg, void* s, int flags, int value); + +int main (void) +{ + setup_test_environment(); + void *ctx = zmq_ctx_new (); + assert (ctx); + + void *client = zmq_socket (ctx, ZMQ_CLIENT); + void *dealer = zmq_socket (ctx, ZMQ_DEALER); + + int rc; + + rc = zmq_bind (client, "inproc://serverdropmore"); + assert (rc == 0); + + rc = zmq_connect (dealer, "inproc://serverdropmore"); + assert (rc == 0); + + zmq_msg_t msg; + rc = zmq_msg_init (&msg); + assert (rc == 0); + + // we will send 2 3-frames messages and then single frame message, only last one should be received + rc = send_msg (&msg, dealer, ZMQ_SNDMORE, 1); + assert(rc == 1); + + rc = send_msg (&msg, dealer, ZMQ_SNDMORE, 2); + assert(rc == 1); + + rc = send_msg (&msg, dealer, 0, 3); + assert(rc == 1); + + rc = send_msg (&msg, dealer, ZMQ_SNDMORE, 4); + assert(rc == 1); + + rc = send_msg (&msg, dealer, ZMQ_SNDMORE, 5); + assert(rc == 1); + + rc = send_msg (&msg, dealer, 0, 6); + assert(rc == 1); + + rc = send_msg (&msg, dealer, 0, 7); + assert(rc == 1); + + rc = zmq_msg_recv (&msg, client, 0); + assert (rc == 1); + + assert(zmq_msg_more(&msg) == 0); + + unsigned char* data = (unsigned char*)zmq_msg_data (&msg); + assert (data[0] == 7); + + rc = zmq_msg_close (&msg); + assert (rc == 0); + + rc = zmq_close (client); + assert (rc == 0); + + rc = zmq_close (dealer); + assert (rc == 0); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); + + return 0 ; +} + +int send_msg(zmq_msg_t* msg, void* s, int flags, int value) +{ + int rc = zmq_msg_close(msg); + + if (rc != 0) + return rc; + + zmq_msg_init_size(msg, 1); + + if (rc != 0) + return rc; + + unsigned char* data = (unsigned char*)zmq_msg_data(msg); + data[0] = (unsigned char)value; + + return zmq_msg_send (msg, s, flags); +} diff --git a/tests/test_server.cpp b/tests/test_client_server.cpp similarity index 96% rename from tests/test_server.cpp rename to tests/test_client_server.cpp index 2e2ded2e..1fc7915d 100644 --- a/tests/test_server.cpp +++ b/tests/test_client_server.cpp @@ -21,14 +21,12 @@ int main (void) { - printf("0000"); - setup_test_environment(); void *ctx = zmq_ctx_new (); assert (ctx); void *server = zmq_socket (ctx, ZMQ_SERVER); - void *client = zmq_socket (ctx, ZMQ_DEALER); + void *client = zmq_socket (ctx, ZMQ_CLIENT); int rc; diff --git a/tests/test_server_drop_more.cpp b/tests/test_server_drop_more.cpp new file mode 100644 index 00000000..868dd456 --- /dev/null +++ b/tests/test_server_drop_more.cpp @@ -0,0 +1,106 @@ +/* + Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "testutil.hpp" + +int send_msg(zmq_msg_t* msg, void* s, int flags, int value); + +int main (void) +{ + setup_test_environment(); + void *ctx = zmq_ctx_new (); + assert (ctx); + + void *server = zmq_socket (ctx, ZMQ_SERVER); + void *client = zmq_socket (ctx, ZMQ_DEALER); + + int rc; + + rc = zmq_bind (server, "inproc://serverdropmore"); + assert (rc == 0); + + rc = zmq_connect (client, "inproc://serverdropmore"); + assert (rc == 0); + + zmq_msg_t msg; + rc = zmq_msg_init (&msg); + assert (rc == 0); + + // we will send 2 3-frames messages and then single frame message, only last one should be received + rc = send_msg (&msg, client, ZMQ_SNDMORE, 1); + assert(rc == 1); + + rc = send_msg (&msg, client, ZMQ_SNDMORE, 2); + assert(rc == 1); + + rc = send_msg (&msg, client, 0, 3); + assert(rc == 1); + + rc = send_msg (&msg, client, ZMQ_SNDMORE, 4); + assert(rc == 1); + + rc = send_msg (&msg, client, ZMQ_SNDMORE, 5); + assert(rc == 1); + + rc = send_msg (&msg, client, 0, 6); + assert(rc == 1); + + rc = send_msg (&msg, client, 0, 7); + assert(rc == 1); + + rc = zmq_msg_recv (&msg, server, 0); + assert (rc == 1); + + assert(zmq_msg_more(&msg) == 0); + + unsigned char* data = (unsigned char*)zmq_msg_data (&msg); + assert (data[0] == 7); + + rc = zmq_msg_close (&msg); + assert (rc == 0); + + rc = zmq_close (server); + assert (rc == 0); + + rc = zmq_close (client); + assert (rc == 0); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); + + return 0 ; +} + +int send_msg(zmq_msg_t* msg, void* s, int flags, int value) +{ + int rc = zmq_msg_close(msg); + + if (rc != 0) + return rc; + + zmq_msg_init_size(msg, 1); + + if (rc != 0) + return rc; + + unsigned char* data = (unsigned char*)zmq_msg_data(msg); + data[0] = (unsigned char)value; + + return zmq_msg_send (msg, s, flags); +}