mirror of
https://github.com/zeromq/libzmq.git
synced 2025-10-18 11:39:02 +02:00
Merge pull request #3122 from sigiesec/test-migrations
Migrated some more tests to unity and fixed some more code style issues
This commit is contained in:
@@ -470,7 +470,8 @@ tests_test_hwm_LDADD = src/libzmq.la ${UNITY_LIBS}
|
|||||||
tests_test_hwm_CPPFLAGS = ${UNITY_CPPFLAGS}
|
tests_test_hwm_CPPFLAGS = ${UNITY_CPPFLAGS}
|
||||||
|
|
||||||
tests_test_hwm_pubsub_SOURCES = tests/test_hwm_pubsub.cpp
|
tests_test_hwm_pubsub_SOURCES = tests/test_hwm_pubsub.cpp
|
||||||
tests_test_hwm_pubsub_LDADD = src/libzmq.la
|
tests_test_hwm_pubsub_LDADD = src/libzmq.la ${UNITY_LIBS}
|
||||||
|
tests_test_hwm_pubsub_CPPFLAGS = ${UNITY_CPPFLAGS}
|
||||||
|
|
||||||
tests_test_reqrep_device_SOURCES = tests/test_reqrep_device.cpp
|
tests_test_reqrep_device_SOURCES = tests/test_reqrep_device.cpp
|
||||||
tests_test_reqrep_device_LDADD = src/libzmq.la
|
tests_test_reqrep_device_LDADD = src/libzmq.la
|
||||||
@@ -629,7 +630,8 @@ tests_test_capabilities_SOURCES = tests/test_capabilities.cpp
|
|||||||
tests_test_capabilities_LDADD = src/libzmq.la
|
tests_test_capabilities_LDADD = src/libzmq.la
|
||||||
|
|
||||||
tests_test_xpub_nodrop_SOURCES = tests/test_xpub_nodrop.cpp
|
tests_test_xpub_nodrop_SOURCES = tests/test_xpub_nodrop.cpp
|
||||||
tests_test_xpub_nodrop_LDADD = src/libzmq.la
|
tests_test_xpub_nodrop_LDADD = src/libzmq.la ${UNITY_LIBS}
|
||||||
|
tests_test_xpub_nodrop_CPPFLAGS = ${UNITY_CPPFLAGS}
|
||||||
|
|
||||||
tests_test_xpub_manual_SOURCES = tests/test_xpub_manual.cpp
|
tests_test_xpub_manual_SOURCES = tests/test_xpub_manual.cpp
|
||||||
tests_test_xpub_manual_LDADD = src/libzmq.la
|
tests_test_xpub_manual_LDADD = src/libzmq.la
|
||||||
@@ -659,7 +661,8 @@ tests_test_stream_exceeds_buffer_SOURCES = tests/test_stream_exceeds_buffer.cpp
|
|||||||
tests_test_stream_exceeds_buffer_LDADD = src/libzmq.la
|
tests_test_stream_exceeds_buffer_LDADD = src/libzmq.la
|
||||||
|
|
||||||
tests_test_pub_invert_matching_SOURCES = tests/test_pub_invert_matching.cpp
|
tests_test_pub_invert_matching_SOURCES = tests/test_pub_invert_matching.cpp
|
||||||
tests_test_pub_invert_matching_LDADD = src/libzmq.la
|
tests_test_pub_invert_matching_LDADD = src/libzmq.la ${UNITY_LIBS}
|
||||||
|
tests_test_pub_invert_matching_CPPFLAGS = ${UNITY_CPPFLAGS}
|
||||||
|
|
||||||
tests_test_bind_after_connect_tcp_SOURCES = tests/test_bind_after_connect_tcp.cpp
|
tests_test_bind_after_connect_tcp_SOURCES = tests/test_bind_after_connect_tcp.cpp
|
||||||
tests_test_bind_after_connect_tcp_LDADD = src/libzmq.la ${UNITY_LIBS}
|
tests_test_bind_after_connect_tcp_LDADD = src/libzmq.la ${UNITY_LIBS}
|
||||||
|
@@ -125,6 +125,7 @@ struct blob_t
|
|||||||
{
|
{
|
||||||
clear ();
|
clear ();
|
||||||
data_ = static_cast<unsigned char *> (malloc (other.size_));
|
data_ = static_cast<unsigned char *> (malloc (other.size_));
|
||||||
|
alloc_assert (data_);
|
||||||
size_ = other.size_;
|
size_ = other.size_;
|
||||||
owned_ = true;
|
owned_ = true;
|
||||||
memcpy (data_, other.data_, size_);
|
memcpy (data_, other.data_, size_);
|
||||||
@@ -135,6 +136,7 @@ struct blob_t
|
|||||||
{
|
{
|
||||||
clear ();
|
clear ();
|
||||||
data_ = static_cast<unsigned char *> (malloc (size));
|
data_ = static_cast<unsigned char *> (malloc (size));
|
||||||
|
alloc_assert (data_);
|
||||||
size_ = size;
|
size_ = size;
|
||||||
owned_ = true;
|
owned_ = true;
|
||||||
memcpy (data_, data, size_);
|
memcpy (data_, data, size_);
|
||||||
|
@@ -31,7 +31,6 @@
|
|||||||
#define __ZMQ_CLIENT_HPP_INCLUDED__
|
#define __ZMQ_CLIENT_HPP_INCLUDED__
|
||||||
|
|
||||||
#include "socket_base.hpp"
|
#include "socket_base.hpp"
|
||||||
#include "session_base.hpp"
|
|
||||||
#include "fq.hpp"
|
#include "fq.hpp"
|
||||||
#include "lb.hpp"
|
#include "lb.hpp"
|
||||||
|
|
||||||
@@ -41,7 +40,6 @@ class ctx_t;
|
|||||||
class msg_t;
|
class msg_t;
|
||||||
class pipe_t;
|
class pipe_t;
|
||||||
class io_thread_t;
|
class io_thread_t;
|
||||||
class socket_base_t;
|
|
||||||
|
|
||||||
class client_t : public socket_base_t
|
class client_t : public socket_base_t
|
||||||
{
|
{
|
||||||
|
@@ -57,11 +57,6 @@
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
#if defined ZMQ_HAVE_OSX
|
#if defined ZMQ_HAVE_OSX
|
||||||
#include <mach/clock.h>
|
|
||||||
#include <mach/mach.h>
|
|
||||||
#include <time.h>
|
|
||||||
#include <sys/time.h>
|
|
||||||
|
|
||||||
int alt_clock_gettime (int clock_id, timespec *ts)
|
int alt_clock_gettime (int clock_id, timespec *ts)
|
||||||
{
|
{
|
||||||
clock_serv_t cclock;
|
clock_serv_t cclock;
|
||||||
|
@@ -33,17 +33,19 @@
|
|||||||
#include "stdint.hpp"
|
#include "stdint.hpp"
|
||||||
|
|
||||||
#if defined ZMQ_HAVE_OSX
|
#if defined ZMQ_HAVE_OSX
|
||||||
#include <mach/clock.h>
|
// TODO this is not required in this file, but condition_variable.hpp includes
|
||||||
#include <mach/mach.h>
|
// clock.hpp to get these definitions
|
||||||
#include <time.h>
|
|
||||||
#include <sys/time.h>
|
|
||||||
int alt_clock_gettime (int clock_id, timespec *ts);
|
|
||||||
#ifndef CLOCK_REALTIME
|
#ifndef CLOCK_REALTIME
|
||||||
#define CLOCK_REALTIME 0
|
#define CLOCK_REALTIME 0
|
||||||
#endif
|
#endif
|
||||||
#ifndef HAVE_CLOCK_GETTIME
|
#ifndef HAVE_CLOCK_GETTIME
|
||||||
#define HAVE_CLOCK_GETTIME
|
#define HAVE_CLOCK_GETTIME
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#include <mach/clock.h>
|
||||||
|
#include <mach/mach.h>
|
||||||
|
#include <time.h>
|
||||||
|
#include <sys/time.h>
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
namespace zmq
|
namespace zmq
|
||||||
|
@@ -30,7 +30,6 @@
|
|||||||
#ifndef __ZMQ_CONDITON_VARIABLE_HPP_INCLUDED__
|
#ifndef __ZMQ_CONDITON_VARIABLE_HPP_INCLUDED__
|
||||||
#define __ZMQ_CONDITON_VARIABLE_HPP_INCLUDED__
|
#define __ZMQ_CONDITON_VARIABLE_HPP_INCLUDED__
|
||||||
|
|
||||||
#include "clock.hpp"
|
|
||||||
#include "err.hpp"
|
#include "err.hpp"
|
||||||
#include "mutex.hpp"
|
#include "mutex.hpp"
|
||||||
|
|
||||||
|
@@ -37,7 +37,6 @@
|
|||||||
#include "decoder_allocators.hpp"
|
#include "decoder_allocators.hpp"
|
||||||
#include "err.hpp"
|
#include "err.hpp"
|
||||||
#include "i_decoder.hpp"
|
#include "i_decoder.hpp"
|
||||||
#include "msg.hpp"
|
|
||||||
#include "stdint.hpp"
|
#include "stdint.hpp"
|
||||||
|
|
||||||
namespace zmq
|
namespace zmq
|
||||||
|
@@ -31,13 +31,12 @@
|
|||||||
#define __ZMQ_DISH_HPP_INCLUDED__
|
#define __ZMQ_DISH_HPP_INCLUDED__
|
||||||
|
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <vector>
|
|
||||||
|
|
||||||
#include "socket_base.hpp"
|
#include "socket_base.hpp"
|
||||||
#include "session_base.hpp"
|
#include "session_base.hpp"
|
||||||
#include "dist.hpp"
|
#include "dist.hpp"
|
||||||
#include "fq.hpp"
|
#include "fq.hpp"
|
||||||
#include "trie.hpp"
|
#include "msg.hpp"
|
||||||
|
|
||||||
namespace zmq
|
namespace zmq
|
||||||
{
|
{
|
||||||
|
@@ -33,7 +33,6 @@
|
|||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
#include "array.hpp"
|
#include "array.hpp"
|
||||||
#include "pipe.hpp"
|
|
||||||
|
|
||||||
namespace zmq
|
namespace zmq
|
||||||
{
|
{
|
||||||
|
@@ -42,8 +42,8 @@
|
|||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
|
|
||||||
#include "err.hpp"
|
#include "err.hpp"
|
||||||
#include "msg.hpp"
|
|
||||||
#include "i_encoder.hpp"
|
#include "i_encoder.hpp"
|
||||||
|
#include "msg.hpp"
|
||||||
|
|
||||||
namespace zmq
|
namespace zmq
|
||||||
{
|
{
|
||||||
|
@@ -32,11 +32,12 @@
|
|||||||
|
|
||||||
#include "array.hpp"
|
#include "array.hpp"
|
||||||
#include "blob.hpp"
|
#include "blob.hpp"
|
||||||
#include "pipe.hpp"
|
|
||||||
#include "msg.hpp"
|
|
||||||
|
|
||||||
namespace zmq
|
namespace zmq
|
||||||
{
|
{
|
||||||
|
class msg_t;
|
||||||
|
class pipe_t;
|
||||||
|
|
||||||
// Class manages a set of inbound pipes. On receive it performs fair
|
// Class manages a set of inbound pipes. On receive it performs fair
|
||||||
// queueing so that senders gone berserk won't cause denial of
|
// queueing so that senders gone berserk won't cause denial of
|
||||||
// service for decent senders.
|
// service for decent senders.
|
||||||
|
@@ -31,7 +31,6 @@
|
|||||||
#define __ZMQ_GATHER_HPP_INCLUDED__
|
#define __ZMQ_GATHER_HPP_INCLUDED__
|
||||||
|
|
||||||
#include "socket_base.hpp"
|
#include "socket_base.hpp"
|
||||||
#include "session_base.hpp"
|
|
||||||
#include "fq.hpp"
|
#include "fq.hpp"
|
||||||
|
|
||||||
namespace zmq
|
namespace zmq
|
||||||
@@ -39,7 +38,6 @@ namespace zmq
|
|||||||
class ctx_t;
|
class ctx_t;
|
||||||
class pipe_t;
|
class pipe_t;
|
||||||
class msg_t;
|
class msg_t;
|
||||||
class io_thread_t;
|
|
||||||
|
|
||||||
class gather_t : public socket_base_t
|
class gather_t : public socket_base_t
|
||||||
{
|
{
|
||||||
|
@@ -30,8 +30,6 @@
|
|||||||
#ifndef __ZMQ_IO_THREAD_HPP_INCLUDED__
|
#ifndef __ZMQ_IO_THREAD_HPP_INCLUDED__
|
||||||
#define __ZMQ_IO_THREAD_HPP_INCLUDED__
|
#define __ZMQ_IO_THREAD_HPP_INCLUDED__
|
||||||
|
|
||||||
#include <vector>
|
|
||||||
|
|
||||||
#include "stdint.hpp"
|
#include "stdint.hpp"
|
||||||
#include "object.hpp"
|
#include "object.hpp"
|
||||||
#include "poller.hpp"
|
#include "poller.hpp"
|
||||||
|
@@ -31,10 +31,12 @@
|
|||||||
#define __ZMQ_LB_HPP_INCLUDED__
|
#define __ZMQ_LB_HPP_INCLUDED__
|
||||||
|
|
||||||
#include "array.hpp"
|
#include "array.hpp"
|
||||||
#include "pipe.hpp"
|
|
||||||
|
|
||||||
namespace zmq
|
namespace zmq
|
||||||
{
|
{
|
||||||
|
class msg_t;
|
||||||
|
class pipe_t;
|
||||||
|
|
||||||
// This class manages a set of outbound pipes. On send it load balances
|
// This class manages a set of outbound pipes. On send it load balances
|
||||||
// messages fairly among the pipes.
|
// messages fairly among the pipes.
|
||||||
|
|
||||||
|
@@ -34,6 +34,8 @@
|
|||||||
|
|
||||||
namespace zmq
|
namespace zmq
|
||||||
{
|
{
|
||||||
|
class msg_t;
|
||||||
|
|
||||||
class mechanism_base_t : public mechanism_t
|
class mechanism_base_t : public mechanism_t
|
||||||
{
|
{
|
||||||
protected:
|
protected:
|
||||||
|
@@ -15,6 +15,7 @@
|
|||||||
namespace zmq
|
namespace zmq
|
||||||
{
|
{
|
||||||
class io_thread_t;
|
class io_thread_t;
|
||||||
|
class msg_t;
|
||||||
class session_base_t;
|
class session_base_t;
|
||||||
|
|
||||||
class norm_engine_t : public io_object_t, public i_engine
|
class norm_engine_t : public io_object_t, public i_engine
|
||||||
|
@@ -36,7 +36,6 @@
|
|||||||
#include "err.hpp"
|
#include "err.hpp"
|
||||||
#include "msg.hpp"
|
#include "msg.hpp"
|
||||||
#include "session_base.hpp"
|
#include "session_base.hpp"
|
||||||
#include "wire.hpp"
|
|
||||||
#include "null_mechanism.hpp"
|
#include "null_mechanism.hpp"
|
||||||
|
|
||||||
zmq::null_mechanism_t::null_mechanism_t (session_base_t *session_,
|
zmq::null_mechanism_t::null_mechanism_t (session_base_t *session_,
|
||||||
|
@@ -29,6 +29,7 @@
|
|||||||
|
|
||||||
#include "precompiled.hpp"
|
#include "precompiled.hpp"
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
|
#include <set>
|
||||||
|
|
||||||
#include "options.hpp"
|
#include "options.hpp"
|
||||||
#include "err.hpp"
|
#include "err.hpp"
|
||||||
|
@@ -32,7 +32,6 @@
|
|||||||
|
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
#include <set>
|
|
||||||
#include <map>
|
#include <map>
|
||||||
|
|
||||||
#include "atomic_ptr.hpp"
|
#include "atomic_ptr.hpp"
|
||||||
@@ -41,6 +40,7 @@
|
|||||||
#include "tcp_address.hpp"
|
#include "tcp_address.hpp"
|
||||||
|
|
||||||
#if defined ZMQ_HAVE_SO_PEERCRED || defined ZMQ_HAVE_LOCAL_PEERCRED
|
#if defined ZMQ_HAVE_SO_PEERCRED || defined ZMQ_HAVE_LOCAL_PEERCRED
|
||||||
|
#include <set>
|
||||||
#include <sys/types.h>
|
#include <sys/types.h>
|
||||||
#endif
|
#endif
|
||||||
#ifdef ZMQ_HAVE_LOCAL_PEERCRED
|
#ifdef ZMQ_HAVE_LOCAL_PEERCRED
|
||||||
|
@@ -31,7 +31,6 @@
|
|||||||
#define __ZMQ_OWN_HPP_INCLUDED__
|
#define __ZMQ_OWN_HPP_INCLUDED__
|
||||||
|
|
||||||
#include <set>
|
#include <set>
|
||||||
#include <algorithm>
|
|
||||||
|
|
||||||
#include "object.hpp"
|
#include "object.hpp"
|
||||||
#include "options.hpp"
|
#include "options.hpp"
|
||||||
|
@@ -30,7 +30,6 @@
|
|||||||
#ifndef __ZMQ_PIPE_HPP_INCLUDED__
|
#ifndef __ZMQ_PIPE_HPP_INCLUDED__
|
||||||
#define __ZMQ_PIPE_HPP_INCLUDED__
|
#define __ZMQ_PIPE_HPP_INCLUDED__
|
||||||
|
|
||||||
#include "msg.hpp"
|
|
||||||
#include "ypipe_base.hpp"
|
#include "ypipe_base.hpp"
|
||||||
#include "config.hpp"
|
#include "config.hpp"
|
||||||
#include "object.hpp"
|
#include "object.hpp"
|
||||||
@@ -40,7 +39,7 @@
|
|||||||
|
|
||||||
namespace zmq
|
namespace zmq
|
||||||
{
|
{
|
||||||
class object_t;
|
class msg_t;
|
||||||
class pipe_t;
|
class pipe_t;
|
||||||
|
|
||||||
// Create a pipepair for bi-directional transfer of messages.
|
// Create a pipepair for bi-directional transfer of messages.
|
||||||
|
@@ -30,7 +30,6 @@
|
|||||||
#ifndef __ZMQ_PLAIN_SERVER_HPP_INCLUDED__
|
#ifndef __ZMQ_PLAIN_SERVER_HPP_INCLUDED__
|
||||||
#define __ZMQ_PLAIN_SERVER_HPP_INCLUDED__
|
#define __ZMQ_PLAIN_SERVER_HPP_INCLUDED__
|
||||||
|
|
||||||
#include "mechanism.hpp"
|
|
||||||
#include "options.hpp"
|
#include "options.hpp"
|
||||||
#include "zap_client.hpp"
|
#include "zap_client.hpp"
|
||||||
|
|
||||||
|
@@ -33,6 +33,7 @@
|
|||||||
#include "poller.hpp"
|
#include "poller.hpp"
|
||||||
#include "proxy.hpp"
|
#include "proxy.hpp"
|
||||||
#include "likely.hpp"
|
#include "likely.hpp"
|
||||||
|
#include "msg.hpp"
|
||||||
|
|
||||||
#if defined ZMQ_POLL_BASED_ON_POLL && !defined ZMQ_HAVE_WINDOWS \
|
#if defined ZMQ_POLL_BASED_ON_POLL && !defined ZMQ_HAVE_WINDOWS \
|
||||||
&& !defined ZMQ_HAVE_AIX
|
&& !defined ZMQ_HAVE_AIX
|
||||||
|
@@ -36,14 +36,12 @@
|
|||||||
|
|
||||||
#include "socket_base.hpp"
|
#include "socket_base.hpp"
|
||||||
#include "session_base.hpp"
|
#include "session_base.hpp"
|
||||||
#include "mtrie.hpp"
|
|
||||||
#include "array.hpp"
|
|
||||||
#include "dist.hpp"
|
#include "dist.hpp"
|
||||||
|
#include "msg.hpp"
|
||||||
|
|
||||||
namespace zmq
|
namespace zmq
|
||||||
{
|
{
|
||||||
class ctx_t;
|
class ctx_t;
|
||||||
class msg_t;
|
|
||||||
class pipe_t;
|
class pipe_t;
|
||||||
class io_thread_t;
|
class io_thread_t;
|
||||||
|
|
||||||
|
@@ -30,7 +30,6 @@
|
|||||||
#ifndef __ZMQ_RAW_DECODER_HPP_INCLUDED__
|
#ifndef __ZMQ_RAW_DECODER_HPP_INCLUDED__
|
||||||
#define __ZMQ_RAW_DECODER_HPP_INCLUDED__
|
#define __ZMQ_RAW_DECODER_HPP_INCLUDED__
|
||||||
|
|
||||||
#include "err.hpp"
|
|
||||||
#include "msg.hpp"
|
#include "msg.hpp"
|
||||||
#include "i_decoder.hpp"
|
#include "i_decoder.hpp"
|
||||||
#include "stdint.hpp"
|
#include "stdint.hpp"
|
||||||
|
@@ -30,8 +30,7 @@
|
|||||||
#include "precompiled.hpp"
|
#include "precompiled.hpp"
|
||||||
#include "encoder.hpp"
|
#include "encoder.hpp"
|
||||||
#include "raw_encoder.hpp"
|
#include "raw_encoder.hpp"
|
||||||
#include "likely.hpp"
|
#include "msg.hpp"
|
||||||
#include "wire.hpp"
|
|
||||||
|
|
||||||
zmq::raw_encoder_t::raw_encoder_t (size_t bufsize_) :
|
zmq::raw_encoder_t::raw_encoder_t (size_t bufsize_) :
|
||||||
encoder_base_t<raw_encoder_t> (bufsize_)
|
encoder_base_t<raw_encoder_t> (bufsize_)
|
||||||
|
@@ -33,11 +33,8 @@
|
|||||||
#include <stddef.h>
|
#include <stddef.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <algorithm>
|
|
||||||
|
|
||||||
#include "err.hpp"
|
#include "encoder.hpp"
|
||||||
#include "msg.hpp"
|
|
||||||
#include "i_encoder.hpp"
|
|
||||||
|
|
||||||
namespace zmq
|
namespace zmq
|
||||||
{
|
{
|
||||||
|
@@ -48,7 +48,6 @@
|
|||||||
|
|
||||||
#include "ctx.hpp"
|
#include "ctx.hpp"
|
||||||
#include "fd.hpp"
|
#include "fd.hpp"
|
||||||
#include "thread.hpp"
|
|
||||||
#include "poller_base.hpp"
|
#include "poller_base.hpp"
|
||||||
|
|
||||||
namespace zmq
|
namespace zmq
|
||||||
|
@@ -36,12 +36,12 @@
|
|||||||
#include "session_base.hpp"
|
#include "session_base.hpp"
|
||||||
#include "stdint.hpp"
|
#include "stdint.hpp"
|
||||||
#include "blob.hpp"
|
#include "blob.hpp"
|
||||||
#include "msg.hpp"
|
|
||||||
#include "fq.hpp"
|
#include "fq.hpp"
|
||||||
|
|
||||||
namespace zmq
|
namespace zmq
|
||||||
{
|
{
|
||||||
class ctx_t;
|
class ctx_t;
|
||||||
|
class msg_t;
|
||||||
class pipe_t;
|
class pipe_t;
|
||||||
|
|
||||||
// TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm.
|
// TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm.
|
||||||
|
@@ -30,7 +30,6 @@
|
|||||||
#ifndef __ZMQ_SESSION_BASE_HPP_INCLUDED__
|
#ifndef __ZMQ_SESSION_BASE_HPP_INCLUDED__
|
||||||
#define __ZMQ_SESSION_BASE_HPP_INCLUDED__
|
#define __ZMQ_SESSION_BASE_HPP_INCLUDED__
|
||||||
|
|
||||||
#include <string>
|
|
||||||
#include <stdarg.h>
|
#include <stdarg.h>
|
||||||
|
|
||||||
#include "own.hpp"
|
#include "own.hpp"
|
||||||
@@ -41,9 +40,7 @@
|
|||||||
|
|
||||||
namespace zmq
|
namespace zmq
|
||||||
{
|
{
|
||||||
class pipe_t;
|
|
||||||
class io_thread_t;
|
class io_thread_t;
|
||||||
class socket_base_t;
|
|
||||||
struct i_engine;
|
struct i_engine;
|
||||||
struct address_t;
|
struct address_t;
|
||||||
|
|
||||||
|
@@ -39,10 +39,8 @@
|
|||||||
#include "blob.hpp"
|
#include "blob.hpp"
|
||||||
#include "stdint.hpp"
|
#include "stdint.hpp"
|
||||||
#include "poller.hpp"
|
#include "poller.hpp"
|
||||||
#include "atomic_counter.hpp"
|
|
||||||
#include "i_poll_events.hpp"
|
#include "i_poll_events.hpp"
|
||||||
#include "i_mailbox.hpp"
|
#include "i_mailbox.hpp"
|
||||||
#include "stdint.hpp"
|
|
||||||
#include "clock.hpp"
|
#include "clock.hpp"
|
||||||
#include "pipe.hpp"
|
#include "pipe.hpp"
|
||||||
|
|
||||||
|
@@ -47,7 +47,6 @@
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include <vector>
|
#include <vector>
|
||||||
#include <algorithm>
|
|
||||||
|
|
||||||
#include "socket_base.hpp"
|
#include "socket_base.hpp"
|
||||||
#include "signaler.hpp"
|
#include "signaler.hpp"
|
||||||
|
@@ -166,14 +166,14 @@ void zmq::socks_request_encoder_t::encode (const socks_request_t &req)
|
|||||||
|
|
||||||
const int rc = getaddrinfo (req.hostname.c_str (), NULL, &hints, &res);
|
const int rc = getaddrinfo (req.hostname.c_str (), NULL, &hints, &res);
|
||||||
if (rc == 0 && res->ai_family == AF_INET) {
|
if (rc == 0 && res->ai_family == AF_INET) {
|
||||||
struct sockaddr_in *sockaddr_in =
|
const struct sockaddr_in *sockaddr_in =
|
||||||
reinterpret_cast<struct sockaddr_in *> (res->ai_addr);
|
reinterpret_cast<const struct sockaddr_in *> (res->ai_addr);
|
||||||
*ptr++ = 0x01;
|
*ptr++ = 0x01;
|
||||||
memcpy (ptr, &sockaddr_in->sin_addr, 4);
|
memcpy (ptr, &sockaddr_in->sin_addr, 4);
|
||||||
ptr += 4;
|
ptr += 4;
|
||||||
} else if (rc == 0 && res->ai_family == AF_INET6) {
|
} else if (rc == 0 && res->ai_family == AF_INET6) {
|
||||||
struct sockaddr_in6 *sockaddr_in6 =
|
const struct sockaddr_in6 *sockaddr_in6 =
|
||||||
reinterpret_cast<struct sockaddr_in6 *> (res->ai_addr);
|
reinterpret_cast<const struct sockaddr_in6 *> (res->ai_addr);
|
||||||
*ptr++ = 0x04;
|
*ptr++ = 0x04;
|
||||||
memcpy (ptr, &sockaddr_in6->sin6_addr, 16);
|
memcpy (ptr, &sockaddr_in6->sin6_addr, 16);
|
||||||
ptr += 16;
|
ptr += 16;
|
||||||
|
@@ -1110,12 +1110,14 @@ int zmq::stream_engine_t::process_heartbeat_message (msg_t *msg_)
|
|||||||
|
|
||||||
int zmq::stream_engine_t::process_command_message (msg_t *msg_)
|
int zmq::stream_engine_t::process_command_message (msg_t *msg_)
|
||||||
{
|
{
|
||||||
uint8_t cmd_name_size = *(static_cast<uint8_t *> (msg_->data ()));
|
const uint8_t cmd_name_size =
|
||||||
|
*(static_cast<const uint8_t *> (msg_->data ()));
|
||||||
// Malformed command
|
// Malformed command
|
||||||
if (msg_->size () < cmd_name_size + sizeof (cmd_name_size))
|
if (msg_->size () < cmd_name_size + sizeof (cmd_name_size))
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
uint8_t *cmd_name = (static_cast<uint8_t *> (msg_->data ())) + 1;
|
const uint8_t *cmd_name =
|
||||||
|
(static_cast<const uint8_t *> (msg_->data ())) + 1;
|
||||||
if (cmd_name_size == 4
|
if (cmd_name_size == 4
|
||||||
&& (memcmp (cmd_name, "PING", cmd_name_size) == 0
|
&& (memcmp (cmd_name, "PING", cmd_name_size) == 0
|
||||||
|| memcmp (cmd_name, "PONG", cmd_name_size) == 0))
|
|| memcmp (cmd_name, "PONG", cmd_name_size) == 0))
|
||||||
|
@@ -40,6 +40,7 @@
|
|||||||
#include "options.hpp"
|
#include "options.hpp"
|
||||||
#include "socket_base.hpp"
|
#include "socket_base.hpp"
|
||||||
#include "metadata.hpp"
|
#include "metadata.hpp"
|
||||||
|
#include "msg.hpp"
|
||||||
|
|
||||||
namespace zmq
|
namespace zmq
|
||||||
{
|
{
|
||||||
@@ -51,7 +52,6 @@ enum
|
|||||||
};
|
};
|
||||||
|
|
||||||
class io_thread_t;
|
class io_thread_t;
|
||||||
class msg_t;
|
|
||||||
class session_base_t;
|
class session_base_t;
|
||||||
class mechanism_t;
|
class mechanism_t;
|
||||||
|
|
||||||
|
@@ -262,7 +262,7 @@ int zmq::tcp_connecter_t::open ()
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
zmq_assert (addr->resolved.tcp_addr != NULL);
|
zmq_assert (addr->resolved.tcp_addr != NULL);
|
||||||
tcp_address_t *const tcp_addr = addr->resolved.tcp_addr;
|
const tcp_address_t *const tcp_addr = addr->resolved.tcp_addr;
|
||||||
|
|
||||||
// Create the socket.
|
// Create the socket.
|
||||||
s = open_socket (tcp_addr->family (), SOCK_STREAM, IPPROTO_TCP);
|
s = open_socket (tcp_addr->family (), SOCK_STREAM, IPPROTO_TCP);
|
||||||
|
@@ -35,6 +35,8 @@
|
|||||||
#include <netinet/in.h>
|
#include <netinet/in.h>
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#include <string>
|
||||||
|
|
||||||
#include "ip_resolver.hpp"
|
#include "ip_resolver.hpp"
|
||||||
|
|
||||||
namespace zmq
|
namespace zmq
|
||||||
|
@@ -40,9 +40,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|||||||
#endif
|
#endif
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#include "udp_address.hpp"
|
||||||
#include "udp_engine.hpp"
|
#include "udp_engine.hpp"
|
||||||
#include "session_base.hpp"
|
#include "session_base.hpp"
|
||||||
#include "v2_protocol.hpp"
|
|
||||||
#include "err.hpp"
|
#include "err.hpp"
|
||||||
#include "ip.hpp"
|
#include "ip.hpp"
|
||||||
|
|
||||||
@@ -285,14 +285,15 @@ void zmq::udp_engine_t::terminate ()
|
|||||||
|
|
||||||
void zmq::udp_engine_t::sockaddr_to_msg (zmq::msg_t *msg, sockaddr_in *addr)
|
void zmq::udp_engine_t::sockaddr_to_msg (zmq::msg_t *msg, sockaddr_in *addr)
|
||||||
{
|
{
|
||||||
char *name = inet_ntoa (addr->sin_addr);
|
const char *const name = inet_ntoa (addr->sin_addr);
|
||||||
|
|
||||||
char port[6];
|
char port[6];
|
||||||
sprintf (port, "%d", static_cast<int> (ntohs (addr->sin_port)));
|
sprintf (port, "%d", static_cast<int> (ntohs (addr->sin_port)));
|
||||||
|
|
||||||
int size = static_cast<int> (strlen (name))
|
const int size = static_cast<int> (strlen (name))
|
||||||
+ static_cast<int> (strlen (port)) + 1 + 1; // Colon + NULL
|
+ static_cast<int> (strlen (port)) + 1
|
||||||
int rc = msg->init_size (size);
|
+ 1; // Colon + NULL
|
||||||
|
const int rc = msg->init_size (size);
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
msg->set_flags (msg_t::more);
|
msg->set_flags (msg_t::more);
|
||||||
char *address = static_cast<char *> (msg->data ());
|
char *address = static_cast<char *> (msg->data ());
|
||||||
@@ -472,7 +473,8 @@ void zmq::udp_engine_t::in_event ()
|
|||||||
body_size = nbytes;
|
body_size = nbytes;
|
||||||
body_offset = 0;
|
body_offset = 0;
|
||||||
} else {
|
} else {
|
||||||
char *group_buffer = reinterpret_cast<char *> (in_buffer) + 1;
|
const char *group_buffer =
|
||||||
|
reinterpret_cast<const char *> (in_buffer) + 1;
|
||||||
int group_size = in_buffer[0];
|
int group_size = in_buffer[0];
|
||||||
|
|
||||||
rc = msg.init_size (group_size);
|
rc = msg.init_size (group_size);
|
||||||
|
@@ -5,7 +5,6 @@
|
|||||||
#include "io_object.hpp"
|
#include "io_object.hpp"
|
||||||
#include "i_engine.hpp"
|
#include "i_engine.hpp"
|
||||||
#include "address.hpp"
|
#include "address.hpp"
|
||||||
#include "udp_address.hpp"
|
|
||||||
#include "msg.hpp"
|
#include "msg.hpp"
|
||||||
|
|
||||||
#define MAX_UDP_MSG 8192
|
#define MAX_UDP_MSG 8192
|
||||||
|
@@ -30,7 +30,7 @@
|
|||||||
#include "precompiled.hpp"
|
#include "precompiled.hpp"
|
||||||
#include "encoder.hpp"
|
#include "encoder.hpp"
|
||||||
#include "v1_encoder.hpp"
|
#include "v1_encoder.hpp"
|
||||||
#include "likely.hpp"
|
#include "msg.hpp"
|
||||||
#include "wire.hpp"
|
#include "wire.hpp"
|
||||||
|
|
||||||
zmq::v1_encoder_t::v1_encoder_t (size_t bufsize_) :
|
zmq::v1_encoder_t::v1_encoder_t (size_t bufsize_) :
|
||||||
|
@@ -30,6 +30,7 @@
|
|||||||
#include "precompiled.hpp"
|
#include "precompiled.hpp"
|
||||||
#include "v2_protocol.hpp"
|
#include "v2_protocol.hpp"
|
||||||
#include "v2_encoder.hpp"
|
#include "v2_encoder.hpp"
|
||||||
|
#include "msg.hpp"
|
||||||
#include "likely.hpp"
|
#include "likely.hpp"
|
||||||
#include "wire.hpp"
|
#include "wire.hpp"
|
||||||
|
|
||||||
|
@@ -89,7 +89,8 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
|
|||||||
msg_t sub;
|
msg_t sub;
|
||||||
while (pipe_->read (&sub)) {
|
while (pipe_->read (&sub)) {
|
||||||
// Apply the subscription to the trie
|
// Apply the subscription to the trie
|
||||||
unsigned char *const data = static_cast<unsigned char *> (sub.data ());
|
const unsigned char *const data =
|
||||||
|
static_cast<const unsigned char *> (sub.data ());
|
||||||
const size_t size = sub.size ();
|
const size_t size = sub.size ();
|
||||||
metadata_t *metadata = sub.metadata ();
|
metadata_t *metadata = sub.metadata ();
|
||||||
if (size > 0 && (*data == 0 || *data == 1)) {
|
if (size > 0 && (*data == 0 || *data == 1)) {
|
||||||
|
@@ -31,12 +31,10 @@
|
|||||||
#define __ZMQ_XPUB_HPP_INCLUDED__
|
#define __ZMQ_XPUB_HPP_INCLUDED__
|
||||||
|
|
||||||
#include <deque>
|
#include <deque>
|
||||||
#include <string>
|
|
||||||
|
|
||||||
#include "socket_base.hpp"
|
#include "socket_base.hpp"
|
||||||
#include "session_base.hpp"
|
#include "session_base.hpp"
|
||||||
#include "mtrie.hpp"
|
#include "mtrie.hpp"
|
||||||
#include "array.hpp"
|
|
||||||
#include "dist.hpp"
|
#include "dist.hpp"
|
||||||
|
|
||||||
namespace zmq
|
namespace zmq
|
||||||
|
@@ -186,7 +186,7 @@ int zap_client_t::receive_and_process_zap_reply ()
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Status code frame, only 200, 300, 400 and 500 are valid status codes
|
// Status code frame, only 200, 300, 400 and 500 are valid status codes
|
||||||
char *status_code_data = static_cast<char *> (msg[3].data ());
|
const char *status_code_data = static_cast<const char *> (msg[3].data ());
|
||||||
if (msg[3].size () != 3 || status_code_data[0] < '2'
|
if (msg[3].size () != 3 || status_code_data[0] < '2'
|
||||||
|| status_code_data[0] > '5' || status_code_data[1] != '0'
|
|| status_code_data[0] > '5' || status_code_data[1] != '0'
|
||||||
|| status_code_data[2] != '0') {
|
|| status_code_data[2] != '0') {
|
||||||
|
@@ -545,7 +545,7 @@ int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_)
|
|||||||
memcpy (a_[i].iov_base, static_cast<char *> (zmq_msg_data (&msg)),
|
memcpy (a_[i].iov_base, static_cast<char *> (zmq_msg_data (&msg)),
|
||||||
a_[i].iov_len);
|
a_[i].iov_len);
|
||||||
// Assume zmq_socket ZMQ_RVCMORE is properly set.
|
// Assume zmq_socket ZMQ_RVCMORE is properly set.
|
||||||
zmq::msg_t *p_msg = reinterpret_cast<zmq::msg_t *> (&msg);
|
const zmq::msg_t *p_msg = reinterpret_cast<const zmq::msg_t *> (&msg);
|
||||||
recvmore = p_msg->flags () & zmq::msg_t::more;
|
recvmore = p_msg->flags () & zmq::msg_t::more;
|
||||||
rc = zmq_msg_close (&msg);
|
rc = zmq_msg_close (&msg);
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
@@ -678,7 +678,8 @@ const char *zmq_msg_group (zmq_msg_t *msg_)
|
|||||||
|
|
||||||
const char *zmq_msg_gets (const zmq_msg_t *msg_, const char *property_)
|
const char *zmq_msg_gets (const zmq_msg_t *msg_, const char *property_)
|
||||||
{
|
{
|
||||||
zmq::metadata_t *metadata = ((zmq::msg_t *) msg_)->metadata ();
|
const zmq::metadata_t *metadata =
|
||||||
|
reinterpret_cast<const zmq::msg_t *> (msg_)->metadata ();
|
||||||
const char *value = NULL;
|
const char *value = NULL;
|
||||||
if (metadata)
|
if (metadata)
|
||||||
value = metadata->get (std::string (property_));
|
value = metadata->get (std::string (property_));
|
||||||
@@ -1355,7 +1356,7 @@ int zmq_socket_get_peer_state (void *s_,
|
|||||||
const void *routing_id_,
|
const void *routing_id_,
|
||||||
size_t routing_id_size_)
|
size_t routing_id_size_)
|
||||||
{
|
{
|
||||||
zmq::socket_base_t *s = as_socket_base_t (s_);
|
const zmq::socket_base_t *const s = as_socket_base_t (s_);
|
||||||
if (!s)
|
if (!s)
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
|
@@ -28,30 +28,35 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
#include "testutil.hpp"
|
#include "testutil.hpp"
|
||||||
|
#include "testutil_unity.hpp"
|
||||||
|
|
||||||
|
void setUp ()
|
||||||
|
{
|
||||||
|
setup_test_context ();
|
||||||
|
}
|
||||||
|
|
||||||
|
void tearDown ()
|
||||||
|
{
|
||||||
|
teardown_test_context ();
|
||||||
|
}
|
||||||
|
|
||||||
// const int MAX_SENDS = 10000;
|
// const int MAX_SENDS = 10000;
|
||||||
|
|
||||||
int test_defaults (int send_hwm, int msgCnt)
|
int test_defaults (int send_hwm, int msgCnt)
|
||||||
{
|
{
|
||||||
void *ctx = zmq_ctx_new ();
|
|
||||||
assert (ctx);
|
|
||||||
int rc;
|
|
||||||
|
|
||||||
// Set up bind socket
|
// Set up bind socket
|
||||||
void *pub_socket = zmq_socket (ctx, ZMQ_PUB);
|
void *pub_socket = test_context_socket (ZMQ_PUB);
|
||||||
assert (pub_socket);
|
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub_socket, "inproc://a"));
|
||||||
rc = zmq_bind (pub_socket, "inproc://a");
|
|
||||||
assert (rc == 0);
|
|
||||||
|
|
||||||
// Set up connect socket
|
// Set up connect socket
|
||||||
void *sub_socket = zmq_socket (ctx, ZMQ_SUB);
|
void *sub_socket = test_context_socket (ZMQ_SUB);
|
||||||
assert (sub_socket);
|
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub_socket, "inproc://a"));
|
||||||
rc = zmq_connect (sub_socket, "inproc://a");
|
|
||||||
assert (rc == 0);
|
|
||||||
|
|
||||||
//set a hwm on publisher
|
//set a hwm on publisher
|
||||||
rc = zmq_setsockopt (pub_socket, ZMQ_SNDHWM, &send_hwm, sizeof (send_hwm));
|
TEST_ASSERT_SUCCESS_ERRNO (
|
||||||
rc = zmq_setsockopt (sub_socket, ZMQ_SUBSCRIBE, 0, 0);
|
zmq_setsockopt (pub_socket, ZMQ_SNDHWM, &send_hwm, sizeof (send_hwm)));
|
||||||
|
TEST_ASSERT_SUCCESS_ERRNO (
|
||||||
|
zmq_setsockopt (sub_socket, ZMQ_SUBSCRIBE, 0, 0));
|
||||||
|
|
||||||
// Send until we block
|
// Send until we block
|
||||||
int send_count = 0;
|
int send_count = 0;
|
||||||
@@ -67,17 +72,11 @@ int test_defaults (int send_hwm, int msgCnt)
|
|||||||
++recv_count;
|
++recv_count;
|
||||||
}
|
}
|
||||||
|
|
||||||
assert (send_hwm == recv_count);
|
TEST_ASSERT_EQUAL_INT (send_hwm, recv_count);
|
||||||
|
|
||||||
// Clean up
|
// Clean up
|
||||||
rc = zmq_close (sub_socket);
|
test_context_socket_close (sub_socket);
|
||||||
assert (rc == 0);
|
test_context_socket_close (pub_socket);
|
||||||
|
|
||||||
rc = zmq_close (pub_socket);
|
|
||||||
assert (rc == 0);
|
|
||||||
|
|
||||||
rc = zmq_ctx_term (ctx);
|
|
||||||
assert (rc == 0);
|
|
||||||
|
|
||||||
return recv_count;
|
return recv_count;
|
||||||
}
|
}
|
||||||
@@ -96,89 +95,68 @@ int receive (void *socket)
|
|||||||
|
|
||||||
int test_blocking (int send_hwm, int msgCnt)
|
int test_blocking (int send_hwm, int msgCnt)
|
||||||
{
|
{
|
||||||
void *ctx = zmq_ctx_new ();
|
|
||||||
assert (ctx);
|
|
||||||
int rc;
|
|
||||||
|
|
||||||
// Set up bind socket
|
// Set up bind socket
|
||||||
void *pub_socket = zmq_socket (ctx, ZMQ_PUB);
|
void *pub_socket = test_context_socket (ZMQ_PUB);
|
||||||
assert (pub_socket);
|
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub_socket, "inproc://a"));
|
||||||
rc = zmq_bind (pub_socket, "inproc://a");
|
|
||||||
assert (rc == 0);
|
|
||||||
|
|
||||||
// Set up connect socket
|
// Set up connect socket
|
||||||
void *sub_socket = zmq_socket (ctx, ZMQ_SUB);
|
void *sub_socket = test_context_socket (ZMQ_SUB);
|
||||||
assert (sub_socket);
|
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub_socket, "inproc://a"));
|
||||||
rc = zmq_connect (sub_socket, "inproc://a");
|
|
||||||
assert (rc == 0);
|
|
||||||
|
|
||||||
//set a hwm on publisher
|
//set a hwm on publisher
|
||||||
rc = zmq_setsockopt (pub_socket, ZMQ_SNDHWM, &send_hwm, sizeof (send_hwm));
|
TEST_ASSERT_SUCCESS_ERRNO (
|
||||||
|
zmq_setsockopt (pub_socket, ZMQ_SNDHWM, &send_hwm, sizeof (send_hwm)));
|
||||||
int wait = 1;
|
int wait = 1;
|
||||||
rc = zmq_setsockopt (pub_socket, ZMQ_XPUB_NODROP, &wait, sizeof (wait));
|
TEST_ASSERT_SUCCESS_ERRNO (
|
||||||
rc = zmq_setsockopt (sub_socket, ZMQ_SUBSCRIBE, 0, 0);
|
zmq_setsockopt (pub_socket, ZMQ_XPUB_NODROP, &wait, sizeof (wait)));
|
||||||
|
TEST_ASSERT_SUCCESS_ERRNO (
|
||||||
|
zmq_setsockopt (sub_socket, ZMQ_SUBSCRIBE, 0, 0));
|
||||||
|
|
||||||
// Send until we block
|
// Send until we block
|
||||||
int send_count = 0;
|
int send_count = 0;
|
||||||
int recv_count = 0;
|
int recv_count = 0;
|
||||||
while (send_count < msgCnt) {
|
while (send_count < msgCnt) {
|
||||||
rc = zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT);
|
const int rc = zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT);
|
||||||
if (rc == 0) {
|
if (rc == 0) {
|
||||||
++send_count;
|
++send_count;
|
||||||
} else if (-1 == rc) {
|
} else if (-1 == rc) {
|
||||||
assert (EAGAIN == errno);
|
TEST_ASSERT_EQUAL_INT (EAGAIN, errno);
|
||||||
recv_count += receive (sub_socket);
|
recv_count += receive (sub_socket);
|
||||||
assert (recv_count == send_count);
|
TEST_ASSERT_EQUAL_INT (send_count, recv_count);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
recv_count += receive (sub_socket);
|
recv_count += receive (sub_socket);
|
||||||
|
|
||||||
// Clean up
|
// Clean up
|
||||||
rc = zmq_close (sub_socket);
|
test_context_socket_close (sub_socket);
|
||||||
assert (rc == 0);
|
test_context_socket_close (pub_socket);
|
||||||
|
|
||||||
rc = zmq_close (pub_socket);
|
|
||||||
assert (rc == 0);
|
|
||||||
|
|
||||||
rc = zmq_ctx_term (ctx);
|
|
||||||
assert (rc == 0);
|
|
||||||
|
|
||||||
return recv_count;
|
return recv_count;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// hwm should apply to the messages that have already been received
|
||||||
// with hwm 11024: send 9999 msg, receive 9999, send 1100, receive 1100
|
// with hwm 11024: send 9999 msg, receive 9999, send 1100, receive 1100
|
||||||
void test_reset_hwm ()
|
void test_reset_hwm ()
|
||||||
{
|
{
|
||||||
int first_count = 9999;
|
const int first_count = 9999;
|
||||||
int second_count = 1100;
|
const int second_count = 1100;
|
||||||
int hwm = 11024;
|
int hwm = 11024;
|
||||||
size_t len = MAX_SOCKET_STRING;
|
|
||||||
char my_endpoint[MAX_SOCKET_STRING];
|
char my_endpoint[MAX_SOCKET_STRING];
|
||||||
|
|
||||||
void *ctx = zmq_ctx_new ();
|
|
||||||
assert (ctx);
|
|
||||||
int rc;
|
|
||||||
|
|
||||||
// Set up bind socket
|
// Set up bind socket
|
||||||
void *pub_socket = zmq_socket (ctx, ZMQ_PUB);
|
void *pub_socket = test_context_socket (ZMQ_PUB);
|
||||||
assert (pub_socket);
|
TEST_ASSERT_SUCCESS_ERRNO (
|
||||||
rc = zmq_setsockopt (pub_socket, ZMQ_SNDHWM, &hwm, sizeof (hwm));
|
zmq_setsockopt (pub_socket, ZMQ_SNDHWM, &hwm, sizeof (hwm)));
|
||||||
assert (rc == 0);
|
bind_loopback_ipv4 (pub_socket, my_endpoint, MAX_SOCKET_STRING);
|
||||||
rc = zmq_bind (pub_socket, "tcp://127.0.0.1:*");
|
|
||||||
assert (rc == 0);
|
|
||||||
rc = zmq_getsockopt (pub_socket, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
|
|
||||||
assert (rc == 0);
|
|
||||||
|
|
||||||
// Set up connect socket
|
// Set up connect socket
|
||||||
void *sub_socket = zmq_socket (ctx, ZMQ_SUB);
|
void *sub_socket = test_context_socket (ZMQ_SUB);
|
||||||
assert (sub_socket);
|
TEST_ASSERT_SUCCESS_ERRNO (
|
||||||
rc = zmq_setsockopt (sub_socket, ZMQ_RCVHWM, &hwm, sizeof (hwm));
|
zmq_setsockopt (sub_socket, ZMQ_RCVHWM, &hwm, sizeof (hwm)));
|
||||||
assert (rc == 0);
|
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub_socket, my_endpoint));
|
||||||
rc = zmq_connect (sub_socket, my_endpoint);
|
TEST_ASSERT_SUCCESS_ERRNO (
|
||||||
assert (rc == 0);
|
zmq_setsockopt (sub_socket, ZMQ_SUBSCRIBE, 0, 0));
|
||||||
rc = zmq_setsockopt (sub_socket, ZMQ_SUBSCRIBE, 0, 0);
|
|
||||||
assert (rc == 0);
|
|
||||||
|
|
||||||
msleep (SETTLE_TIME);
|
msleep (SETTLE_TIME);
|
||||||
|
|
||||||
@@ -187,7 +165,7 @@ void test_reset_hwm ()
|
|||||||
while (send_count < first_count
|
while (send_count < first_count
|
||||||
&& zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
|
&& zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
|
||||||
++send_count;
|
++send_count;
|
||||||
assert (first_count == send_count);
|
TEST_ASSERT_EQUAL_INT (first_count, send_count);
|
||||||
|
|
||||||
msleep (SETTLE_TIME);
|
msleep (SETTLE_TIME);
|
||||||
|
|
||||||
@@ -196,7 +174,7 @@ void test_reset_hwm ()
|
|||||||
while (0 == zmq_recv (sub_socket, NULL, 0, ZMQ_DONTWAIT)) {
|
while (0 == zmq_recv (sub_socket, NULL, 0, ZMQ_DONTWAIT)) {
|
||||||
++recv_count;
|
++recv_count;
|
||||||
}
|
}
|
||||||
assert (first_count == recv_count);
|
TEST_ASSERT_EQUAL_INT (first_count, recv_count);
|
||||||
|
|
||||||
msleep (SETTLE_TIME);
|
msleep (SETTLE_TIME);
|
||||||
|
|
||||||
@@ -205,7 +183,7 @@ void test_reset_hwm ()
|
|||||||
while (send_count < second_count
|
while (send_count < second_count
|
||||||
&& zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
|
&& zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
|
||||||
++send_count;
|
++send_count;
|
||||||
assert (second_count == send_count);
|
TEST_ASSERT_EQUAL_INT (second_count, send_count);
|
||||||
|
|
||||||
msleep (SETTLE_TIME);
|
msleep (SETTLE_TIME);
|
||||||
|
|
||||||
@@ -214,35 +192,32 @@ void test_reset_hwm ()
|
|||||||
while (0 == zmq_recv (sub_socket, NULL, 0, ZMQ_DONTWAIT)) {
|
while (0 == zmq_recv (sub_socket, NULL, 0, ZMQ_DONTWAIT)) {
|
||||||
++recv_count;
|
++recv_count;
|
||||||
}
|
}
|
||||||
assert (second_count == recv_count);
|
TEST_ASSERT_EQUAL_INT (second_count, recv_count);
|
||||||
|
|
||||||
// Clean up
|
// Clean up
|
||||||
rc = zmq_close (sub_socket);
|
test_context_socket_close (sub_socket);
|
||||||
assert (rc == 0);
|
test_context_socket_close (pub_socket);
|
||||||
|
|
||||||
rc = zmq_close (pub_socket);
|
|
||||||
assert (rc == 0);
|
|
||||||
|
|
||||||
rc = zmq_ctx_term (ctx);
|
|
||||||
assert (rc == 0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int main (void)
|
void test_defaults_1000 ()
|
||||||
|
{
|
||||||
|
// send 1000 msg on hwm 1000, receive 1000
|
||||||
|
TEST_ASSERT_EQUAL_INT (1000, test_defaults (1000, 1000));
|
||||||
|
}
|
||||||
|
|
||||||
|
void test_blocking_2000 ()
|
||||||
|
{
|
||||||
|
// send 6000 msg on hwm 2000, drops above hwm, only receive hwm
|
||||||
|
TEST_ASSERT_EQUAL_INT (6000, test_blocking (2000, 6000));
|
||||||
|
}
|
||||||
|
|
||||||
|
int main ()
|
||||||
{
|
{
|
||||||
setup_test_environment ();
|
setup_test_environment ();
|
||||||
|
|
||||||
int count;
|
UNITY_BEGIN ();
|
||||||
|
RUN_TEST (test_defaults_1000);
|
||||||
// send 1000 msg on hwm 1000, receive 1000
|
RUN_TEST (test_blocking_2000);
|
||||||
count = test_defaults (1000, 1000);
|
RUN_TEST (test_reset_hwm);
|
||||||
assert (count == 1000);
|
return UNITY_END ();
|
||||||
|
|
||||||
// send 6000 msg on hwm 2000, drops above hwm, only receive hwm
|
|
||||||
count = test_blocking (2000, 6000);
|
|
||||||
assert (count == 6000);
|
|
||||||
|
|
||||||
// hwm should apply to the messages that have already been received
|
|
||||||
test_reset_hwm ();
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
@@ -28,109 +28,99 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
#include "testutil.hpp"
|
#include "testutil.hpp"
|
||||||
|
#include "testutil_unity.hpp"
|
||||||
|
|
||||||
int main (void)
|
void setUp ()
|
||||||
{
|
{
|
||||||
setup_test_environment ();
|
setup_test_context ();
|
||||||
void *ctx = zmq_ctx_new ();
|
}
|
||||||
assert (ctx);
|
|
||||||
|
|
||||||
|
void tearDown ()
|
||||||
|
{
|
||||||
|
teardown_test_context ();
|
||||||
|
}
|
||||||
|
|
||||||
|
void test ()
|
||||||
|
{
|
||||||
// Create a publisher
|
// Create a publisher
|
||||||
void *pub = zmq_socket (ctx, ZMQ_PUB);
|
void *pub = test_context_socket (ZMQ_PUB);
|
||||||
assert (pub);
|
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, "inproc://soname"));
|
||||||
int rc = zmq_bind (pub, "inproc://soname");
|
|
||||||
assert (rc == 0);
|
|
||||||
|
|
||||||
// Create two subscribers
|
// Create two subscribers
|
||||||
void *sub1 = zmq_socket (ctx, ZMQ_SUB);
|
void *sub1 = test_context_socket (ZMQ_SUB);
|
||||||
assert (sub1);
|
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub1, "inproc://soname"));
|
||||||
rc = zmq_connect (sub1, "inproc://soname");
|
|
||||||
assert (rc == 0);
|
|
||||||
|
|
||||||
void *sub2 = zmq_socket (ctx, ZMQ_SUB);
|
void *sub2 = test_context_socket (ZMQ_SUB);
|
||||||
assert (sub2);
|
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub2, "inproc://soname"));
|
||||||
rc = zmq_connect (sub2, "inproc://soname");
|
|
||||||
assert (rc == 0);
|
|
||||||
|
|
||||||
// Subscribe pub1 to one prefix
|
// Subscribe pub1 to one prefix
|
||||||
// and pub2 to another prefix.
|
// and pub2 to another prefix.
|
||||||
const char PREFIX1[] = "prefix1";
|
const char PREFIX1[] = "prefix1";
|
||||||
const char PREFIX2[] = "p2";
|
const char PREFIX2[] = "p2";
|
||||||
|
|
||||||
rc = zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, PREFIX1, sizeof (PREFIX1));
|
TEST_ASSERT_SUCCESS_ERRNO (
|
||||||
assert (rc == 0);
|
zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, PREFIX1, strlen (PREFIX1)));
|
||||||
|
TEST_ASSERT_SUCCESS_ERRNO (
|
||||||
rc = zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, PREFIX2, sizeof (PREFIX2));
|
zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, PREFIX2, strlen (PREFIX2)));
|
||||||
assert (rc == 0);
|
|
||||||
|
|
||||||
// Send a message with the first prefix
|
// Send a message with the first prefix
|
||||||
rc = zmq_send_const (pub, PREFIX1, sizeof (PREFIX1), 0);
|
send_string_expect_success (pub, PREFIX1, 0);
|
||||||
assert (rc == sizeof (PREFIX1));
|
msleep (SETTLE_TIME);
|
||||||
|
|
||||||
// sub1 should receive it, but not sub2
|
// sub1 should receive it, but not sub2
|
||||||
rc = zmq_recv (sub1, NULL, 0, ZMQ_DONTWAIT);
|
recv_string_expect_success (sub1, PREFIX1, ZMQ_DONTWAIT);
|
||||||
assert (rc == sizeof (PREFIX1));
|
|
||||||
|
|
||||||
rc = zmq_recv (sub2, NULL, 0, ZMQ_DONTWAIT);
|
TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (sub2, NULL, 0, ZMQ_DONTWAIT));
|
||||||
assert (rc == -1);
|
|
||||||
assert (errno == EAGAIN);
|
|
||||||
|
|
||||||
// Send a message with the second prefix
|
// Send a message with the second prefix
|
||||||
rc = zmq_send_const (pub, PREFIX2, sizeof (PREFIX2), 0);
|
send_string_expect_success (pub, PREFIX2, 0);
|
||||||
assert (rc == sizeof (PREFIX2));
|
msleep (SETTLE_TIME);
|
||||||
|
|
||||||
// sub2 should receive it, but not sub1
|
// sub2 should receive it, but not sub1
|
||||||
rc = zmq_recv (sub2, NULL, 0, ZMQ_DONTWAIT);
|
recv_string_expect_success (sub2, PREFIX2, ZMQ_DONTWAIT);
|
||||||
assert (rc == sizeof (PREFIX2));
|
|
||||||
|
|
||||||
rc = zmq_recv (sub1, NULL, 0, ZMQ_DONTWAIT);
|
TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (sub1, NULL, 0, ZMQ_DONTWAIT));
|
||||||
assert (rc == -1);
|
|
||||||
assert (errno == EAGAIN);
|
|
||||||
|
|
||||||
// Now invert the matching
|
// Now invert the matching
|
||||||
int invert = 1;
|
int invert = 1;
|
||||||
rc = zmq_setsockopt (pub, ZMQ_INVERT_MATCHING, &invert, sizeof (invert));
|
TEST_ASSERT_SUCCESS_ERRNO (
|
||||||
assert (rc == 0);
|
zmq_setsockopt (pub, ZMQ_INVERT_MATCHING, &invert, sizeof (invert)));
|
||||||
|
|
||||||
// ... on both sides, otherwise the SUB socket will filter the messages out
|
// ... on both sides, otherwise the SUB socket will filter the messages out
|
||||||
rc = zmq_setsockopt (sub1, ZMQ_INVERT_MATCHING, &invert, sizeof (invert));
|
TEST_ASSERT_SUCCESS_ERRNO (
|
||||||
rc = zmq_setsockopt (sub2, ZMQ_INVERT_MATCHING, &invert, sizeof (invert));
|
zmq_setsockopt (sub1, ZMQ_INVERT_MATCHING, &invert, sizeof (invert)));
|
||||||
assert (rc == 0);
|
TEST_ASSERT_SUCCESS_ERRNO (
|
||||||
|
zmq_setsockopt (sub2, ZMQ_INVERT_MATCHING, &invert, sizeof (invert)));
|
||||||
|
|
||||||
// Send a message with the first prefix
|
// Send a message with the first prefix
|
||||||
rc = zmq_send_const (pub, PREFIX1, sizeof (PREFIX1), 0);
|
send_string_expect_success (pub, PREFIX1, 0);
|
||||||
assert (rc == sizeof (PREFIX1));
|
msleep (SETTLE_TIME);
|
||||||
|
|
||||||
// sub2 should receive it, but not sub1
|
// sub2 should receive it, but not sub1
|
||||||
rc = zmq_recv (sub2, NULL, 0, ZMQ_DONTWAIT);
|
recv_string_expect_success (sub2, PREFIX1, ZMQ_DONTWAIT);
|
||||||
assert (rc == sizeof (PREFIX1));
|
|
||||||
|
|
||||||
rc = zmq_recv (sub1, NULL, 0, ZMQ_DONTWAIT);
|
TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (sub1, NULL, 0, ZMQ_DONTWAIT));
|
||||||
assert (rc == -1);
|
|
||||||
assert (errno == EAGAIN);
|
|
||||||
|
|
||||||
// Send a message with the second prefix
|
// Send a message with the second prefix
|
||||||
rc = zmq_send_const (pub, PREFIX2, sizeof (PREFIX2), 0);
|
send_string_expect_success (pub, PREFIX2, 0);
|
||||||
assert (rc == sizeof (PREFIX2));
|
msleep (SETTLE_TIME);
|
||||||
|
|
||||||
// sub1 should receive it, but not sub2
|
// sub1 should receive it, but not sub2
|
||||||
rc = zmq_recv (sub1, NULL, 0, ZMQ_DONTWAIT);
|
recv_string_expect_success (sub1, PREFIX2, ZMQ_DONTWAIT);
|
||||||
assert (rc == sizeof (PREFIX2));
|
|
||||||
|
|
||||||
rc = zmq_recv (sub2, NULL, 0, ZMQ_DONTWAIT);
|
|
||||||
assert (rc == -1);
|
|
||||||
assert (errno == EAGAIN);
|
|
||||||
|
|
||||||
|
TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (sub2, NULL, 0, ZMQ_DONTWAIT));
|
||||||
|
|
||||||
// Clean up.
|
// Clean up.
|
||||||
rc = zmq_close (pub);
|
test_context_socket_close (pub);
|
||||||
assert (rc == 0);
|
test_context_socket_close (sub1);
|
||||||
rc = zmq_close (sub1);
|
test_context_socket_close (sub2);
|
||||||
assert (rc == 0);
|
}
|
||||||
rc = zmq_close (sub2);
|
|
||||||
assert (rc == 0);
|
int main ()
|
||||||
rc = zmq_ctx_term (ctx);
|
{
|
||||||
assert (rc == 0);
|
setup_test_environment ();
|
||||||
|
|
||||||
return 0;
|
UNITY_BEGIN ();
|
||||||
|
RUN_TEST (test);
|
||||||
|
return UNITY_END ();
|
||||||
}
|
}
|
||||||
|
@@ -28,69 +28,68 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
#include "testutil.hpp"
|
#include "testutil.hpp"
|
||||||
|
#include "testutil_unity.hpp"
|
||||||
|
|
||||||
int main (void)
|
void setUp ()
|
||||||
{
|
{
|
||||||
setup_test_environment ();
|
setup_test_context ();
|
||||||
void *ctx = zmq_ctx_new ();
|
}
|
||||||
assert (ctx);
|
|
||||||
|
|
||||||
|
void tearDown ()
|
||||||
|
{
|
||||||
|
teardown_test_context ();
|
||||||
|
}
|
||||||
|
|
||||||
|
void test ()
|
||||||
|
{
|
||||||
// Create a publisher
|
// Create a publisher
|
||||||
void *pub = zmq_socket (ctx, ZMQ_PUB);
|
void *pub = test_context_socket (ZMQ_PUB);
|
||||||
assert (pub);
|
|
||||||
|
|
||||||
int hwm = 2000;
|
int hwm = 2000;
|
||||||
int rc = zmq_setsockopt (pub, ZMQ_SNDHWM, &hwm, 4);
|
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SNDHWM, &hwm, 4));
|
||||||
assert (rc == 0);
|
|
||||||
|
|
||||||
rc = zmq_bind (pub, "inproc://soname");
|
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, "inproc://soname"));
|
||||||
assert (rc == 0);
|
|
||||||
|
|
||||||
// set pub socket options
|
// set pub socket options
|
||||||
int wait = 1;
|
int wait = 1;
|
||||||
rc = zmq_setsockopt (pub, ZMQ_XPUB_NODROP, &wait, 4);
|
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_XPUB_NODROP, &wait, 4));
|
||||||
assert (rc == 0);
|
|
||||||
|
|
||||||
|
|
||||||
// Create a subscriber
|
// Create a subscriber
|
||||||
void *sub = zmq_socket (ctx, ZMQ_SUB);
|
void *sub = test_context_socket (ZMQ_SUB);
|
||||||
assert (sub);
|
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, "inproc://soname"));
|
||||||
rc = zmq_connect (sub, "inproc://soname");
|
|
||||||
assert (rc == 0);
|
|
||||||
|
|
||||||
// Subscribe for all messages.
|
// Subscribe for all messages.
|
||||||
rc = zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "", 0);
|
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "", 0));
|
||||||
assert (rc == 0);
|
|
||||||
|
|
||||||
int hwmlimit = hwm - 1;
|
int hwmlimit = hwm - 1;
|
||||||
int send_count = 0;
|
int send_count = 0;
|
||||||
|
|
||||||
// Send an empty message
|
// Send an empty message
|
||||||
for (int i = 0; i < hwmlimit; i++) {
|
for (int i = 0; i < hwmlimit; i++) {
|
||||||
rc = zmq_send (pub, NULL, 0, 0);
|
TEST_ASSERT_SUCCESS_ERRNO (zmq_send (pub, NULL, 0, 0));
|
||||||
assert (rc == 0);
|
|
||||||
send_count++;
|
send_count++;
|
||||||
}
|
}
|
||||||
|
|
||||||
int recv_count = 0;
|
int recv_count = 0;
|
||||||
do {
|
do {
|
||||||
// Receive the message in the subscriber
|
// Receive the message in the subscriber
|
||||||
rc = zmq_recv (sub, NULL, 0, ZMQ_DONTWAIT);
|
int rc = zmq_recv (sub, NULL, 0, ZMQ_DONTWAIT);
|
||||||
if (rc == -1)
|
if (rc == -1) {
|
||||||
assert (errno == EAGAIN);
|
TEST_ASSERT_EQUAL_INT (EAGAIN, errno);
|
||||||
else {
|
break;
|
||||||
assert (rc == 0);
|
} else {
|
||||||
|
TEST_ASSERT_EQUAL_INT (0, rc);
|
||||||
recv_count++;
|
recv_count++;
|
||||||
}
|
}
|
||||||
} while (rc == 0);
|
} while (true);
|
||||||
|
|
||||||
assert (send_count == recv_count);
|
TEST_ASSERT_EQUAL_INT (send_count, recv_count);
|
||||||
|
|
||||||
// Now test real blocking behavior
|
// Now test real blocking behavior
|
||||||
// Set a timeout, default is infinite
|
// Set a timeout, default is infinite
|
||||||
int timeout = 0;
|
int timeout = 0;
|
||||||
rc = zmq_setsockopt (pub, ZMQ_SNDTIMEO, &timeout, 4);
|
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SNDTIMEO, &timeout, 4));
|
||||||
assert (rc == 0);
|
|
||||||
|
|
||||||
send_count = 0;
|
send_count = 0;
|
||||||
recv_count = 0;
|
recv_count = 0;
|
||||||
@@ -99,19 +98,21 @@ int main (void)
|
|||||||
// Send an empty message until we get an error, which must be EAGAIN
|
// Send an empty message until we get an error, which must be EAGAIN
|
||||||
while (zmq_send (pub, "", 0, 0) == 0)
|
while (zmq_send (pub, "", 0, 0) == 0)
|
||||||
send_count++;
|
send_count++;
|
||||||
assert (errno == EAGAIN);
|
TEST_ASSERT_EQUAL_INT (EAGAIN, errno);
|
||||||
|
|
||||||
while (zmq_recv (sub, NULL, 0, ZMQ_DONTWAIT) == 0)
|
while (zmq_recv (sub, NULL, 0, ZMQ_DONTWAIT) == 0)
|
||||||
recv_count++;
|
recv_count++;
|
||||||
assert (send_count == recv_count);
|
TEST_ASSERT_EQUAL_INT (send_count, recv_count);
|
||||||
|
|
||||||
// Clean up.
|
// Clean up.
|
||||||
rc = zmq_close (pub);
|
test_context_socket_close (pub);
|
||||||
assert (rc == 0);
|
test_context_socket_close (sub);
|
||||||
rc = zmq_close (sub);
|
}
|
||||||
assert (rc == 0);
|
|
||||||
rc = zmq_ctx_term (ctx);
|
int main ()
|
||||||
assert (rc == 0);
|
{
|
||||||
|
setup_test_environment ();
|
||||||
return 0;
|
UNITY_BEGIN ();
|
||||||
|
RUN_TEST (test);
|
||||||
|
return UNITY_END ();
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user