diff --git a/include/zmq.h b/include/zmq.h index f93b7a7c..af92cf77 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -220,6 +220,12 @@ ZMQ_EXPORT int zmq_msg_get (zmq_msg_t *msg, int property); ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int property, int optval); ZMQ_EXPORT char *zmq_msg_gets (zmq_msg_t *msg, char *property); +// DAB - these are millisecond sleeps to bias data races +extern ZMQ_EXPORT int zmq_lb_race_window_1_size; +extern ZMQ_EXPORT int zmq_lb_race_window_2_size; +extern ZMQ_EXPORT int zmq_fq_race_window_1_size; +extern ZMQ_EXPORT int zmq_fq_race_window_2_size; + /******************************************************************************/ /* 0MQ socket definition. */ diff --git a/src/fq.cpp b/src/fq.cpp index 932503e5..13575e03 100644 --- a/src/fq.cpp +++ b/src/fq.cpp @@ -16,18 +16,36 @@ You should have received a copy of the GNU Lesser General Public License along with this program. If not, see . */ +#include #include "fq.hpp" #include "pipe.hpp" #include "err.hpp" #include "msg.hpp" +#ifdef ZMQ_HAVE_WINDOWS +# define msleep(milliseconds) {if(milliseconds) Sleep (milliseconds);} +#else +# include +# define msleep(milliseconds) {if(milliseconds) usleep (static_cast (milliseconds) * 1000);} +#endif +#define DB_TRACE(tag) int my_seq = ++seq; \ + pthread_t self = pthread_self(); \ + fprintf(stderr, "=> %12.12s thread=%lu this=%p seq=%d active=%lu\n", tag, self, (void *)this, my_seq, active) ; \ + +#define DB_TRACE_EXIT(tag) fprintf(stderr, "<= %12.12s thread=%lu this=%p seq=%d active=%lu\n", tag, self, (void *)this, my_seq, active) ; \ + +int zmq_fq_race_window_1_size = 0 ; +int zmq_fq_race_window_2_size = 0 ; + +static int seq = 0 ; zmq::fq_t::fq_t () : active (0), last_in (NULL), current (0), more (false) { + DB_TRACE("fq_cons"); } zmq::fq_t::~fq_t () @@ -44,6 +62,7 @@ void zmq::fq_t::attach (pipe_t *pipe_) void zmq::fq_t::pipe_terminated (pipe_t *pipe_) { + DB_TRACE("fq_term") ; const pipes_t::size_type index = pipes.index (pipe_); // Remove the pipe from the list; adjust number of active pipes @@ -60,6 +79,7 @@ void zmq::fq_t::pipe_terminated (pipe_t *pipe_) saved_credential = last_in->get_credential (); last_in = NULL; } + DB_TRACE_EXIT("fq_term") ; } void zmq::fq_t::activated (pipe_t *pipe_) @@ -76,17 +96,22 @@ int zmq::fq_t::recv (msg_t *msg_) int zmq::fq_t::recvpipe (msg_t *msg_, pipe_t **pipe_) { + DB_TRACE("fq_recvpipe"); // Deallocate old content of the message. int rc = msg_->close (); errno_assert (rc == 0); // Round-robin over the pipes to get the next message. while (active > 0) { - + // DAB - bias the race to provoke problems with read + msleep(zmq_fq_race_window_1_size) ; // Try to fetch new message. If we've already read part of the message // subsequent part should be immediately available. bool fetched = pipes [current]->read (msg_); + // DAB - bias the race to provoke problems with % + msleep(zmq_fq_race_window_2_size) ; + // Note that when message is not fetched, current pipe is deactivated // and replaced by another active pipe. Thus we don't have to increase // the 'current' pointer. @@ -98,6 +123,7 @@ int zmq::fq_t::recvpipe (msg_t *msg_, pipe_t **pipe_) last_in = pipes [current]; current = (current + 1) % active; } + DB_TRACE_EXIT("fq_recvpipe"); return 0; } @@ -117,6 +143,7 @@ int zmq::fq_t::recvpipe (msg_t *msg_, pipe_t **pipe_) rc = msg_->init (); errno_assert (rc == 0); errno = EAGAIN; + DB_TRACE_EXIT("fq_recvpipe"); return -1; } diff --git a/src/lb.cpp b/src/lb.cpp index 7fdd765b..f485ab26 100644 --- a/src/lb.cpp +++ b/src/lb.cpp @@ -16,18 +16,29 @@ You should have received a copy of the GNU Lesser General Public License along with this program. If not, see . */ - +#include #include "lb.hpp" #include "pipe.hpp" #include "err.hpp" #include "msg.hpp" +#define DB_TRACE(tag) int my_seq = ++seq; \ + pthread_t self = pthread_self(); \ + fprintf(stderr, "=> %12.12s thread=%lu this=%p seq=%d active=%lu\n", tag, self, (void *)this, my_seq, active) ; \ + +#define DB_TRACE_EXIT(tag) fprintf(stderr, "<= %12.12s thread=%lu this=%p seq=%d active=%lu\n", tag, self, (void *)this, my_seq, active) ; \ + +int zmq_lb_race_window_1_size = 0 ; +int zmq_lb_race_window_2_size = 0 ; +static int seq = 0 ; + zmq::lb_t::lb_t () : - active (0), + active (0), current (0), more (false), dropping (false) { + DB_TRACE("lb_cons") ; } zmq::lb_t::~lb_t () @@ -43,6 +54,7 @@ void zmq::lb_t::attach (pipe_t *pipe_) void zmq::lb_t::pipe_terminated (pipe_t *pipe_) { + DB_TRACE("lb_term") ; pipes_t::size_type index = pipes.index (pipe_); // If we are in the middle of multipart message and current pipe @@ -59,6 +71,7 @@ void zmq::lb_t::pipe_terminated (pipe_t *pipe_) current = 0; } pipes.erase (pipe_); + DB_TRACE_EXIT("lb_term") ; } void zmq::lb_t::activated (pipe_t *pipe_) @@ -73,8 +86,16 @@ int zmq::lb_t::send (msg_t *msg_) return sendpipe (msg_, NULL); } +#ifdef ZMQ_HAVE_WINDOWS +# define msleep(milliseconds) Sleep (milliseconds); +#else +# include +# define msleep(milliseconds) usleep (static_cast (milliseconds) * 1000); +#endif + int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_) { + DB_TRACE("lb_sendpipe") ; // Drop the message if required. If we are at the end of the message // switch back to non-dropping mode. if (dropping) { @@ -90,13 +111,20 @@ int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_) } while (active > 0) { - if (pipes [current]->write (msg_)) + // DAB - bias the race to provoke problems with write + msleep(zmq_lb_race_window_1_size ) ; + + if (pipes [current]->write (msg_)) { if (pipe_) *pipe_ = pipes [current]; break; } - + if (!(!more)) + { + DB_TRACE("lb_assert"); + fflush(stderr) ; + } zmq_assert (!more); active--; if (current < active) @@ -108,12 +136,17 @@ int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_) // If there are no pipes we cannot send the message. if (active == 0) { errno = EAGAIN; + DB_TRACE_EXIT("lb_sendpipe") ; return -1; } // If it's final part of the message we can flush it downstream and // continue round-robining (load balance). more = msg_->flags () & msg_t::more? true: false; + + // DAB - bias the race to provoke problems with % + msleep(zmq_lb_race_window_2_size) ; + if (!more) { pipes [current]->flush (); current = (current + 1) % active; @@ -123,6 +156,7 @@ int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_) int rc = msg_->init (); errno_assert (rc == 0); + DB_TRACE_EXIT("lb_sendpipe") ; return 0; }