reconnect added to zmq_connecter

This commit is contained in:
Martin Sustrik 2009-09-16 11:02:18 +02:00
parent 6e03cb2f3e
commit 9c522dccaf
11 changed files with 66 additions and 20 deletions

View File

@ -20,6 +20,7 @@
#include <zmq.h> #include <zmq.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h>
#include <assert.h> #include <assert.h>
int main (int argc, char *argv []) int main (int argc, char *argv [])
@ -54,10 +55,11 @@ int main (int argc, char *argv [])
rc = zmq_connect (s, connect_to); rc = zmq_connect (s, connect_to);
assert (rc == 0); assert (rc == 0);
watch = zmq_stopwatch_start ();
rc = zmq_msg_init_size (&msg, message_size); rc = zmq_msg_init_size (&msg, message_size);
assert (rc == 0); assert (rc == 0);
memset (zmq_msg_data (&msg), 0, message_size);
watch = zmq_stopwatch_start ();
for (i = 0; i != roundtrip_count; i++) { for (i = 0; i != roundtrip_count; i++) {
rc = zmq_send (s, &msg, 0); rc = zmq_send (s, &msg, 0);
@ -67,11 +69,11 @@ int main (int argc, char *argv [])
assert (zmq_msg_size (&msg) == message_size); assert (zmq_msg_size (&msg) == message_size);
} }
elapsed = zmq_stopwatch_stop (watch);
rc = zmq_msg_close (&msg); rc = zmq_msg_close (&msg);
assert (rc == 0); assert (rc == 0);
elapsed = zmq_stopwatch_stop (watch);
latency = (double) elapsed / (roundtrip_count * 2); latency = (double) elapsed / (roundtrip_count * 2);
printf ("message size: %d [B]\n", (int) message_size); printf ("message size: %d [B]\n", (int) message_size);

View File

