mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-07 09:48:07 +01:00
Add code to investigate data race
Provides control of race windows and logging for some of the interesting transitions.
This commit is contained in:
parent
763bf34e88
commit
5e0facda17
@ -220,6 +220,12 @@ ZMQ_EXPORT int zmq_msg_get (zmq_msg_t *msg, int property);
|
|||||||
ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int property, int optval);
|
ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int property, int optval);
|
||||||
ZMQ_EXPORT char *zmq_msg_gets (zmq_msg_t *msg, char *property);
|
ZMQ_EXPORT char *zmq_msg_gets (zmq_msg_t *msg, char *property);
|
||||||
|
|
||||||
|
// DAB - these are millisecond sleeps to bias data races
|
||||||
|
extern ZMQ_EXPORT int zmq_lb_race_window_1_size;
|
||||||
|
extern ZMQ_EXPORT int zmq_lb_race_window_2_size;
|
||||||
|
extern ZMQ_EXPORT int zmq_fq_race_window_1_size;
|
||||||
|
extern ZMQ_EXPORT int zmq_fq_race_window_2_size;
|
||||||
|
|
||||||
|
|
||||||
/******************************************************************************/
|
/******************************************************************************/
|
||||||
/* 0MQ socket definition. */
|
/* 0MQ socket definition. */
|
||||||
|
29
src/fq.cpp
29
src/fq.cpp
@ -16,18 +16,36 @@
|
|||||||
You should have received a copy of the GNU Lesser General Public License
|
You should have received a copy of the GNU Lesser General Public License
|
||||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
#include <stdio.h>
|
||||||
|
|
||||||
#include "fq.hpp"
|
#include "fq.hpp"
|
||||||
#include "pipe.hpp"
|
#include "pipe.hpp"
|
||||||
#include "err.hpp"
|
#include "err.hpp"
|
||||||
#include "msg.hpp"
|
#include "msg.hpp"
|
||||||
|
#ifdef ZMQ_HAVE_WINDOWS
|
||||||
|
# define msleep(milliseconds) {if(milliseconds) Sleep (milliseconds);}
|
||||||
|
#else
|
||||||
|
# include <unistd.h>
|
||||||
|
# define msleep(milliseconds) {if(milliseconds) usleep (static_cast <useconds_t> (milliseconds) * 1000);}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#define DB_TRACE(tag) int my_seq = ++seq; \
|
||||||
|
pthread_t self = pthread_self(); \
|
||||||
|
fprintf(stderr, "=> %12.12s thread=%lu this=%p seq=%d active=%lu\n", tag, self, (void *)this, my_seq, active) ; \
|
||||||
|
|
||||||
|
#define DB_TRACE_EXIT(tag) fprintf(stderr, "<= %12.12s thread=%lu this=%p seq=%d active=%lu\n", tag, self, (void *)this, my_seq, active) ; \
|
||||||
|
|
||||||
|
int zmq_fq_race_window_1_size = 0 ;
|
||||||
|
int zmq_fq_race_window_2_size = 0 ;
|
||||||
|
|
||||||
|
static int seq = 0 ;
|
||||||
zmq::fq_t::fq_t () :
|
zmq::fq_t::fq_t () :
|
||||||
active (0),
|
active (0),
|
||||||
last_in (NULL),
|
last_in (NULL),
|
||||||
current (0),
|
current (0),
|
||||||
more (false)
|
more (false)
|
||||||
{
|
{
|
||||||
|
DB_TRACE("fq_cons");
|
||||||
}
|
}
|
||||||
|
|
||||||
zmq::fq_t::~fq_t ()
|
zmq::fq_t::~fq_t ()
|
||||||
@ -44,6 +62,7 @@ void zmq::fq_t::attach (pipe_t *pipe_)
|
|||||||
|
|
||||||
void zmq::fq_t::pipe_terminated (pipe_t *pipe_)
|
void zmq::fq_t::pipe_terminated (pipe_t *pipe_)
|
||||||
{
|
{
|
||||||
|
DB_TRACE("fq_term") ;
|
||||||
const pipes_t::size_type index = pipes.index (pipe_);
|
const pipes_t::size_type index = pipes.index (pipe_);
|
||||||
|
|
||||||
// Remove the pipe from the list; adjust number of active pipes
|
// Remove the pipe from the list; adjust number of active pipes
|
||||||
@ -60,6 +79,7 @@ void zmq::fq_t::pipe_terminated (pipe_t *pipe_)
|
|||||||
saved_credential = last_in->get_credential ();
|
saved_credential = last_in->get_credential ();
|
||||||
last_in = NULL;
|
last_in = NULL;
|
||||||
}
|
}
|
||||||
|
DB_TRACE_EXIT("fq_term") ;
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::fq_t::activated (pipe_t *pipe_)
|
void zmq::fq_t::activated (pipe_t *pipe_)
|
||||||
@ -76,17 +96,22 @@ int zmq::fq_t::recv (msg_t *msg_)
|
|||||||
|
|
||||||
int zmq::fq_t::recvpipe (msg_t *msg_, pipe_t **pipe_)
|
int zmq::fq_t::recvpipe (msg_t *msg_, pipe_t **pipe_)
|
||||||
{
|
{
|
||||||
|
DB_TRACE("fq_recvpipe");
|
||||||
// Deallocate old content of the message.
|
// Deallocate old content of the message.
|
||||||
int rc = msg_->close ();
|
int rc = msg_->close ();
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
|
|
||||||
// Round-robin over the pipes to get the next message.
|
// Round-robin over the pipes to get the next message.
|
||||||
while (active > 0) {
|
while (active > 0) {
|
||||||
|
// DAB - bias the race to provoke problems with read
|
||||||
|
msleep(zmq_fq_race_window_1_size) ;
|
||||||
// Try to fetch new message. If we've already read part of the message
|
// Try to fetch new message. If we've already read part of the message
|
||||||
// subsequent part should be immediately available.
|
// subsequent part should be immediately available.
|
||||||
bool fetched = pipes [current]->read (msg_);
|
bool fetched = pipes [current]->read (msg_);
|
||||||
|
|
||||||
|
// DAB - bias the race to provoke problems with %
|
||||||
|
msleep(zmq_fq_race_window_2_size) ;
|
||||||
|
|
||||||
// Note that when message is not fetched, current pipe is deactivated
|
// Note that when message is not fetched, current pipe is deactivated
|
||||||
// and replaced by another active pipe. Thus we don't have to increase
|
// and replaced by another active pipe. Thus we don't have to increase
|
||||||
// the 'current' pointer.
|
// the 'current' pointer.
|
||||||
@ -98,6 +123,7 @@ int zmq::fq_t::recvpipe (msg_t *msg_, pipe_t **pipe_)
|
|||||||
last_in = pipes [current];
|
last_in = pipes [current];
|
||||||
current = (current + 1) % active;
|
current = (current + 1) % active;
|
||||||
}
|
}
|
||||||
|
DB_TRACE_EXIT("fq_recvpipe");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -117,6 +143,7 @@ int zmq::fq_t::recvpipe (msg_t *msg_, pipe_t **pipe_)
|
|||||||
rc = msg_->init ();
|
rc = msg_->init ();
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
errno = EAGAIN;
|
errno = EAGAIN;
|
||||||
|
DB_TRACE_EXIT("fq_recvpipe");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
40
src/lb.cpp
40
src/lb.cpp
@ -16,18 +16,29 @@
|
|||||||
You should have received a copy of the GNU Lesser General Public License
|
You should have received a copy of the GNU Lesser General Public License
|
||||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
#include <stdio.h>
|
||||||
#include "lb.hpp"
|
#include "lb.hpp"
|
||||||
#include "pipe.hpp"
|
#include "pipe.hpp"
|
||||||
#include "err.hpp"
|
#include "err.hpp"
|
||||||
#include "msg.hpp"
|
#include "msg.hpp"
|
||||||
|
|
||||||
|
#define DB_TRACE(tag) int my_seq = ++seq; \
|
||||||
|
pthread_t self = pthread_self(); \
|
||||||
|
fprintf(stderr, "=> %12.12s thread=%lu this=%p seq=%d active=%lu\n", tag, self, (void *)this, my_seq, active) ; \
|
||||||
|
|
||||||
|
#define DB_TRACE_EXIT(tag) fprintf(stderr, "<= %12.12s thread=%lu this=%p seq=%d active=%lu\n", tag, self, (void *)this, my_seq, active) ; \
|
||||||
|
|
||||||
|
int zmq_lb_race_window_1_size = 0 ;
|
||||||
|
int zmq_lb_race_window_2_size = 0 ;
|
||||||
|
static int seq = 0 ;
|
||||||
|
|
||||||
zmq::lb_t::lb_t () :
|
zmq::lb_t::lb_t () :
|
||||||
active (0),
|
active (0),
|
||||||
current (0),
|
current (0),
|
||||||
more (false),
|
more (false),
|
||||||
dropping (false)
|
dropping (false)
|
||||||
{
|
{
|
||||||
|
DB_TRACE("lb_cons") ;
|
||||||
}
|
}
|
||||||
|
|
||||||
zmq::lb_t::~lb_t ()
|
zmq::lb_t::~lb_t ()
|
||||||
@ -43,6 +54,7 @@ void zmq::lb_t::attach (pipe_t *pipe_)
|
|||||||
|
|
||||||
void zmq::lb_t::pipe_terminated (pipe_t *pipe_)
|
void zmq::lb_t::pipe_terminated (pipe_t *pipe_)
|
||||||
{
|
{
|
||||||
|
DB_TRACE("lb_term") ;
|
||||||
pipes_t::size_type index = pipes.index (pipe_);
|
pipes_t::size_type index = pipes.index (pipe_);
|
||||||
|
|
||||||
// If we are in the middle of multipart message and current pipe
|
// If we are in the middle of multipart message and current pipe
|
||||||
@ -59,6 +71,7 @@ void zmq::lb_t::pipe_terminated (pipe_t *pipe_)
|
|||||||
current = 0;
|
current = 0;
|
||||||
}
|
}
|
||||||
pipes.erase (pipe_);
|
pipes.erase (pipe_);
|
||||||
|
DB_TRACE_EXIT("lb_term") ;
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::lb_t::activated (pipe_t *pipe_)
|
void zmq::lb_t::activated (pipe_t *pipe_)
|
||||||
@ -73,8 +86,16 @@ int zmq::lb_t::send (msg_t *msg_)
|
|||||||
return sendpipe (msg_, NULL);
|
return sendpipe (msg_, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifdef ZMQ_HAVE_WINDOWS
|
||||||
|
# define msleep(milliseconds) Sleep (milliseconds);
|
||||||
|
#else
|
||||||
|
# include <unistd.h>
|
||||||
|
# define msleep(milliseconds) usleep (static_cast <useconds_t> (milliseconds) * 1000);
|
||||||
|
#endif
|
||||||
|
|
||||||
int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_)
|
int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_)
|
||||||
{
|
{
|
||||||
|
DB_TRACE("lb_sendpipe") ;
|
||||||
// Drop the message if required. If we are at the end of the message
|
// Drop the message if required. If we are at the end of the message
|
||||||
// switch back to non-dropping mode.
|
// switch back to non-dropping mode.
|
||||||
if (dropping) {
|
if (dropping) {
|
||||||
@ -90,13 +111,20 @@ int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_)
|
|||||||
}
|
}
|
||||||
|
|
||||||
while (active > 0) {
|
while (active > 0) {
|
||||||
if (pipes [current]->write (msg_))
|
// DAB - bias the race to provoke problems with write
|
||||||
|
msleep(zmq_lb_race_window_1_size ) ;
|
||||||
|
|
||||||
|
if (pipes [current]->write (msg_))
|
||||||
{
|
{
|
||||||
if (pipe_)
|
if (pipe_)
|
||||||
*pipe_ = pipes [current];
|
*pipe_ = pipes [current];
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
if (!(!more))
|
||||||
|
{
|
||||||
|
DB_TRACE("lb_assert");
|
||||||
|
fflush(stderr) ;
|
||||||
|
}
|
||||||
zmq_assert (!more);
|
zmq_assert (!more);
|
||||||
active--;
|
active--;
|
||||||
if (current < active)
|
if (current < active)
|
||||||
@ -108,12 +136,17 @@ int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_)
|
|||||||
// If there are no pipes we cannot send the message.
|
// If there are no pipes we cannot send the message.
|
||||||
if (active == 0) {
|
if (active == 0) {
|
||||||
errno = EAGAIN;
|
errno = EAGAIN;
|
||||||
|
DB_TRACE_EXIT("lb_sendpipe") ;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// If it's final part of the message we can flush it downstream and
|
// If it's final part of the message we can flush it downstream and
|
||||||
// continue round-robining (load balance).
|
// continue round-robining (load balance).
|
||||||
more = msg_->flags () & msg_t::more? true: false;
|
more = msg_->flags () & msg_t::more? true: false;
|
||||||
|
|
||||||
|
// DAB - bias the race to provoke problems with %
|
||||||
|
msleep(zmq_lb_race_window_2_size) ;
|
||||||
|
|
||||||
if (!more) {
|
if (!more) {
|
||||||
pipes [current]->flush ();
|
pipes [current]->flush ();
|
||||||
current = (current + 1) % active;
|
current = (current + 1) % active;
|
||||||
@ -123,6 +156,7 @@ int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_)
|
|||||||
int rc = msg_->init ();
|
int rc = msg_->init ();
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
|
|
||||||
|
DB_TRACE_EXIT("lb_sendpipe") ;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user