mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-19 00:46:05 +01:00
Merge pull request #2756 from bluca/reconnect_ivl_connect
Problem: zmq_connect fails after disconnect due to RECONNECT_IVL == -1
This commit is contained in:
commit
f6688f0516
1
.gitignore
vendored
1
.gitignore
vendored
@ -139,6 +139,7 @@ test_base85
|
||||
test_bind_after_connect_tcp
|
||||
test_sodium
|
||||
test_zmq_poll_fd
|
||||
test_reconnect_ivl
|
||||
tests/test*.log
|
||||
tests/test*.trs
|
||||
src/platform.hpp*
|
||||
|
@ -418,6 +418,7 @@ test_apps = \
|
||||
tests/test_base85 \
|
||||
tests/test_bind_after_connect_tcp \
|
||||
tests/test_sodium \
|
||||
tests/test_reconnect_ivl \
|
||||
tests/test_socket_null
|
||||
|
||||
tests_test_ancillaries_SOURCES = tests/test_ancillaries.cpp
|
||||
@ -638,6 +639,9 @@ tests_test_sodium_LDADD = src/libzmq.la
|
||||
tests_test_socket_null_SOURCES = tests/test_socket_null.cpp
|
||||
tests_test_socket_null_LDADD = src/libzmq.la
|
||||
|
||||
tests_test_reconnect_ivl_SOURCES = tests/test_reconnect_ivl.cpp
|
||||
tests_test_reconnect_ivl_LDADD = src/libzmq.la
|
||||
|
||||
if HAVE_CURVE
|
||||
|
||||
test_apps += \
|
||||
|
@ -30,6 +30,7 @@
|
||||
#ifndef __ZMQ_COMMAND_HPP_INCLUDED__
|
||||
#define __ZMQ_COMMAND_HPP_INCLUDED__
|
||||
|
||||
#include <string>
|
||||
#include "stdint.hpp"
|
||||
|
||||
namespace zmq
|
||||
@ -69,6 +70,7 @@ namespace zmq
|
||||
term_req,
|
||||
term,
|
||||
term_ack,
|
||||
term_endpoint,
|
||||
reap,
|
||||
reaped,
|
||||
inproc_connected,
|
||||
@ -153,6 +155,12 @@ namespace zmq
|
||||
struct {
|
||||
} term_ack;
|
||||
|
||||
// Sent by session_base (I/O thread) to socket (application thread)
|
||||
// to ask to disconnect the endpoint.
|
||||
struct {
|
||||
std::string *endpoint;
|
||||
} term_endpoint;
|
||||
|
||||
// Transfers the ownership of the closed socket
|
||||
// to the reaper thread.
|
||||
struct {
|
||||
|
@ -134,6 +134,10 @@ void zmq::object_t::process_command (command_t &cmd_)
|
||||
process_term_ack ();
|
||||
break;
|
||||
|
||||
case command_t::term_endpoint:
|
||||
process_term_endpoint (cmd_.args.term_endpoint.endpoint);
|
||||
break;
|
||||
|
||||
case command_t::reap:
|
||||
process_reap (cmd_.args.reap.socket);
|
||||
break;
|
||||
@ -332,6 +336,16 @@ void zmq::object_t::send_term_ack (own_t *destination_)
|
||||
send_command (cmd);
|
||||
}
|
||||
|
||||
void zmq::object_t::send_term_endpoint (own_t *destination_,
|
||||
std::string *endpoint_)
|
||||
{
|
||||
command_t cmd;
|
||||
cmd.destination = destination_;
|
||||
cmd.type = command_t::term_endpoint;
|
||||
cmd.args.term_endpoint.endpoint = endpoint_;
|
||||
send_command (cmd);
|
||||
}
|
||||
|
||||
void zmq::object_t::send_reap (class socket_base_t *socket_)
|
||||
{
|
||||
command_t cmd;
|
||||
@ -435,6 +449,11 @@ void zmq::object_t::process_term_ack ()
|
||||
zmq_assert (false);
|
||||
}
|
||||
|
||||
void zmq::object_t::process_term_endpoint (std::string *)
|
||||
{
|
||||
zmq_assert (false);
|
||||
}
|
||||
|
||||
void zmq::object_t::process_reap (class socket_base_t *)
|
||||
{
|
||||
zmq_assert (false);
|
||||
|
@ -107,6 +107,7 @@ namespace zmq
|
||||
zmq::own_t *object_);
|
||||
void send_term (zmq::own_t *destination_, int linger_);
|
||||
void send_term_ack (zmq::own_t *destination_);
|
||||
void send_term_endpoint (own_t *destination_, std::string *endpoint_);
|
||||
void send_reap (zmq::socket_base_t *socket_);
|
||||
void send_reaped ();
|
||||
void send_done ();
|
||||
@ -127,6 +128,7 @@ namespace zmq
|
||||
virtual void process_term_req (zmq::own_t *object_);
|
||||
virtual void process_term (int linger_);
|
||||
virtual void process_term_ack ();
|
||||
virtual void process_term_endpoint (std::string *endpoint_);
|
||||
virtual void process_reap (zmq::socket_base_t *socket_);
|
||||
virtual void process_reaped ();
|
||||
|
||||
|
@ -536,6 +536,11 @@ void zmq::session_base_t::reconnect ()
|
||||
// Reconnect.
|
||||
if (options.reconnect_ivl != -1)
|
||||
start_connecting (true);
|
||||
else {
|
||||
std::string *ep = new (std::string);
|
||||
addr->to_string (*ep);
|
||||
send_term_endpoint (socket, ep);
|
||||
}
|
||||
|
||||
// For subscriber sockets we hiccup the inbound pipe, which will cause
|
||||
// the socket object to resend all the subscriptions.
|
||||
|
@ -1425,6 +1425,12 @@ void zmq::socket_base_t::process_term (int linger_)
|
||||
own_t::process_term (linger_);
|
||||
}
|
||||
|
||||
void zmq::socket_base_t::process_term_endpoint (std::string *endpoint_)
|
||||
{
|
||||
term_endpoint (endpoint_->c_str());
|
||||
delete endpoint_;
|
||||
}
|
||||
|
||||
void zmq::socket_base_t::update_pipe_options(int option_)
|
||||
{
|
||||
if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM)
|
||||
|
@ -250,6 +250,7 @@ namespace zmq
|
||||
void process_stop ();
|
||||
void process_bind (zmq::pipe_t *pipe_);
|
||||
void process_term (int linger_);
|
||||
void process_term_endpoint (std::string *endpoint_);
|
||||
|
||||
void update_pipe_options(int option_);
|
||||
|
||||
|
@ -70,6 +70,7 @@ set(tests
|
||||
test_sodium
|
||||
test_monitor
|
||||
test_socket_null
|
||||
test_reconnect_ivl
|
||||
)
|
||||
if(ZMQ_HAVE_CURVE)
|
||||
list(APPEND tests
|
||||
|
149
tests/test_reconnect_ivl.cpp
Normal file
149
tests/test_reconnect_ivl.cpp
Normal file
@ -0,0 +1,149 @@
|
||||
/*
|
||||
Copyright (c) 2017 Contributors as noted in the AUTHORS file
|
||||
|
||||
This file is part of libzmq, the ZeroMQ core engine in C++.
|
||||
|
||||
libzmq is free software; you can redistribute it and/or modify it under
|
||||
the terms of the GNU Lesser General Public License (LGPL) as published
|
||||
by the Free Software Foundation; either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
As a special exception, the Contributors give you permission to link
|
||||
this library with independent modules to produce an executable,
|
||||
regardless of the license terms of these independent modules, and to
|
||||
copy and distribute the resulting executable under terms of your choice,
|
||||
provided that you also meet, for each linked independent module, the
|
||||
terms and conditions of the license of that module. An independent
|
||||
module is a module which is not derived from or based on this library.
|
||||
If you modify this library, you must extend this exception to your
|
||||
version of the library.
|
||||
|
||||
libzmq is distributed in the hope that it will be useful, but WITHOUT
|
||||
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
||||
License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "testutil.hpp"
|
||||
|
||||
|
||||
#ifndef ZMQ_HAVE_WINDOWS
|
||||
void test_reconnect_ivl_ipc (void)
|
||||
{
|
||||
void *ctx = zmq_ctx_new ();
|
||||
assert (ctx);
|
||||
|
||||
void *sb = zmq_socket (ctx, ZMQ_PAIR);
|
||||
assert (sb);
|
||||
int rc = zmq_bind (sb, "ipc:///tmp/test_reconnect_ivl");
|
||||
assert (rc == 0);
|
||||
|
||||
void *sc = zmq_socket (ctx, ZMQ_PAIR);
|
||||
assert (sc);
|
||||
int interval = -1;
|
||||
rc = zmq_setsockopt (sc, ZMQ_RECONNECT_IVL, &interval, sizeof (int));
|
||||
assert (rc == 0);
|
||||
rc = zmq_connect (sc, "ipc:///tmp/test_reconnect_ivl");
|
||||
assert (rc == 0);
|
||||
|
||||
bounce (sb, sc);
|
||||
|
||||
rc = zmq_unbind (sb, "ipc:///tmp/test_reconnect_ivl");
|
||||
assert (rc == 0);
|
||||
|
||||
expect_bounce_fail (sb, sc);
|
||||
|
||||
rc = zmq_bind (sb, "ipc:///tmp/test_reconnect_ivl");
|
||||
assert (rc == 0);
|
||||
|
||||
expect_bounce_fail (sb, sc);
|
||||
|
||||
rc = zmq_connect (sc, "ipc:///tmp/test_reconnect_ivl");
|
||||
assert (rc == 0);
|
||||
|
||||
bounce (sb, sc);
|
||||
|
||||
rc = zmq_close (sc);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_close (sb);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_ctx_term (ctx);
|
||||
assert (rc == 0);
|
||||
}
|
||||
#endif
|
||||
|
||||
void test_reconnect_ivl_tcp (const char *address)
|
||||
{
|
||||
size_t len = MAX_SOCKET_STRING;
|
||||
char my_endpoint[MAX_SOCKET_STRING];
|
||||
void *ctx = zmq_ctx_new ();
|
||||
assert (ctx);
|
||||
|
||||
if (streq (address, "tcp://[::1]:*")) {
|
||||
if (is_ipv6_available ()) {
|
||||
zmq_ctx_set(ctx, ZMQ_IPV6, 1);
|
||||
} else {
|
||||
zmq_ctx_term (ctx);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
void *sb = zmq_socket (ctx, ZMQ_PAIR);
|
||||
assert (sb);
|
||||
int rc = zmq_bind (sb, address);
|
||||
assert (rc == 0);
|
||||
rc = zmq_getsockopt (sb, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
|
||||
assert (rc == 0);
|
||||
|
||||
void *sc = zmq_socket (ctx, ZMQ_PAIR);
|
||||
assert (sc);
|
||||
int interval = -1;
|
||||
rc = zmq_setsockopt (sc, ZMQ_RECONNECT_IVL, &interval, sizeof (int));
|
||||
assert (rc == 0);
|
||||
rc = zmq_connect (sc, my_endpoint);
|
||||
assert (rc == 0);
|
||||
|
||||
bounce (sb, sc);
|
||||
|
||||
rc = zmq_unbind (sb, my_endpoint);
|
||||
assert (rc == 0);
|
||||
|
||||
expect_bounce_fail (sb, sc);
|
||||
|
||||
rc = zmq_bind (sb, my_endpoint);
|
||||
assert (rc == 0);
|
||||
|
||||
expect_bounce_fail (sb, sc);
|
||||
|
||||
rc = zmq_connect (sc, my_endpoint);
|
||||
assert (rc == 0);
|
||||
|
||||
bounce (sb, sc);
|
||||
|
||||
rc = zmq_close (sc);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_close (sb);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_ctx_term (ctx);
|
||||
assert (rc == 0);
|
||||
}
|
||||
|
||||
int main (void)
|
||||
{
|
||||
setup_test_environment ();
|
||||
|
||||
#ifndef ZMQ_HAVE_WINDOWS
|
||||
test_reconnect_ivl_ipc ();
|
||||
#endif
|
||||
test_reconnect_ivl_tcp ("tcp://127.0.0.1:*");
|
||||
test_reconnect_ivl_tcp ("tcp://[::1]:*");
|
||||
|
||||
return 0 ;
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user