@ -22,6 +22,7 @@
#include <stdlib.h> #include <stdlib.h>
#include <assert.h> #include <assert.h>
#include <stddef.h> #include <stddef.h>
#include <string.h>
int main (int argc, char *argv []) int main (int argc, char *argv [])
{ {
@ -39,10 +40,12 @@ int main (int argc, char *argv [])
zmq::socket_t s (ctx, ZMQ_REQ); zmq::socket_t s (ctx, ZMQ_REQ);
s.connect (connect_to); s.connect (connect_to);
zmq::message_t msg (message_size);
memset (msg.data (), 0, message_size);
void *watch = zmq_stopwatch_start (); void *watch = zmq_stopwatch_start ();
for (int i = 0; i != roundtrip_count; i++) { for (int i = 0; i != roundtrip_count; i++) {
zmq::message_t msg (message_size);
s.send (msg); s.send (msg);
s.recv (&msg); s.recv (&msg);
assert (msg.size () == message_size); assert (msg.size () == message_size);

View File

@ -23,14 +23,15 @@
#include "pipe.hpp" #include "pipe.hpp"
zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
const char *name_, const options_t &options_) : const char *name_, const options_t &options_, bool reconnect_) :
owned_t (parent_, owner_), owned_t (parent_, owner_),
in_pipe (NULL), in_pipe (NULL),
active (true), active (true),
out_pipe (NULL), out_pipe (NULL),
engine (NULL), engine (NULL),
name (name_), name (name_),
options (options_) options (options_),
reconnect (reconnect_)
{ {
} }
@ -69,7 +70,9 @@ void zmq::session_t::flush ()
void zmq::session_t::detach () void zmq::session_t::detach ()
{ {
// Engine is terminating itself. // TODO: Start reconnection process here.
// Engine is terminating itself. No need to deallocate it from here.
engine = NULL; engine = NULL;
// In the case od anonymous connection, terminate the session. // In the case od anonymous connection, terminate the session.

View File

@ -35,7 +35,7 @@ namespace zmq
public: public:
session_t (object_t *parent_, socket_base_t *owner_, const char *name_, session_t (object_t *parent_, socket_base_t *owner_, const char *name_,
const options_t &options_); const options_t &options_, bool reconnect_);
// i_inout interface implementation. // i_inout interface implementation.
bool read (::zmq_msg_t *msg_); bool read (::zmq_msg_t *msg_);
@ -77,6 +77,9 @@ namespace zmq
// Inherited socket options. // Inherited socket options.
options_t options; options_t options;
// If true, reconnection is required after connection breaks.
bool reconnect;
session_t (const session_t&); session_t (const session_t&);
void operator = (const session_t&); void operator = (const session_t&);
}; };

View File

@ -236,7 +236,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// Create the session. // Create the session.
io_thread_t *io_thread = choose_io_thread (options.affinity); io_thread_t *io_thread = choose_io_thread (options.affinity);
session_t *session = new session_t (io_thread, this, session_name.c_str (), session_t *session = new session_t (io_thread, this, session_name.c_str (),
options); options, true);
zmq_assert (session); zmq_assert (session);
// Create inbound pipe. // Create inbound pipe.
@ -267,7 +267,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// it can bind the new connection to the session once it is established. // it can bind the new connection to the session once it is established.
zmq_connecter_t *connecter = new zmq_connecter_t ( zmq_connecter_t *connecter = new zmq_connecter_t (
choose_io_thread (options.affinity), this, options, choose_io_thread (options.affinity), this, options,
session_name.c_str ()); session_name.c_str (), false);
int rc = connecter->set_address (addr_args.c_str ()); int rc = connecter->set_address (addr_args.c_str ());
if (rc != 0) { if (rc != 0) {
delete connecter; delete connecter;

View File

@ -24,10 +24,11 @@
zmq::zmq_connecter_t::zmq_connecter_t (io_thread_t *parent_, zmq::zmq_connecter_t::zmq_connecter_t (io_thread_t *parent_,
socket_base_t *owner_, const options_t &options_, socket_base_t *owner_, const options_t &options_,
const char *session_name_) : const char *session_name_, bool wait_) :
owned_t (parent_, owner_), owned_t (parent_, owner_),
io_object_t (parent_), io_object_t (parent_),
handle_valid (false), handle_valid (false),
wait (wait_),
options (options_), options (options_),
session_name (session_name_) session_name (session_name_)
{ {
@ -44,12 +45,17 @@ int zmq::zmq_connecter_t::set_address (const char *addr_)
void zmq::zmq_connecter_t::process_plug () void zmq::zmq_connecter_t::process_plug ()
{ {
start_connecting (); if (wait)
add_timer ();
else
start_connecting ();
owned_t::process_plug (); owned_t::process_plug ();
} }
void zmq::zmq_connecter_t::process_unplug () void zmq::zmq_connecter_t::process_unplug ()
{ {
if (wait)
cancel_timer ();
if (handle_valid) if (handle_valid)
rm_fd (handle); rm_fd (handle);
} }
@ -68,8 +74,13 @@ void zmq::zmq_connecter_t::out_event ()
rm_fd (handle); rm_fd (handle);
handle_valid = false; handle_valid = false;
// TODO: Handle the error condition by eventual reconnect. // Handle the error condition by attempt to reconnect.
zmq_assert (fd != retired_fd); if (fd == retired_fd) {
tcp_connecter.close ();
wait = true;
add_timer ();
return;
}
// Create an init object. // Create an init object.
io_thread_t *io_thread = choose_io_thread (options.affinity); io_thread_t *io_thread = choose_io_thread (options.affinity);
@ -85,7 +96,8 @@ void zmq::zmq_connecter_t::out_event ()
void zmq::zmq_connecter_t::timer_event () void zmq::zmq_connecter_t::timer_event ()
{ {
zmq_assert (false); wait = false;
start_connecting ();
} }
void zmq::zmq_connecter_t::start_connecting () void zmq::zmq_connecter_t::start_connecting ()

View File

@ -36,7 +36,7 @@ namespace zmq
public: public:
zmq_connecter_t (class io_thread_t *parent_, socket_base_t *owner_, zmq_connecter_t (class io_thread_t *parent_, socket_base_t *owner_,
const options_t &options_, const char *session_name_); const options_t &options_, const char *session_name_, bool wait_);
~zmq_connecter_t (); ~zmq_connecter_t ();
// Set IP address to connect to. // Set IP address to connect to.
@ -66,6 +66,9 @@ namespace zmq
// contains valid value. // contains valid value.
bool handle_valid; bool handle_valid;
// If true, connecter is waiting a while before trying to connect.
bool wait;
// Associated socket options. // Associated socket options.
options_t options; options_t options;

View File

@ -18,6 +18,7 @@
*/ */
#include "zmq_connecter_init.hpp" #include "zmq_connecter_init.hpp"
#include "zmq_connecter.hpp"
#include "io_thread.hpp" #include "io_thread.hpp"
#include "session.hpp" #include "session.hpp"
#include "err.hpp" #include "err.hpp"
@ -83,8 +84,25 @@ void zmq::zmq_connecter_init_t::flush ()
void zmq::zmq_connecter_init_t::detach () void zmq::zmq_connecter_init_t::detach ()
{ {
// TODO: Engine is closing down. Init object is to be closed as well. // TODO: Start reconnection process here.
zmq_assert (false); /*
// Create a connecter object to attempt reconnect. Ask it to wait for a
// while before reconnecting.
io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_connecter_t *connecter = new zmq_connecter_t (io_thread, owner,
options, session_name.c_str (), true);
connecter->set_address (...);
zmq_assert (connecter);
send_plug (connecter);
send_own (owner, connecter);
*/
// This function is called by engine when disconnection occurs.
// The engine will destroy itself, so we just drop the pointer here and
// start termination of the init object.
engine = NULL;
term ();
} }
void zmq::zmq_connecter_init_t::process_plug () void zmq::zmq_connecter_init_t::process_plug ()

View File

@ -18,6 +18,7 @@
*/ */
#include "zmq_engine.hpp" #include "zmq_engine.hpp"
#include "zmq_connecter.hpp"
#include "io_thread.hpp" #include "io_thread.hpp"
#include "i_inout.hpp" #include "i_inout.hpp"
#include "config.hpp" #include "config.hpp"

View File

@ -25,6 +25,7 @@
#include "tcp_socket.hpp" #include "tcp_socket.hpp"
#include "zmq_encoder.hpp" #include "zmq_encoder.hpp"
#include "zmq_decoder.hpp" #include "zmq_decoder.hpp"
#include "options.hpp"
namespace zmq namespace zmq
{ {

View File

@ -76,7 +76,7 @@ void zmq::zmq_listener_init_t::flush ()
if (!session) { if (!session) {
io_thread_t *io_thread = choose_io_thread (options.affinity); io_thread_t *io_thread = choose_io_thread (options.affinity);
session = new session_t (io_thread, owner, peer_identity.c_str (), session = new session_t (io_thread, owner, peer_identity.c_str (),
options); options, false);
zmq_assert (session); zmq_assert (session);
send_plug (session); send_plug (session);
send_own (owner, session); send_own (owner, session);