diff --git a/CMakeLists.txt b/CMakeLists.txt index 8ce46024..d7c6983a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -308,19 +308,19 @@ set(cxx-sources kqueue.cpp lb.cpp mailbox.cpp - mechanism.cpp + mechanism.cpp msg.cpp mtrie.cpp object.cpp options.cpp own.cpp - null_mechanism.cpp + null_mechanism.cpp pair.cpp pgm_receiver.cpp pgm_sender.cpp pgm_socket.cpp pipe.cpp - plain_mechanism.cpp + plain_mechanism.cpp poll.cpp poller_base.cpp precompiled.cpp @@ -576,6 +576,56 @@ if(NOT CMAKE_BUILD_TYPE STREQUAL "Debug") # Why? endforeach() endif() +enable_testing() +set(tests + test_connect_delay + test_connect_resolve + test_ctx_options + test_disconnect_inproc + test_hwm + test_invalid_rep + test_iov + test_last_endpoint + test_monitor + test_msg_flags + test_pair_inproc + test_pair_ipc + test_pair_tcp + test_probe_router + test_raw_sock + test_req_request_ids + test_req_strict + test_reqrep_device + test_reqrep_inproc + test_reqrep_ipc + test_reqrep_tcp + test_router_mandatory + test_security + test_security_curve + test_shutdown_stress + test_spec_dealer + test_spec_pushpull + test_spec_rep + test_spec_req + test_spec_router + test_stream + test_sub_forward + test_term_endpoint + test_timeo) + +foreach(test ${tests}) + add_executable(${test} tests/${test}.cpp) + target_link_libraries(${test} libzmq) + + if(RT_LIBRARY) + target_link_libraries(${test} ${RT_LIBRARY}) + endif() + if(WIN32) + add_test(NAME ${test} WORKING_DIRECTORY ${LIBRARY_OUTPUT_PATH}/Debug COMMAND ${test}) + else() + add_test(NAME ${test} COMMAND ${test}) + endif() +endforeach() #----------------------------------------------------------------------------- # installer diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index c2d77808..b5fdc723 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -682,6 +682,22 @@ Default value:: NULL Applicable socket types:: all, when using TCP transport +ZMQ_CONFLATE: Keep only last message +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +If set, a socket shall keep only one message in its inbound/outbound +queue, this message being the last message received/the last message +to be sent. +Ignores 'ZMQ_RECVHWM' and 'ZMQ_SENDHWM' options. +Does not supports multi-part messages, in particular, only one part of it +is kept in the socket internal queue. +[horizontal] +Option value type:: int +Option value unit:: boolean +Default value:: 0 (false) +Applicable socket types:: ZMQ_PULL, ZMQ_PUSH, ZMQ_SUB, ZMQ_PUB, ZMQ_DEALER + + RETURN VALUE ------------ The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it diff --git a/include/zmq.h b/include/zmq.h index 47f7631c..d76f49ce 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -278,6 +278,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); #define ZMQ_PROBE_ROUTER 51 #define ZMQ_REQ_REQUEST_IDS 52 #define ZMQ_REQ_STRICT 53 +#define ZMQ_CONFLATE 54 /* Message options */ #define ZMQ_MORE 1 diff --git a/include/zmq_utils.h b/include/zmq_utils.h index 10b5b18c..fc044d4e 100644 --- a/include/zmq_utils.h +++ b/include/zmq_utils.h @@ -43,6 +43,8 @@ extern "C" { # endif #endif +typedef void (zmq_thread_fn) (void*); + /* Helper functions are used by perf tests so that they don't have to care */ /* about minutiae of time-related functions on different OS platforms. */ @@ -56,6 +58,12 @@ ZMQ_EXPORT unsigned long zmq_stopwatch_stop (void *watch_); /* Sleeps for specified number of seconds. */ ZMQ_EXPORT void zmq_sleep (int seconds_); +/* Start a thread. Returns a handle to the thread. */ +ZMQ_EXPORT void *zmq_threadstart(zmq_thread_fn* func, void* arg); + +/* Wait for thread to complete then free up resources. */ +ZMQ_EXPORT void zmq_threadclose(void* thread); + #undef ZMQ_EXPORT #ifdef __cplusplus diff --git a/src/Makefile.am b/src/Makefile.am index 62a8ba08..57bd08a4 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -88,6 +88,7 @@ libzmq_la_SOURCES = \ dealer.hpp \ xsub.hpp \ ypipe.hpp \ + ypipe_flat.hpp \ yqueue.hpp \ z85_codec.hpp \ address.cpp \ @@ -163,7 +164,9 @@ libzmq_la_SOURCES = \ raw_decoder.hpp \ raw_decoder.cpp \ raw_encoder.hpp \ - raw_encoder.cpp + raw_encoder.cpp \ + ypipe_conflate.hpp \ + dbuffer.hpp if ON_MINGW libzmq_la_LDFLAGS = -no-undefined -avoid-version -version-info @LTVER@ @LIBZMQ_EXTRA_LDFLAGS@ diff --git a/src/blob.hpp b/src/blob.hpp index 8f4d2d82..65d305b3 100644 --- a/src/blob.hpp +++ b/src/blob.hpp @@ -21,6 +21,7 @@ #define __ZMQ_BLOB_HPP_INCLUDED__ #include +#include // Borrowed from id3lib_strings.h: // They seem to be doing something for MSC, but since I only have gcc, I'll just do that diff --git a/src/dbuffer.hpp b/src/dbuffer.hpp new file mode 100644 index 00000000..7929304d --- /dev/null +++ b/src/dbuffer.hpp @@ -0,0 +1,134 @@ +/* + Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_DBUFFER_HPP_INCLUDED__ +#define __ZMQ_DBUFFER_HPP_INCLUDED__ + +#include +#include +#include + +#include "mutex.hpp" +#include "msg.hpp" + +namespace zmq +{ + + // dbuffer is a single-producer single-consumer double-buffer + // implementation. + // + // The producer writes to a back buffer and then tries to swap + // pointers between the back and front buffers. If it fails, + // due to the consumer reading from the front buffer, it just + // gives up, which is ok since writes are many and redundant. + // + // The reader simply reads from the front buffer. + // + // has_msg keeps track of whether there has been a not yet read + // value written, it is used by ypipe_conflate to mimic ypipe + // functionality regarding a reader being asleep + + template class dbuffer_t; + + template <> class dbuffer_t + { + public: + + inline dbuffer_t () + : back (&storage[0]) + , front (&storage[1]) + , has_msg (false) + { + back->init (); + front->init (); + } + + inline ~dbuffer_t() + { + back->close (); + front->close (); + } + + inline void write (const msg_t &value_) + { + msg_t& xvalue = const_cast(value_); + + zmq_assert (xvalue.check ()); + back->move (xvalue); // cannot just overwrite, might leak + + zmq_assert (back->check ()); + + if (sync.try_lock ()) + { + std::swap (back, front); + has_msg = true; + + sync.unlock (); + } + } + + inline bool read (msg_t *value_) + { + if (!value_) + return false; + + { + scoped_lock_t lock (sync); + if (!has_msg) + return false; + + zmq_assert (front->check ()); + + *value_ = *front; + front->init (); // avoid double free + + has_msg = false; + return true; + } + } + + + inline bool check_read () + { + scoped_lock_t lock (sync); + + return has_msg; + } + + inline bool probe (bool (*fn)(msg_t &)) + { + scoped_lock_t lock (sync); + return (*fn) (*front); + } + + + private: + msg_t storage[2]; + msg_t *back, *front; + + mutex_t sync; + bool has_msg; + + // Disable copying of dbuffer. + dbuffer_t (const dbuffer_t&); + const dbuffer_t &operator = (const dbuffer_t&); + }; +} + +#endif diff --git a/src/msg.cpp b/src/msg.cpp index 819927e0..f83ca056 100644 --- a/src/msg.cpp +++ b/src/msg.cpp @@ -58,7 +58,7 @@ int zmq::msg_t::init_size (size_t size_) u.lmsg.flags = 0; u.lmsg.content = (content_t*) malloc (sizeof (content_t) + size_); - if (!u.lmsg.content) { + if (unlikely (!u.lmsg.content)) { errno = ENOMEM; return -1; } @@ -75,19 +75,32 @@ int zmq::msg_t::init_size (size_t size_) int zmq::msg_t::init_data (void *data_, size_t size_, msg_free_fn *ffn_, void *hint_) { - u.lmsg.type = type_lmsg; - u.lmsg.flags = 0; - u.lmsg.content = (content_t*) malloc (sizeof (content_t)); - if (!u.lmsg.content) { - errno = ENOMEM; - return -1; + // If data is NULL and size is not 0, a segfault + // would occur once the data is accessed + assert (data_ != NULL || size_ == 0); + + // Initialize constant message if there's no need to deallocate + if(ffn_ == NULL) { + u.cmsg.type = type_cmsg; + u.cmsg.flags = 0; + u.cmsg.data = data_; + u.cmsg.size = size_; } + else { + u.lmsg.type = type_lmsg; + u.lmsg.flags = 0; + u.lmsg.content = (content_t*) malloc (sizeof (content_t)); + if (!u.lmsg.content) { + errno = ENOMEM; + return -1; + } - u.lmsg.content->data = data_; - u.lmsg.content->size = size_; - u.lmsg.content->ffn = ffn_; - u.lmsg.content->hint = hint_; - new (&u.lmsg.content->refcnt) zmq::atomic_counter_t (); + u.lmsg.content->data = data_; + u.lmsg.content->size = size_; + u.lmsg.content->ffn = ffn_; + u.lmsg.content->hint = hint_; + new (&u.lmsg.content->refcnt) zmq::atomic_counter_t (); + } return 0; } @@ -193,6 +206,8 @@ void *zmq::msg_t::data () return u.vsm.data; case type_lmsg: return u.lmsg.content->data; + case type_cmsg: + return u.cmsg.data; default: zmq_assert (false); return NULL; @@ -209,6 +224,8 @@ size_t zmq::msg_t::size () return u.vsm.size; case type_lmsg: return u.lmsg.content->size; + case type_cmsg: + return u.cmsg.size; default: zmq_assert (false); return 0; @@ -245,6 +262,11 @@ bool zmq::msg_t::is_vsm () return u.base.type == type_vsm; } +bool zmq::msg_t::is_cmsg () +{ + return u.base.type == type_cmsg; +} + void zmq::msg_t::add_refs (int refs_) { zmq_assert (refs_ >= 0); @@ -253,8 +275,8 @@ void zmq::msg_t::add_refs (int refs_) if (!refs_) return; - // VSMs and delimiters can be copied straight away. The only message type - // that needs special care are long messages. + // VSMs, CMSGS and delimiters can be copied straight away. The only + // message type that needs special care are long messages. if (u.base.type == type_lmsg) { if (u.lmsg.flags & msg_t::shared) u.lmsg.content->refcnt.add (refs_); diff --git a/src/msg.hpp b/src/msg.hpp index e3bab85c..bac1f09a 100644 --- a/src/msg.hpp +++ b/src/msg.hpp @@ -44,7 +44,7 @@ namespace zmq { public: - // Mesage flags. + // Message flags. enum { more = 1, @@ -69,6 +69,7 @@ namespace zmq bool is_identity () const; bool is_delimiter (); bool is_vsm (); + bool is_cmsg (); // After calling this function you can copy the message in POD-style // refs_ times. No need to call copy. @@ -104,10 +105,15 @@ namespace zmq enum type_t { type_min = 101, + // VSM messages store the content in the message itself type_vsm = 101, + // LMSG messages store the content in malloc-ed memory type_lmsg = 102, + // Delimiter messages are used in envelopes type_delimiter = 103, - type_max = 103 + // CMSG messages point to constant data + type_cmsg = 104, + type_max = 104 }; // Note that fields shared between different message types are not @@ -132,6 +138,14 @@ namespace zmq unsigned char type; unsigned char flags; } lmsg; + struct { + void* data; + size_t size; + unsigned char unused + [max_vsm_size + 1 - sizeof (void*) - sizeof (size_t)]; + unsigned char type; + unsigned char flags; + } cmsg; struct { unsigned char unused [max_vsm_size + 1]; unsigned char type; diff --git a/src/mutex.hpp b/src/mutex.hpp index 354df78b..308f5eff 100644 --- a/src/mutex.hpp +++ b/src/mutex.hpp @@ -50,6 +50,11 @@ namespace zmq EnterCriticalSection (&cs); } + inline bool try_lock () + { + return (bool) TryEnterCriticalSection (&cs); + } + inline void unlock () { LeaveCriticalSection (&cs); @@ -94,6 +99,16 @@ namespace zmq posix_assert (rc); } + inline bool try_lock () + { + int rc = pthread_mutex_trylock (&mutex); + if (rc == EBUSY) + return false; + + posix_assert (rc); + return true; + } + inline void unlock () { int rc = pthread_mutex_unlock (&mutex); @@ -113,4 +128,30 @@ namespace zmq #endif + +namespace zmq +{ + struct scoped_lock_t + { + scoped_lock_t (mutex_t& mutex_) + : mutex (mutex_) + { + mutex.lock (); + } + + ~scoped_lock_t () + { + mutex.unlock (); + } + + private: + + mutex_t& mutex; + + // Disable copy construction and assignment. + scoped_lock_t (const scoped_lock_t&); + const scoped_lock_t &operator = (const scoped_lock_t&); + }; +} + #endif diff --git a/src/options.cpp b/src/options.cpp index 007fceb8..17e04659 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -52,7 +52,8 @@ zmq::options_t::options_t () : tcp_keepalive_intvl (-1), mechanism (ZMQ_NULL), as_server (0), - socket_id (0) + socket_id (0), + conflate (false) { } @@ -338,6 +339,16 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, } break; # endif + + case ZMQ_CONFLATE: + if (is_int && (value == 0 || value == 1)) { + conflate = (value != 0); + return 0; + } + break; + + default: + break; } errno = EINVAL; return -1; @@ -594,6 +605,14 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) } break; # endif + + case ZMQ_CONFLATE: + if (is_int) { + *value = conflate; + return 0; + } + break; + } errno = EINVAL; return -1; diff --git a/src/options.hpp b/src/options.hpp index f665b9fa..4eea0368 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -134,6 +134,12 @@ namespace zmq // ID of the socket. int socket_id; + + // If true, socket conflates outgoing/incoming messages. + // Applicable to dealer, push/pull, pub/sub socket types. + // Cannot receive multi-part messages. + // Ignores hwm + bool conflate; }; } diff --git a/src/pipe.cpp b/src/pipe.cpp index 6c3e0b9e..98ed66b6 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -23,22 +23,37 @@ #include "pipe.hpp" #include "err.hpp" +#include "ypipe.hpp" +#include "ypipe_conflate.hpp" + int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], - int hwms_ [2]) + int hwms_ [2], bool conflate_ [2]) { // Creates two pipe objects. These objects are connected by two ypipes, // each to pass messages in one direction. - pipe_t::upipe_t *upipe1 = new (std::nothrow) pipe_t::upipe_t (); + typedef ypipe_t upipe_normal_t; + typedef ypipe_conflate_t upipe_conflate_t; + + pipe_t::upipe_t *upipe1; + if(conflate_ [0]) + upipe1 = new (std::nothrow) upipe_conflate_t (); + else + upipe1 = new (std::nothrow) upipe_normal_t (); alloc_assert (upipe1); - pipe_t::upipe_t *upipe2 = new (std::nothrow) pipe_t::upipe_t (); + + pipe_t::upipe_t *upipe2; + if(conflate_ [1]) + upipe2 = new (std::nothrow) upipe_conflate_t (); + else + upipe2 = new (std::nothrow) upipe_normal_t (); alloc_assert (upipe2); pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2, - hwms_ [1], hwms_ [0]); + hwms_ [1], hwms_, conflate_ [0]); alloc_assert (pipes_ [0]); pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1, - hwms_ [0], hwms_ [1]); + hwms_ [0], hwms_ [1], conflate_ [1]); alloc_assert (pipes_ [1]); pipes_ [0]->set_peer (pipes_ [1]); @@ -48,7 +63,7 @@ int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], } zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, - int inhwm_, int outhwm_) : + int inhwm_, int outhwm_, bool conflate_) : object_t (parent_), inpipe (inpipe_), outpipe (outpipe_), @@ -62,7 +77,8 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, peer (NULL), sink (NULL), state (active), - delay (true) + delay (true), + conflate (conflate_) { } @@ -303,11 +319,15 @@ void zmq::pipe_t::process_pipe_term_ack () // First, delete all the unread messages in the pipe. We have to do it by // hand because msg_t doesn't have automatic destructor. Then deallocate // the ypipe itself. - msg_t msg; - while (inpipe->read (&msg)) { - int rc = msg.close (); - errno_assert (rc == 0); + + if (!conflate) { + msg_t msg; + while (inpipe->read (&msg)) { + int rc = msg.close (); + errno_assert (rc == 0); + } } + delete inpipe; // Deallocate the pipe object @@ -444,7 +464,13 @@ void zmq::pipe_t::hiccup () inpipe = NULL; // Create new inpipe. - inpipe = new (std::nothrow) pipe_t::upipe_t (); + if (conflate) + inpipe = new (std::nothrow) + ypipe_t (); + else + inpipe = new (std::nothrow) + ypipe_conflate_t (); + alloc_assert (inpipe); in_active = true; diff --git a/src/pipe.hpp b/src/pipe.hpp index 867d1b97..8ee4e59c 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -21,7 +21,7 @@ #define __ZMQ_PIPE_HPP_INCLUDED__ #include "msg.hpp" -#include "ypipe.hpp" +#include "ypipe_base.hpp" #include "config.hpp" #include "object.hpp" #include "stdint.hpp" @@ -40,8 +40,10 @@ namespace zmq // Delay specifies how the pipe behaves when the peer terminates. If true // pipe receives all the pending messages before terminating, otherwise it // terminates straight away. + // If conflate is true, only the most recently arrived message could be + // read (older messages are discarded) int pipepair (zmq::object_t *parents_ [2], zmq::pipe_t* pipes_ [2], - int hwms_ [2]); + int hwms_ [2], bool conflate_ [2]); struct i_pipe_events { @@ -64,9 +66,9 @@ namespace zmq public array_item_t <3> { // This allows pipepair to create pipe objects. - friend int pipepair (zmq::object_t *parents_ [2], - zmq::pipe_t* pipes_ [2], int hwms_ [2]); - + friend int pipepair (zmq::object_t *parents_ [2], zmq::pipe_t* pipes_ [2], + int hwms_ [2], bool conflate_ [2]); + public: // Specifies the object to send events to. @@ -113,7 +115,7 @@ namespace zmq private: // Type of the underlying lock-free pipe. - typedef ypipe_t upipe_t; + typedef ypipe_base_t upipe_t; // Command handlers. void process_activate_read (); @@ -128,7 +130,7 @@ namespace zmq // Constructor is private. Pipe can only be created using // pipepair function. pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, - int inhwm_, int outhwm_); + int inhwm_, int outhwm_, bool conflate_); // Pipepair uses this function to let us know about // the peer pipe object. @@ -199,6 +201,8 @@ namespace zmq // Computes appropriate low watermark from the given high watermark. static int compute_lwm (int hwm_); + bool conflate; + // Disable copying. pipe_t (const pipe_t&); const pipe_t &operator = (const pipe_t&); diff --git a/src/session_base.cpp b/src/session_base.cpp index af70b57d..efb04379 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -299,7 +299,8 @@ int zmq::session_base_t::zap_connect () object_t *parents [2] = {this, peer.socket}; pipe_t *new_pipes [2] = {NULL, NULL}; int hwms [2] = {0, 0}; - int rc = pipepair (parents, new_pipes, hwms); + bool conflates [2] = {false, false}; + int rc = pipepair (parents, new_pipes, hwms, conflates); errno_assert (rc == 0); // Attach local end of the pipe to this socket object. @@ -332,8 +333,18 @@ void zmq::session_base_t::process_attach (i_engine *engine_) if (!pipe && !is_terminating ()) { object_t *parents [2] = {this, socket}; pipe_t *pipes [2] = {NULL, NULL}; - int hwms [2] = {options.rcvhwm, options.sndhwm}; - int rc = pipepair (parents, pipes, hwms); + + bool conflate = options.conflate && + (options.type == ZMQ_DEALER || + options.type == ZMQ_PULL || + options.type == ZMQ_PUSH || + options.type == ZMQ_PUB || + options.type == ZMQ_SUB); + + int hwms [2] = {conflate? -1 : options.rcvhwm, + conflate? -1 : options.sndhwm}; + bool conflates [2] = {conflate, conflate}; + int rc = pipepair (parents, pipes, hwms, conflates); errno_assert (rc == 0); // Plug the local end of the pipe. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index b8c11e19..731be10f 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -450,8 +450,17 @@ int zmq::socket_base_t::connect (const char *addr_) // Create a bi-directional pipe to connect the peers. object_t *parents [2] = {this, peer.socket}; pipe_t *new_pipes [2] = {NULL, NULL}; - int hwms [2] = {sndhwm, rcvhwm}; - int rc = pipepair (parents, new_pipes, hwms); + + bool conflate = options.conflate && + (options.type == ZMQ_DEALER || + options.type == ZMQ_PULL || + options.type == ZMQ_PUSH || + options.type == ZMQ_PUB || + options.type == ZMQ_SUB); + + int hwms [2] = {conflate? -1 : sndhwm, conflate? -1 : rcvhwm}; + bool conflates [2] = {conflate, conflate}; + int rc = pipepair (parents, new_pipes, hwms, conflates); errno_assert (rc == 0); // Attach local end of the pipe to this socket object. @@ -553,8 +562,18 @@ int zmq::socket_base_t::connect (const char *addr_) // Create a bi-directional pipe. object_t *parents [2] = {this, session}; pipe_t *new_pipes [2] = {NULL, NULL}; - int hwms [2] = {options.sndhwm, options.rcvhwm}; - rc = pipepair (parents, new_pipes, hwms); + + bool conflate = options.conflate && + (options.type == ZMQ_DEALER || + options.type == ZMQ_PULL || + options.type == ZMQ_PUSH || + options.type == ZMQ_PUB || + options.type == ZMQ_SUB); + + int hwms [2] = {conflate? -1 : options.sndhwm, + conflate? -1 : options.rcvhwm}; + bool conflates [2] = {conflate, conflate}; + rc = pipepair (parents, new_pipes, hwms, conflates); errno_assert (rc == 0); // Attach local end of the pipe to the socket object. diff --git a/src/ypipe.hpp b/src/ypipe.hpp index 182df4e7..86e5d01c 100644 --- a/src/ypipe.hpp +++ b/src/ypipe.hpp @@ -23,6 +23,7 @@ #include "atomic_ptr.hpp" #include "yqueue.hpp" #include "platform.hpp" +#include "ypipe_base.hpp" namespace zmq { @@ -34,7 +35,7 @@ namespace zmq // N is granularity of the pipe, i.e. how many items are needed to // perform next memory allocation. - template class ypipe_t + template class ypipe_t : public ypipe_base_t { public: diff --git a/src/ypipe_base.hpp b/src/ypipe_base.hpp new file mode 100644 index 00000000..b7e7081b --- /dev/null +++ b/src/ypipe_base.hpp @@ -0,0 +1,44 @@ + +/* + Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_YPIPE_BASE_HPP_INCLUDED__ +#define __ZMQ_YPIPE_BASE_HPP_INCLUDED__ + + +namespace zmq +{ + // ypipe_base abstracts ypipe and ypipe_conflate specific + // classes, one is selected according to a the conflate + // socket option + + template class ypipe_base_t + { + public: + virtual ~ypipe_base_t () {} + virtual void write (const T &value_, bool incomplete_) = 0; + virtual bool unwrite (T *value_) = 0; + virtual bool flush () = 0; + virtual bool check_read () = 0; + virtual bool read (T *value_) = 0; + virtual bool probe (bool (*fn)(T &)) = 0; + }; +} + +#endif diff --git a/src/ypipe_conflate.hpp b/src/ypipe_conflate.hpp new file mode 100644 index 00000000..6dc20ef9 --- /dev/null +++ b/src/ypipe_conflate.hpp @@ -0,0 +1,127 @@ +/* + Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_YPIPE_CONFLATE_HPP_INCLUDED__ +#define __ZMQ_YPIPE_CONFLATE_HPP_INCLUDED__ + +#include "platform.hpp" +#include "dbuffer.hpp" +#include "ypipe_base.hpp" + +namespace zmq +{ + + // Adapter for dbuffer, to plug it in instead of a queue for the sake + // of implementing the conflate socket option, which, if set, makes + // the receiving side to discard all incoming messages but the last one. + // + // reader_awake flag is needed here to mimic ypipe delicate behaviour + // around the reader being asleep (see 'c' pointer being NULL in ypipe.hpp) + + template class ypipe_conflate_t : public ypipe_base_t + { + public: + + // Initialises the pipe. + inline ypipe_conflate_t () + : reader_awake(false) + { + } + + // The destructor doesn't have to be virtual. It is mad virtual + // just to keep ICC and code checking tools from complaining. + inline virtual ~ypipe_conflate_t () + { + } + + // Following function (write) deliberately copies uninitialised data + // when used with zmq_msg. Initialising the VSM body for + // non-VSM messages won't be good for performance. + +#ifdef ZMQ_HAVE_OPENVMS +#pragma message save +#pragma message disable(UNINIT) +#endif + inline void write (const T &value_, bool incomplete_) + { + (void)incomplete_; + + dbuffer.write (value_); + } + +#ifdef ZMQ_HAVE_OPENVMS +#pragma message restore +#endif + + // There are no incomplete items for conflate ypipe + inline bool unwrite (T *value_) + { + return false; + } + + // Flush is no-op for conflate ypipe. Reader asleep behaviour + // is as of the usual ypipe. + // Returns false if the reader thread is sleeping. In that case, + // caller is obliged to wake the reader up before using the pipe again. + inline bool flush () + { + return reader_awake; + } + + // Check whether item is available for reading. + inline bool check_read () + { + bool res = dbuffer.check_read (); + if (!res) + reader_awake = false; + + return res; + } + + // Reads an item from the pipe. Returns false if there is no value. + // available. + inline bool read (T *value_) + { + if (!check_read ()) + return false; + + return dbuffer.read (value_); + } + + // Applies the function fn to the first elemenent in the pipe + // and returns the value returned by the fn. + // The pipe mustn't be empty or the function crashes. + inline bool probe (bool (*fn)(T &)) + { + return dbuffer.probe (fn); + } + + protected: + + dbuffer_t dbuffer; + bool reader_awake; + + // Disable copying of ypipe object. + ypipe_conflate_t (const ypipe_conflate_t&); + const ypipe_conflate_t &operator = (const ypipe_conflate_t&); + }; + +} + +#endif diff --git a/src/zmq.cpp b/src/zmq.cpp index def0ba91..ff3e60fa 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -539,7 +539,7 @@ int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_) a_[i].iov_len = zmq_msg_size (&msg); a_[i].iov_base = malloc(a_[i].iov_len); - if (!a_[i].iov_base) { + if (unlikely (!a_[i].iov_base)) { errno = ENOMEM; return -1; } diff --git a/src/zmq_utils.cpp b/src/zmq_utils.cpp index 53638756..4b7e64ee 100644 --- a/src/zmq_utils.cpp +++ b/src/zmq_utils.cpp @@ -26,6 +26,7 @@ #include "stdint.hpp" #include "clock.hpp" #include "err.hpp" +#include "thread.hpp" #if !defined ZMQ_HAVE_WINDOWS #include @@ -57,3 +58,17 @@ unsigned long zmq_stopwatch_stop (void *watch_) free (watch_); return (unsigned long) (end - start); } + +void *zmq_threadstart(zmq_thread_fn* func, void* arg) +{ + zmq::thread_t* thread = new zmq::thread_t; + thread->start(func, arg); + return thread; +} + +void zmq_threadclose(void* thread) +{ + zmq::thread_t* pThread = static_cast(thread); + pThread->stop(); + delete pThread; +} diff --git a/tests/Makefile.am b/tests/Makefile.am index 6464428f..5bc8d12f 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -31,7 +31,8 @@ noinst_PROGRAMS = test_pair_inproc \ test_spec_router \ test_spec_pushpull \ test_req_request_ids \ - test_req_strict + test_req_strict \ + test_conflate if !ON_MINGW noinst_PROGRAMS += test_shutdown_stress \ @@ -69,6 +70,7 @@ test_spec_router_SOURCES = test_spec_router.cpp test_spec_pushpull_SOURCES = test_spec_pushpull.cpp test_req_request_ids_SOURCES = test_req_request_ids.cpp test_req_strict_SOURCES = test_req_strict.cpp +test_conflate_SOURCES = test_conflate.cpp if !ON_MINGW test_shutdown_stress_SOURCES = test_shutdown_stress.cpp test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp diff --git a/tests/test_conflate.cpp b/tests/test_conflate.cpp new file mode 100644 index 00000000..bbc7646d --- /dev/null +++ b/tests/test_conflate.cpp @@ -0,0 +1,77 @@ +/* + Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include +#include "../include/zmq_utils.h" +#include "testutil.hpp" + +int main (int argc, char *argv []) +{ + const char *bind_to = "tcp://127.0.0.1:77008"; + + int rc; + + void* ctx = zmq_init (1); + assert (ctx); + + void* s_in = zmq_socket (ctx, ZMQ_PULL); + assert (s_in); + + int conflate = 1; + rc = zmq_setsockopt (s_in, ZMQ_CONFLATE, &conflate, sizeof(conflate)); + assert (rc == 0); + + rc = zmq_bind (s_in, bind_to); + assert (rc == 0); + + void* s_out = zmq_socket (ctx, ZMQ_PUSH); + assert (s_out); + + rc = zmq_connect (s_out, bind_to); + assert (rc == 0); + + int message_count = 20; + + for (int j = 0; j < message_count; ++j) { + rc = zmq_send(s_out, (void*)&j, sizeof(int), 0); + if (rc < 0) { + printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno)); + return -1; + } + } + + zmq_sleep (1); + + int payload_recved = 0; + rc = zmq_recv(s_in, (void*)&payload_recved, sizeof(int), 0); + assert (rc > 0); + assert (payload_recved == message_count - 1); + + + rc = zmq_close (s_in); + assert (rc == 0); + + rc = zmq_close (s_out); + assert (rc == 0); + + rc = zmq_term (ctx); + assert (rc == 0); + + return 0; +} diff --git a/tests/test_connect_delay.cpp b/tests/test_connect_delay.cpp index b85aced1..77ed5fd1 100644 --- a/tests/test_connect_delay.cpp +++ b/tests/test_connect_delay.cpp @@ -18,17 +18,16 @@ */ #include "../include/zmq.h" +#include "../include/zmq_utils.h" #include #include #include -#include #include - -#undef NDEBUG -#include +#include "testutil.hpp" int main (void) { + setup_test_environment(); int val; int rc; char buffer[16]; @@ -198,11 +197,10 @@ int main (void) rc = zmq_close (backend); assert (rc == 0); - + // Give time to process disconnect // There's no way to do this except with a sleep - struct timespec t = { 0, 250 * 1000000 }; - nanosleep (&t, NULL); + zmq_sleep(1); // Send a message, should fail rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT); diff --git a/tests/test_connect_resolve.cpp b/tests/test_connect_resolve.cpp index dd4e58f1..99fb4adb 100644 --- a/tests/test_connect_resolve.cpp +++ b/tests/test_connect_resolve.cpp @@ -20,12 +20,11 @@ #include "../include/zmq.h" #include #include - -#undef NDEBUG -#include +#include "testutil.hpp" int main (void) { + setup_test_environment(); void *ctx = zmq_ctx_new (); assert (ctx); diff --git a/tests/test_ctx_options.cpp b/tests/test_ctx_options.cpp index 7d71ea14..2fde234c 100644 --- a/tests/test_ctx_options.cpp +++ b/tests/test_ctx_options.cpp @@ -19,12 +19,11 @@ #include "../include/zmq.h" #include -#include -#undef NDEBUG -#include +#include "testutil.hpp" int main (void) { + setup_test_environment(); int rc; // Set up our context and sockets diff --git a/tests/test_disconnect_inproc.cpp b/tests/test_disconnect_inproc.cpp index bd22ed49..fe2a72b0 100644 --- a/tests/test_disconnect_inproc.cpp +++ b/tests/test_disconnect_inproc.cpp @@ -19,7 +19,7 @@ #include #include -#include +#include "testutil.hpp" /// Initialize a zeromq message with a given null-terminated string #define ZMQ_PREPARE_STRING(msg, data, size) \ @@ -31,6 +31,7 @@ int publicationsReceived = 0; bool isSubscribed = false; int main(int argc, char** argv) { + setup_test_environment(); void* context = zmq_ctx_new(); void* pubSocket; void* subSocket; diff --git a/tests/test_hwm.cpp b/tests/test_hwm.cpp index 0c85d3f1..7f26176a 100644 --- a/tests/test_hwm.cpp +++ b/tests/test_hwm.cpp @@ -20,11 +20,11 @@ #include "../include/zmq.h" #include #include -#undef NDEBUG -#include +#include "testutil.hpp" int main (void) { + setup_test_environment(); void *ctx = zmq_ctx_new (); assert (ctx); diff --git a/tests/test_invalid_rep.cpp b/tests/test_invalid_rep.cpp index 91061e09..54a32e81 100644 --- a/tests/test_invalid_rep.cpp +++ b/tests/test_invalid_rep.cpp @@ -19,12 +19,11 @@ #include "../include/zmq.h" #include - -#undef NDEBUG -#include +#include "testutil.hpp" int main (void) { + setup_test_environment(); // Create REQ/ROUTER wiring. void *ctx = zmq_ctx_new (); assert (ctx); diff --git a/tests/test_iov.cpp b/tests/test_iov.cpp index b5ac817b..6d647123 100644 --- a/tests/test_iov.cpp +++ b/tests/test_iov.cpp @@ -18,12 +18,11 @@ */ #include "../include/zmq.h" +#include "../include/zmq_utils.h" #include -#include #include #include -#undef NDEBUG -#include +#include "testutil.hpp" // XSI vector I/O #if defined ZMQ_HAVE_UIO @@ -37,6 +36,7 @@ struct iovec { void do_check(void* sb, void* sc, unsigned int msgsz) { + setup_test_environment(); int rc; int sum =0; for (int i = 0; i < 10; i++) @@ -85,7 +85,7 @@ int main (void) rc = zmq_bind (sb, "inproc://a"); assert (rc == 0); - ::sleep(1); + zmq_sleep(1); void *sc = zmq_socket (ctx, ZMQ_PUSH); rc = zmq_connect (sc, "inproc://a"); @@ -107,5 +107,5 @@ int main (void) rc = zmq_ctx_term (ctx); assert (rc == 0); - return 0; + return 0; } diff --git a/tests/test_last_endpoint.cpp b/tests/test_last_endpoint.cpp index c364dc81..dc836869 100644 --- a/tests/test_last_endpoint.cpp +++ b/tests/test_last_endpoint.cpp @@ -19,9 +19,7 @@ #include "../include/zmq.h" #include - -#undef NDEBUG -#include +#include "testutil.hpp" static void do_bind_and_verify (void *s, const char *endpoint) { @@ -35,6 +33,7 @@ static void do_bind_and_verify (void *s, const char *endpoint) int main (void) { + setup_test_environment(); // Create the infrastructure void *ctx = zmq_ctx_new (); assert (ctx); diff --git a/tests/test_monitor.cpp b/tests/test_monitor.cpp index 499c6a99..26daf495 100644 --- a/tests/test_monitor.cpp +++ b/tests/test_monitor.cpp @@ -19,9 +19,8 @@ #include #include "../include/zmq.h" -#include +#include "../include/zmq_utils.h" #include -#include #include "testutil.hpp" // REQ socket events handled @@ -67,7 +66,7 @@ static bool read_msg(void* s, zmq_event_t& event, std::string& ep) // REQ socket monitor thread -static void *req_socket_monitor (void *ctx) +static void req_socket_monitor (void *ctx) { zmq_event_t event; std::string ep ; @@ -105,11 +104,10 @@ static void *req_socket_monitor (void *ctx) } } zmq_close (s); - return NULL; } // 2nd REQ socket monitor thread -static void *req2_socket_monitor (void *ctx) +static void req2_socket_monitor (void *ctx) { zmq_event_t event; std::string ep ; @@ -134,11 +132,10 @@ static void *req2_socket_monitor (void *ctx) } } zmq_close (s); - return NULL; } // REP socket monitor thread -static void *rep_socket_monitor (void *ctx) +static void rep_socket_monitor (void *ctx) { zmq_event_t event; std::string ep ; @@ -175,16 +172,16 @@ static void *rep_socket_monitor (void *ctx) } } zmq_close (s); - return NULL; } int main (void) { + setup_test_environment(); int rc; void *req; void *req2; void *rep; - pthread_t threads [3]; + void* threads [3]; addr = "tcp://127.0.0.1:5560"; @@ -208,8 +205,7 @@ int main (void) // REP socket monitor, all events rc = zmq_socket_monitor (rep, "inproc://monitor.rep", ZMQ_EVENT_ALL); assert (rc == 0); - rc = pthread_create (&threads [0], NULL, rep_socket_monitor, ctx); - assert (rc == 0); + threads [0] = zmq_threadstart(&rep_socket_monitor, ctx); // REQ socket req = zmq_socket (ctx, ZMQ_REQ); @@ -218,9 +214,8 @@ int main (void) // REQ socket monitor, all events rc = zmq_socket_monitor (req, "inproc://monitor.req", ZMQ_EVENT_ALL); assert (rc == 0); - rc = pthread_create (&threads [1], NULL, req_socket_monitor, ctx); - assert (rc == 0); - sleep (1); + threads [1] = zmq_threadstart(&req_socket_monitor, ctx); + zmq_sleep(1); // Bind REQ and REP rc = zmq_bind (rep, addr.c_str()); @@ -238,8 +233,7 @@ int main (void) // 2nd REQ socket monitor, connected event only rc = zmq_socket_monitor (req2, "inproc://monitor.req2", ZMQ_EVENT_CONNECTED); assert (rc == 0); - rc = pthread_create (&threads [2], NULL, req2_socket_monitor, ctx); - assert (rc == 0); + threads [2] = zmq_threadstart(&req2_socket_monitor, ctx); rc = zmq_connect (req2, addr.c_str()); assert (rc == 0); @@ -249,8 +243,7 @@ int main (void) assert (rc == 0); // Allow some time for detecting error states - struct timespec t = { 0, 250 * 1000000 }; - nanosleep (&t, NULL); + zmq_sleep(1); // Close the REQ socket rc = zmq_close (req); @@ -276,7 +269,8 @@ int main (void) assert (req2_socket_events & ZMQ_EVENT_CONNECTED); assert (!(req2_socket_events & ZMQ_EVENT_CLOSED)); - pthread_exit (NULL); + for (unsigned int i = 0; i < 3; ++i) + zmq_threadclose(threads [i]); return 0 ; } diff --git a/tests/test_msg_flags.cpp b/tests/test_msg_flags.cpp index 8d40cb71..172724e5 100644 --- a/tests/test_msg_flags.cpp +++ b/tests/test_msg_flags.cpp @@ -19,12 +19,11 @@ #include "../include/zmq.h" #include - -#undef NDEBUG -#include +#include "testutil.hpp" int main (void) { + setup_test_environment(); // Create the infrastructure void *ctx = zmq_ctx_new (); assert (ctx); diff --git a/tests/test_pair_inproc.cpp b/tests/test_pair_inproc.cpp index 5fd1ec85..133df827 100644 --- a/tests/test_pair_inproc.cpp +++ b/tests/test_pair_inproc.cpp @@ -22,6 +22,7 @@ int main (void) { + setup_test_environment(); void *ctx = zmq_ctx_new (); assert (ctx); diff --git a/tests/test_pair_ipc.cpp b/tests/test_pair_ipc.cpp index e4d38805..6ee23953 100644 --- a/tests/test_pair_ipc.cpp +++ b/tests/test_pair_ipc.cpp @@ -22,6 +22,7 @@ int main (void) { + setup_test_environment(); void *ctx = zmq_ctx_new (); assert (ctx); diff --git a/tests/test_pair_tcp.cpp b/tests/test_pair_tcp.cpp index 6d204c57..ca79e503 100644 --- a/tests/test_pair_tcp.cpp +++ b/tests/test_pair_tcp.cpp @@ -22,6 +22,7 @@ int main (void) { + setup_test_environment(); void *ctx = zmq_ctx_new (); assert (ctx); diff --git a/tests/test_probe_router.cpp b/tests/test_probe_router.cpp index 0c2f553e..6a2ae8c9 100644 --- a/tests/test_probe_router.cpp +++ b/tests/test_probe_router.cpp @@ -20,11 +20,11 @@ #include "../include/zmq.h" #include #include -#undef NDEBUG -#include +#include "testutil.hpp" int main (void) { + setup_test_environment(); void *ctx = zmq_ctx_new (); assert (ctx); diff --git a/tests/test_raw_sock.cpp b/tests/test_raw_sock.cpp index f18d3b4a..05d1c891 100644 --- a/tests/test_raw_sock.cpp +++ b/tests/test_raw_sock.cpp @@ -19,9 +19,7 @@ #include "../include/zmq.h" #include -#include -#undef NDEBUG -#include +#include "testutil.hpp" // ZMTP protocol greeting structure @@ -45,6 +43,7 @@ static zmtp_greeting_t greeting int main (void) { + setup_test_environment(); int rc; // Set up our context and sockets diff --git a/tests/test_req_request_ids.cpp b/tests/test_req_request_ids.cpp index 92658573..6f7f9490 100644 --- a/tests/test_req_request_ids.cpp +++ b/tests/test_req_request_ids.cpp @@ -18,12 +18,12 @@ */ #include -#include #include #include "testutil.hpp" int main (void) { + setup_test_environment(); void *ctx = zmq_ctx_new (); assert (ctx); diff --git a/tests/test_req_strict.cpp b/tests/test_req_strict.cpp index 9ece8a6c..fe3969f2 100644 --- a/tests/test_req_strict.cpp +++ b/tests/test_req_strict.cpp @@ -17,13 +17,14 @@ along with this program. If not, see . */ +#include "../include/zmq_utils.h" #include -#include #include #include "testutil.hpp" int main (void) { + setup_test_environment(); void *ctx = zmq_ctx_new (); assert (ctx); @@ -57,9 +58,7 @@ int main (void) // We have to give the connects time to finish otherwise the requests // will not properly round-robin. We could alternatively connect the // REQ sockets to the REP sockets. - struct timespec t = { 0, 250 * 1000000 }; - nanosleep (&t, NULL); - + zmq_sleep(1); // Case 1: Second send() before a reply arrives in a pipe. diff --git a/tests/test_reqrep_device.cpp b/tests/test_reqrep_device.cpp index c7d0a442..d5945668 100644 --- a/tests/test_reqrep_device.cpp +++ b/tests/test_reqrep_device.cpp @@ -20,12 +20,11 @@ #include "../include/zmq.h" #include #include - -#undef NDEBUG -#include +#include "testutil.hpp" int main (void) { + setup_test_environment(); void *ctx = zmq_ctx_new (); assert (ctx); diff --git a/tests/test_reqrep_inproc.cpp b/tests/test_reqrep_inproc.cpp index 0c9f50c1..2172df12 100644 --- a/tests/test_reqrep_inproc.cpp +++ b/tests/test_reqrep_inproc.cpp @@ -22,6 +22,7 @@ int main (void) { + setup_test_environment(); void *ctx = zmq_ctx_new (); assert (ctx); diff --git a/tests/test_reqrep_ipc.cpp b/tests/test_reqrep_ipc.cpp index 9af3972c..c07117ae 100644 --- a/tests/test_reqrep_ipc.cpp +++ b/tests/test_reqrep_ipc.cpp @@ -22,6 +22,7 @@ int main (void) { + setup_test_environment(); void *ctx = zmq_ctx_new (); assert (ctx); diff --git a/tests/test_reqrep_tcp.cpp b/tests/test_reqrep_tcp.cpp index 2640af30..8097b2ce 100644 --- a/tests/test_reqrep_tcp.cpp +++ b/tests/test_reqrep_tcp.cpp @@ -22,6 +22,7 @@ int main (void) { + setup_test_environment(); void *ctx = zmq_ctx_new (); assert (ctx); diff --git a/tests/test_router_mandatory.cpp b/tests/test_router_mandatory.cpp index 8b6e38ec..512e3ed9 100644 --- a/tests/test_router_mandatory.cpp +++ b/tests/test_router_mandatory.cpp @@ -20,11 +20,11 @@ #include "../include/zmq.h" #include #include -#undef NDEBUG -#include +#include "testutil.hpp" int main (void) { + setup_test_environment(); void *ctx = zmq_ctx_new (); assert (ctx); void *router = zmq_socket (ctx, ZMQ_ROUTER); diff --git a/tests/test_security.cpp b/tests/test_security.cpp index 225d857e..e6feed34 100644 --- a/tests/test_security.cpp +++ b/tests/test_security.cpp @@ -17,13 +17,12 @@ along with this program. If not, see . */ -#include +#include "../include/zmq_utils.h" #include #include #include "testutil.hpp" -static void * -zap_handler (void *zap) +static void zap_handler (void *zap) { char *version = s_recv (zap); char *sequence = s_recv (zap); @@ -62,12 +61,11 @@ zap_handler (void *zap) int rc = zmq_close (zap); assert (rc == 0); - - return NULL; } int main (void) { + setup_test_environment(); void *ctx = zmq_ctx_new (); assert (ctx); @@ -177,9 +175,7 @@ int main (void) assert (rc == 0); // Spawn ZAP handler - pthread_t zap_thread; - rc = pthread_create (&zap_thread, NULL, &zap_handler, zap); - assert (rc == 0); + void* zap_thread = zmq_threadstart(&zap_handler, zap); rc = zmq_bind (server, "tcp://*:9998"); assert (rc == 0); @@ -194,7 +190,7 @@ int main (void) assert (rc == 0); // Wait until ZAP handler terminates. - pthread_join (zap_thread, NULL); + zmq_threadclose(zap_thread); // Check PLAIN security -- two servers trying to talk to each other server = zmq_socket (ctx, ZMQ_DEALER); diff --git a/tests/test_security_curve.cpp b/tests/test_security_curve.cpp index 94ab9f93..19304c12 100644 --- a/tests/test_security_curve.cpp +++ b/tests/test_security_curve.cpp @@ -18,13 +18,12 @@ */ #include "platform.hpp" -#include +#include "../include/zmq_utils.h" #include #include #include "testutil.hpp" -static void * -zap_handler (void *zap) +static void zap_handler (void *zap) { char *version = s_recv (zap); char *sequence = s_recv (zap); @@ -52,8 +51,6 @@ zap_handler (void *zap) int rc = zmq_close (zap); assert (rc == 0); - - return NULL; } int main (void) @@ -62,6 +59,7 @@ int main (void) printf ("libsodium not installed, skipping CURVE test\n"); return 0; #endif + setup_test_environment(); int rc; size_t optsize; int mechanism; @@ -122,9 +120,7 @@ int main (void) assert (rc == 0); // Spawn ZAP handler - pthread_t zap_thread; - rc = pthread_create (&zap_thread, NULL, &zap_handler, zap); - assert (rc == 0); + void* zap_thread = zmq_threadstart(&zap_handler, zap); rc = zmq_bind (server, "tcp://*:9998"); assert (rc == 0); @@ -139,7 +135,7 @@ int main (void) assert (rc == 0); // Wait until ZAP handler terminates. - pthread_join (zap_thread, NULL); + zmq_threadclose(zap_thread); // Shutdown rc = zmq_ctx_term (ctx); diff --git a/tests/test_shutdown_stress.cpp b/tests/test_shutdown_stress.cpp index 8e8a3099..4328ca85 100644 --- a/tests/test_shutdown_stress.cpp +++ b/tests/test_shutdown_stress.cpp @@ -18,18 +18,16 @@ */ #include "../include/zmq.h" -#include +#include "../include/zmq_utils.h" #include #include - -#undef NDEBUG -#include +#include "testutil.hpp" #define THREAD_COUNT 100 extern "C" { - static void *worker (void *s) + static void worker (void *s) { int rc; @@ -39,19 +37,18 @@ extern "C" // Start closing the socket while the connecting process is underway. rc = zmq_close (s); assert (rc == 0); - - return NULL; } } int main (void) { + setup_test_environment(); void *s1; void *s2; int i; int j; int rc; - pthread_t threads [THREAD_COUNT]; + void* threads [THREAD_COUNT]; for (j = 0; j != 10; j++) { @@ -69,13 +66,11 @@ int main (void) for (i = 0; i != THREAD_COUNT; i++) { s2 = zmq_socket (ctx, ZMQ_SUB); assert (s2); - rc = pthread_create (&threads [i], NULL, worker, s2); - assert (rc == 0); + threads [i] = zmq_threadstart(&worker, s2); } for (i = 0; i != THREAD_COUNT; i++) { - rc = pthread_join (threads [i], NULL); - assert (rc == 0); + zmq_threadclose(threads [i]); } rc = zmq_close (s1); diff --git a/tests/test_spec_dealer.cpp b/tests/test_spec_dealer.cpp index 50359399..fe442d24 100644 --- a/tests/test_spec_dealer.cpp +++ b/tests/test_spec_dealer.cpp @@ -220,6 +220,7 @@ void test_block_on_send_no_peers (void *ctx) int main (void) { + setup_test_environment(); void *ctx = zmq_ctx_new (); assert (ctx); diff --git a/tests/test_spec_pushpull.cpp b/tests/test_spec_pushpull.cpp index 5af091ea..f9d50102 100644 --- a/tests/test_spec_pushpull.cpp +++ b/tests/test_spec_pushpull.cpp @@ -258,6 +258,7 @@ void test_destroy_queue_on_disconnect (void *ctx) int main (void) { + setup_test_environment(); void *ctx = zmq_ctx_new (); assert (ctx); diff --git a/tests/test_spec_rep.cpp b/tests/test_spec_rep.cpp index 3459b743..2aadc172 100644 --- a/tests/test_spec_rep.cpp +++ b/tests/test_spec_rep.cpp @@ -123,6 +123,7 @@ void test_envelope (void *ctx) int main (void) { + setup_test_environment(); void *ctx = zmq_ctx_new (); assert (ctx); diff --git a/tests/test_spec_req.cpp b/tests/test_spec_req.cpp index 1c0384ed..416c419d 100644 --- a/tests/test_spec_req.cpp +++ b/tests/test_spec_req.cpp @@ -17,8 +17,8 @@ along with this program. If not, see . */ +#include "../include/zmq_utils.h" #include -#include #include #include "testutil.hpp" @@ -49,8 +49,7 @@ void test_round_robin_out (void *ctx) // We have to give the connects time to finish otherwise the requests // will not properly round-robin. We could alternatively connect the // REQ sockets to the REP sockets. - struct timespec t = { 0, 250 * 1000000 }; - nanosleep (&t, NULL); + zmq_sleep(1); // Send our peer-replies, and expect every REP it used once in order for (size_t peer = 0; peer < services; peer++) { @@ -217,6 +216,7 @@ void test_block_on_send_no_peers (void *ctx) int main (void) { + setup_test_environment(); void *ctx = zmq_ctx_new (); assert (ctx); diff --git a/tests/test_spec_router.cpp b/tests/test_spec_router.cpp index 8083e29d..f733285c 100644 --- a/tests/test_spec_router.cpp +++ b/tests/test_spec_router.cpp @@ -177,6 +177,7 @@ void test_destroy_queue_on_disconnect (void *ctx) int main (void) { + setup_test_environment(); void *ctx = zmq_ctx_new (); assert (ctx); diff --git a/tests/test_stream.cpp b/tests/test_stream.cpp index 58812b62..93f1e7db 100644 --- a/tests/test_stream.cpp +++ b/tests/test_stream.cpp @@ -19,9 +19,7 @@ #include "../include/zmq.h" #include -#include -#undef NDEBUG -#include +#include "testutil.hpp" // ZMTP protocol greeting structure @@ -222,6 +220,7 @@ test_stream_to_stream (void) int main (void) { + setup_test_environment(); test_stream_to_dealer (); test_stream_to_stream (); } diff --git a/tests/test_sub_forward.cpp b/tests/test_sub_forward.cpp index 8caa874e..acf91314 100644 --- a/tests/test_sub_forward.cpp +++ b/tests/test_sub_forward.cpp @@ -18,13 +18,14 @@ */ #include "../include/zmq.h" +#include "../include/zmq_utils.h" #include #include -#undef NDEBUG -#include +#include "testutil.hpp" int main (void) { + setup_test_environment(); void *ctx = zmq_ctx_new (); assert (ctx); @@ -62,8 +63,7 @@ int main (void) assert (rc >= 0); // Wait a bit till the subscription gets to the publisher - struct timespec t = { 0, 250 * 1000000 }; - nanosleep (&t, NULL); + zmq_sleep(1); // Send an empty message rc = zmq_send (pub, NULL, 0, 0); diff --git a/tests/test_term_endpoint.cpp b/tests/test_term_endpoint.cpp index d5743c57..ad43df51 100644 --- a/tests/test_term_endpoint.cpp +++ b/tests/test_term_endpoint.cpp @@ -18,15 +18,14 @@ */ #include "../include/zmq.h" +#include "../include/zmq_utils.h" #include -#include #include - -#undef NDEBUG -#include +#include "testutil.hpp" int main (void) { + setup_test_environment(); int rc; char buf[32]; const char *ep = "tcp://127.0.0.1:5560"; @@ -54,8 +53,7 @@ int main (void) assert (rc == 0); // Allow unbind to settle - struct timespec t = { 0, 250 * 1000000 }; - nanosleep (&t, NULL); + zmq_sleep(1); // Check that sending would block (there's no outbound connection) rc = zmq_send (push, "ABC", 3, ZMQ_DONTWAIT); @@ -92,7 +90,7 @@ int main (void) assert (rc == 0); // Allow disconnect to settle - nanosleep (&t, NULL); + zmq_sleep(1); // Check that sending would block (there's no inbound connections). rc = zmq_send (push, "ABC", 3, ZMQ_DONTWAIT); diff --git a/tests/test_timeo.cpp b/tests/test_timeo.cpp index 2e81a058..b30b6518 100644 --- a/tests/test_timeo.cpp +++ b/tests/test_timeo.cpp @@ -18,15 +18,15 @@ */ #include "../include/zmq.h" -#include +#include "../include/zmq_utils.h" #include #include -#undef NDEBUG -#include +#include "testutil.hpp" int main (void) { + setup_test_environment(); void *ctx = zmq_ctx_new (); assert (ctx); @@ -46,17 +46,11 @@ int main (void) rc = zmq_setsockopt (frontend, ZMQ_RCVTIMEO, &timeout, sizeof (int)); assert (rc == 0); - struct timeval before, after; - gettimeofday (&before, NULL); + void* stopwatch = zmq_stopwatch_start(); rc = zmq_recv (frontend, buffer, 32, 0); assert (rc == -1); assert (zmq_errno () == EAGAIN); - gettimeofday (&after, NULL); - - long elapsed = (long) - ((after.tv_sec * 1000 + after.tv_usec / 1000) - - (before.tv_sec * 1000 + before.tv_usec / 1000)); - + unsigned int elapsed = zmq_stopwatch_stop(stopwatch) / 1000; assert (elapsed > 200 && elapsed < 300); // Check that normal message flow works as expected diff --git a/tests/testutil.hpp b/tests/testutil.hpp index 0a938078..ecb0b456 100644 --- a/tests/testutil.hpp +++ b/tests/testutil.hpp @@ -22,10 +22,16 @@ #include "../include/zmq.h" #include + #undef NDEBUG #include #include +#if defined _WIN32 +#include +#pragma warning(disable:4996) +#endif + // Bounce a message from client to server and back // For REQ/REP or DEALER/DEALER pairs only @@ -191,4 +197,13 @@ void close_zero_linger (void *socket) assert (rc == 0); } +void setup_test_environment() +{ +#if defined _WIN32 + _set_abort_behavior( 0, _WRITE_ABORT_MSG); + _CrtSetReportMode( _CRT_ASSERT, _CRTDBG_MODE_FILE ); + _CrtSetReportFile( _CRT_ASSERT, _CRTDBG_FILE_STDERR ); +#endif +} + #endif