diff --git a/.gitignore b/.gitignore index b37f80ff..6ccb0a72 100644 --- a/.gitignore +++ b/.gitignore @@ -39,7 +39,6 @@ tests/test_invalid_rep tests/test_msg_flags tests/test_ts_context tests/test_connect_resolve -tests/test_connect_delay tests/test_term_endpoint src/platform.hpp* src/stamp-h1 diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt index 167e06cc..0eae8d91 100644 --- a/doc/zmq_getsockopt.txt +++ b/doc/zmq_getsockopt.txt @@ -342,21 +342,6 @@ Default value:: 1 (true) Applicable socket types:: all, when using TCP transports. -ZMQ_DELAY_ATTACH_ON_CONNECT -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Retrieve the state of the attach on connect value. If set to `1`, will delay the -attachment of a pipe on connect until the underlying connection has completed. -This will cause the socket to block if there are no other connections, but will -prevent queues from filling on pipes awaiting connection. - -[horizontal] -Option value type:: int -Option value unit:: boolean -Default value:: 0 (false) -Applicable socket types:: all, primarily when using TCP/IPC transports. - - ZMQ_FD: Retrieve file descriptor associated with the socket ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The 'ZMQ_FD' option shall retrieve the file descriptor associated with the diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index 153d6f72..fee30cc8 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -351,19 +351,6 @@ Option value unit:: boolean Default value:: 1 (true) Applicable socket types:: all, when using TCP transports. -ZMQ_DELAY_ATTACH_ON_CONNECT -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -If set to `1`, will delay the attachment of a pipe on connect until the underlying -connection has completed. This will cause the socket to block if there are no other -connections, but will prevent queues from filling on pipes awaiting connection. - -[horizontal] -Option value type:: int -Option value unit:: boolean -Default value:: 0 (false) -Applicable socket types:: all, primarily when using TCP/IPC transports. - ZMQ_FAIL_UNROUTABLE: Set unroutable message behavior ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/include/zmq.h b/include/zmq.h index b8813947..58e4017a 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -227,7 +227,6 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); #define ZMQ_TCP_KEEPALIVE_IDLE 36 #define ZMQ_TCP_KEEPALIVE_INTVL 37 #define ZMQ_TCP_ACCEPT_FILTER 38 -#define ZMQ_DELAY_ATTACH_ON_CONNECT 39 /* Message options */ #define ZMQ_MORE 1 diff --git a/src/lb.cpp b/src/lb.cpp index d5f2f001..6732f274 100644 --- a/src/lb.cpp +++ b/src/lb.cpp @@ -41,7 +41,8 @@ zmq::lb_t::~lb_t () void zmq::lb_t::attach (pipe_t *pipe_) { pipes.push_back (pipe_); - activated (pipe_); + pipes.swap (active, pipes.size () - 1); + active++; } void zmq::lb_t::terminated (pipe_t *pipe_) diff --git a/src/options.cpp b/src/options.cpp index cbade7d8..fdd5299d 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -44,7 +44,6 @@ zmq::options_t::options_t () : rcvtimeo (-1), sndtimeo (-1), ipv4only (1), - delay_attach_on_connect (0), delay_on_close (true), delay_on_disconnect (true), filter (false), @@ -220,21 +219,6 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, return 0; } - case ZMQ_DELAY_ATTACH_ON_CONNECT: - { - if (optvallen_ != sizeof (int)) { - errno = EINVAL; - return -1; - } - int val = *((int*) optval_); - if (val != 0 && val != 1) { - errno = EINVAL; - return -1; - } - delay_attach_on_connect = val; - return 0; - } - case ZMQ_TCP_KEEPALIVE: { if (optvallen_ != sizeof (int)) { @@ -500,15 +484,6 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) *optvallen_ = sizeof (int); return 0; - case ZMQ_DELAY_ATTACH_ON_CONNECT: - if (*optvallen_ < sizeof (int)) { - errno = EINVAL; - return -1; - } - *((int*) optval_) = delay_attach_on_connect; - *optvallen_ = sizeof (int); - return 0; - case ZMQ_TCP_KEEPALIVE: if (*optvallen_ < sizeof (int)) { errno = EINVAL; diff --git a/src/options.hpp b/src/options.hpp index fe435b16..2f6e6283 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -96,10 +96,6 @@ namespace zmq // possible to communicate with IPv6-only hosts. If 0, the socket can // connect to and accept connections from both IPv4 and IPv6 hosts. int ipv4only; - - // If 1, connecting pipes are not attached immediately, meaning a send() - // on a socket with only connecting pipes would block - int delay_attach_on_connect; // If true, session reads all the pending messages from the pipe and // sends them to the network when socket is closed. diff --git a/src/session_base.cpp b/src/session_base.cpp index 21877b4a..8a879584 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -111,7 +111,6 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_, io_object_t (io_thread_), connect (connect_), pipe (NULL), - outpipe (NULL), incomplete_in (false), pending (false), engine (NULL), @@ -151,13 +150,6 @@ void zmq::session_base_t::attach_pipe (pipe_t *pipe_) pipe->set_event_sink (this); } -void zmq::session_base_t::onconnect_attach_pipe (pipe_t *pipe_) -{ - zmq_assert (!is_terminating ()); - zmq_assert (pipe_); - outpipe = pipe_; -} - int zmq::session_base_t::read (msg_t *msg_) { // First message to send is identity (if required). @@ -237,12 +229,6 @@ void zmq::session_base_t::clean_pipes () void zmq::session_base_t::terminated (pipe_t *pipe_) { - // If we get a term signal from our held outpipe - // we can safely ignore it. - if (pipe_ == outpipe) { - return; - } - // Drop the reference to the deallocated pipe. zmq_assert (pipe == pipe_); pipe = NULL; @@ -324,12 +310,6 @@ void zmq::session_base_t::process_attach (i_engine *engine_) send_bind (socket, pipes [1]); } - if (outpipe && options.delay_attach_on_connect) { - send_bind (socket, outpipe); - // Forget the outpipe - outpipe = NULL; - } - // Plug in the engine. zmq_assert (!engine); engine = engine_; @@ -378,12 +358,6 @@ void zmq::session_base_t::process_term (int linger_) // Start pipe termination process. Delay the termination till all messages // are processed in case the linger time is non-zero. pipe->terminate (linger_ != 0); - - // If we're storing to a to be connected, we can clear that as well - if (outpipe) { - outpipe->set_event_sink (this); - outpipe->terminate (linger_ != 0); - } // TODO: Should this go into pipe_t::terminate ? // In case there's no engine and there's only delimiter in the @@ -411,9 +385,6 @@ void zmq::session_base_t::timer_event (int id_) // Ask pipe to terminate even though there may be pending messages in it. zmq_assert (pipe); pipe->terminate (false); - - if (outpipe) - outpipe->terminate (false); } void zmq::session_base_t::detached () diff --git a/src/session_base.hpp b/src/session_base.hpp index f47cc94a..8244cb59 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -52,9 +52,6 @@ namespace zmq // To be used once only, when creating the session. void attach_pipe (zmq::pipe_t *pipe_); - - // To be used once only, for delayed connection - void onconnect_attach_pipe (pipe_t *pipe_); // Following functions are the interface exposed towards the engine. virtual int read (msg_t *msg_); @@ -107,9 +104,6 @@ namespace zmq // Pipe connecting the session to its socket. zmq::pipe_t *pipe; - // Pipe connecting the socket to the client - zmq::pipe_t *outpipe; - // This flag is true if the remainder of the message being processed // is still in the in pipe. bool incomplete_in; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 3959debf..eaddced6 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -545,13 +545,10 @@ int zmq::socket_base_t::connect (const char *addr_) icanhasall = true; // Attach local end of the pipe to the socket object. - if (options.delay_attach_on_connect == 0) - attach_pipe (pipes [0], icanhasall); + attach_pipe (pipes [0], icanhasall); // Attach remote end of the pipe to the session object later on. session->attach_pipe (pipes [1]); - if (options.delay_attach_on_connect == 1) - session->onconnect_attach_pipe (pipes [0]); // Save last endpoint URI paddr->to_string (options.last_endpoint); diff --git a/tests/Makefile.am b/tests/Makefile.am index 024867d2..aa2c529a 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -13,7 +13,6 @@ noinst_PROGRAMS = test_pair_inproc \ test_invalid_rep \ test_msg_flags \ test_connect_resolve \ - test_connect_delay \ test_last_endpoint \ test_term_endpoint \ test_monitor @@ -35,7 +34,6 @@ test_sub_forward_SOURCES = test_sub_forward.cpp test_invalid_rep_SOURCES = test_invalid_rep.cpp test_msg_flags_SOURCES = test_msg_flags.cpp test_connect_resolve_SOURCES = test_connect_resolve.cpp -test_connect_delay_SOURCES = test_connect_delay.cpp test_last_endpoint_SOURCES = test_last_endpoint.cpp test_term_endpoint_SOURCES = test_term_endpoint.cpp test_monitor_SOURCES = test_monitor.cpp diff --git a/tests/test_connect_delay.cpp b/tests/test_connect_delay.cpp deleted file mode 100644 index 2446d2c9..00000000 --- a/tests/test_connect_delay.cpp +++ /dev/null @@ -1,131 +0,0 @@ -/* - Copyright (c) 2012 Ian Barber - Copyright (c) 2012 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - - -#include -#include -#include -#include -#include - -#include "../include/zmq.h" - -int main (int argc, char *argv []) -{ - fprintf (stderr, "test_connect_delay running...\n"); - int val; - int rc; - char buffer[16]; - int seen = 0; - - void *context = zmq_ctx_new(); - void *to = zmq_socket(context, ZMQ_PULL); - val = 0; - zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val)); - zmq_bind(to, "tcp://*:5555"); - - // Create a socket pushing to two endpoints - only 1 message should arrive. - void *from = zmq_socket (context, ZMQ_PUSH); - val = 0; - zmq_setsockopt(from, ZMQ_LINGER, &val, sizeof(val)); - rc = zmq_connect(from, "tcp://localhost:5556"); - assert (rc == 0); - rc = zmq_connect(from, "tcp://localhost:5555"); - assert (rc == 0); - - for (int i = 0; i < 10; ++i) - { - std::string message("message "); - message += ('0' + i); - zmq_send(from, message.data(), message.size(), 0); - } - - sleep(1); - seen = 0; - for (int i = 0; i < 10; ++i) - { - memset(&buffer, 0, sizeof(buffer)); - rc = zmq_recv(to, &buffer, sizeof(buffer), ZMQ_DONTWAIT); - if( rc == -1) - break; - seen++; - } - assert (seen == 5); - - rc = zmq_close (from); - assert (rc == 0); - - rc = zmq_close (to); - assert (rc == 0); - - rc = zmq_ctx_destroy(context); - assert (rc == 0); - - context = zmq_ctx_new(); - std::cout << " Rerunning with DELAY_ATTACH_ON_CONNECT\n"; - - to = zmq_socket(context, ZMQ_PULL); - zmq_bind(to, "tcp://*:5560"); - val = 0; - zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val)); - - // Create a socket pushing to two endpoints - all messages should arrive. - from = zmq_socket (context, ZMQ_PUSH); - val = 0; - zmq_setsockopt(from, ZMQ_LINGER, &val, sizeof(val)); - val = 1; - zmq_setsockopt(from, ZMQ_DELAY_ATTACH_ON_CONNECT, &val, sizeof(val)); - rc = zmq_connect(from, "tcp://localhost:5561"); - assert (rc == 0); - rc = zmq_connect(from, "tcp://localhost:5560"); - assert (rc == 0); - - for (int i = 0; i < 10; ++i) - { - std::string message("message "); - message += ('0' + i); - zmq_send(from, message.data(), message.size(), 0); - } - - sleep(1); - - seen = 0; - for (int i = 0; i < 10; ++i) - { - memset(&buffer, 0, sizeof(buffer)); - rc = zmq_recv(to, &buffer, sizeof(buffer), ZMQ_DONTWAIT); - if( rc == -1) { - break; - } - seen++; - } - assert (seen == 10); - - rc = zmq_close (from); - assert (rc == 0); - - rc = zmq_close (to); - assert (rc == 0); - - rc = zmq_ctx_destroy(context); - assert (rc == 0); - - return 0; -}