mirror of
https://github.com/zeromq/libzmq.git
synced 2025-02-28 18:35:41 +01:00
Merge pull request #3813 from sigiesec/code-style
Improve code style, remove duplications in test code
This commit is contained in:
commit
e7f802d1ac
@ -53,15 +53,15 @@ namespace zmq
|
||||
template <int ID = 0> class array_item_t
|
||||
{
|
||||
public:
|
||||
inline array_item_t () : _array_index (-1) {}
|
||||
array_item_t () : _array_index (-1) {}
|
||||
|
||||
// The destructor doesn't have to be virtual. It is made virtual
|
||||
// just to keep ICC and code checking tools from complaining.
|
||||
inline virtual ~array_item_t () ZMQ_DEFAULT;
|
||||
virtual ~array_item_t () ZMQ_DEFAULT;
|
||||
|
||||
inline void set_array_index (int index_) { _array_index = index_; }
|
||||
void set_array_index (int index_) { _array_index = index_; }
|
||||
|
||||
inline int get_array_index () const { return _array_index; }
|
||||
int get_array_index () const { return _array_index; }
|
||||
|
||||
private:
|
||||
int _array_index;
|
||||
@ -78,15 +78,15 @@ template <typename T, int ID = 0> class array_t
|
||||
public:
|
||||
typedef typename std::vector<T *>::size_type size_type;
|
||||
|
||||
inline array_t () ZMQ_DEFAULT;
|
||||
array_t () ZMQ_DEFAULT;
|
||||
|
||||
inline size_type size () { return _items.size (); }
|
||||
size_type size () { return _items.size (); }
|
||||
|
||||
inline bool empty () { return _items.empty (); }
|
||||
bool empty () { return _items.empty (); }
|
||||
|
||||
inline T *&operator[] (size_type index_) { return _items[index_]; }
|
||||
T *&operator[] (size_type index_) { return _items[index_]; }
|
||||
|
||||
inline void push_back (T *item_)
|
||||
void push_back (T *item_)
|
||||
{
|
||||
if (item_)
|
||||
static_cast<item_t *> (item_)->set_array_index (
|
||||
@ -94,12 +94,12 @@ template <typename T, int ID = 0> class array_t
|
||||
_items.push_back (item_);
|
||||
}
|
||||
|
||||
inline void erase (T *item_)
|
||||
void erase (T *item_)
|
||||
{
|
||||
erase (static_cast<item_t *> (item_)->get_array_index ());
|
||||
}
|
||||
|
||||
inline void erase (size_type index_)
|
||||
void erase (size_type index_)
|
||||
{
|
||||
if (_items.empty ())
|
||||
return;
|
||||
@ -110,7 +110,7 @@ template <typename T, int ID = 0> class array_t
|
||||
_items.pop_back ();
|
||||
}
|
||||
|
||||
inline void swap (size_type index1_, size_type index2_)
|
||||
void swap (size_type index1_, size_type index2_)
|
||||
{
|
||||
if (_items[index1_])
|
||||
static_cast<item_t *> (_items[index1_])
|
||||
@ -121,9 +121,9 @@ template <typename T, int ID = 0> class array_t
|
||||
std::swap (_items[index1_], _items[index2_]);
|
||||
}
|
||||
|
||||
inline void clear () { _items.clear (); }
|
||||
void clear () { _items.clear (); }
|
||||
|
||||
static inline size_type index (T *item_)
|
||||
static size_type index (T *item_)
|
||||
{
|
||||
return static_cast<size_type> (
|
||||
static_cast<item_t *> (item_)->get_array_index ());
|
||||
|
@ -91,16 +91,13 @@ class atomic_counter_t
|
||||
public:
|
||||
typedef uint32_t integer_t;
|
||||
|
||||
inline atomic_counter_t (integer_t value_ = 0) ZMQ_NOEXCEPT
|
||||
: _value (value_)
|
||||
{
|
||||
}
|
||||
atomic_counter_t (integer_t value_ = 0) ZMQ_NOEXCEPT : _value (value_) {}
|
||||
|
||||
// Set counter _value (not thread-safe).
|
||||
inline void set (integer_t value_) ZMQ_NOEXCEPT { _value = value_; }
|
||||
void set (integer_t value_) ZMQ_NOEXCEPT { _value = value_; }
|
||||
|
||||
// Atomic addition. Returns the old _value.
|
||||
inline integer_t add (integer_t increment_) ZMQ_NOEXCEPT
|
||||
integer_t add (integer_t increment_) ZMQ_NOEXCEPT
|
||||
{
|
||||
integer_t old_value;
|
||||
|
||||
@ -145,7 +142,7 @@ class atomic_counter_t
|
||||
}
|
||||
|
||||
// Atomic subtraction. Returns false if the counter drops to zero.
|
||||
inline bool sub (integer_t decrement_) ZMQ_NOEXCEPT
|
||||
bool sub (integer_t decrement_) ZMQ_NOEXCEPT
|
||||
{
|
||||
#if defined ZMQ_ATOMIC_COUNTER_WINDOWS
|
||||
LONG delta = -((LONG) decrement_);
|
||||
@ -200,7 +197,7 @@ class atomic_counter_t
|
||||
#endif
|
||||
}
|
||||
|
||||
inline integer_t get () const ZMQ_NOEXCEPT { return _value; }
|
||||
integer_t get () const ZMQ_NOEXCEPT { return _value; }
|
||||
|
||||
private:
|
||||
#if defined ZMQ_ATOMIC_COUNTER_CXX11
|
||||
|
@ -178,16 +178,16 @@ template <typename T> class atomic_ptr_t
|
||||
{
|
||||
public:
|
||||
// Initialise atomic pointer
|
||||
inline atomic_ptr_t () ZMQ_NOEXCEPT { _ptr = NULL; }
|
||||
atomic_ptr_t () ZMQ_NOEXCEPT { _ptr = NULL; }
|
||||
|
||||
// Set value of atomic pointer in a non-threadsafe way
|
||||
// Use this function only when you are sure that at most one
|
||||
// thread is accessing the pointer at the moment.
|
||||
inline void set (T *ptr_) ZMQ_NOEXCEPT { _ptr = ptr_; }
|
||||
void set (T *ptr_) ZMQ_NOEXCEPT { _ptr = ptr_; }
|
||||
|
||||
// Perform atomic 'exchange pointers' operation. Pointer is set
|
||||
// to the 'val_' value. Old value is returned.
|
||||
inline T *xchg (T *val_) ZMQ_NOEXCEPT
|
||||
T *xchg (T *val_) ZMQ_NOEXCEPT
|
||||
{
|
||||
#if defined ZMQ_ATOMIC_PTR_CXX11
|
||||
return _ptr.exchange (val_, std::memory_order_acq_rel);
|
||||
@ -205,7 +205,7 @@ template <typename T> class atomic_ptr_t
|
||||
// The pointer is compared to 'cmp' argument and if they are
|
||||
// equal, its value is set to 'val_'. Old value of the pointer
|
||||
// is returned.
|
||||
inline T *cas (T *cmp_, T *val_) ZMQ_NOEXCEPT
|
||||
T *cas (T *cmp_, T *val_) ZMQ_NOEXCEPT
|
||||
{
|
||||
#if defined ZMQ_ATOMIC_PTR_CXX11
|
||||
_ptr.compare_exchange_strong (cmp_, val_, std::memory_order_acq_rel);
|
||||
|
@ -45,20 +45,20 @@ class client_t ZMQ_FINAL : public socket_base_t
|
||||
{
|
||||
public:
|
||||
client_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
|
||||
~client_t () ZMQ_FINAL;
|
||||
~client_t ();
|
||||
|
||||
protected:
|
||||
// Overrides of functions from socket_base_t.
|
||||
void xattach_pipe (zmq::pipe_t *pipe_,
|
||||
bool subscribe_to_all_,
|
||||
bool locally_initiated_) ZMQ_FINAL;
|
||||
int xsend (zmq::msg_t *msg_) ZMQ_FINAL;
|
||||
int xrecv (zmq::msg_t *msg_) ZMQ_FINAL;
|
||||
bool xhas_in () ZMQ_FINAL;
|
||||
bool xhas_out () ZMQ_FINAL;
|
||||
void xread_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
|
||||
void xwrite_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
|
||||
void xpipe_terminated (zmq::pipe_t *pipe_) ZMQ_FINAL;
|
||||
bool locally_initiated_);
|
||||
int xsend (zmq::msg_t *msg_);
|
||||
int xrecv (zmq::msg_t *msg_);
|
||||
bool xhas_in ();
|
||||
bool xhas_out ();
|
||||
void xread_activated (zmq::pipe_t *pipe_);
|
||||
void xwrite_activated (zmq::pipe_t *pipe_);
|
||||
void xpipe_terminated (zmq::pipe_t *pipe_);
|
||||
|
||||
private:
|
||||
// Messages are fair-queued from inbound pipes. And load-balanced to
|
||||
|
@ -101,9 +101,9 @@ namespace zmq
|
||||
class condition_variable_t
|
||||
{
|
||||
public:
|
||||
inline condition_variable_t () ZMQ_DEFAULT;
|
||||
condition_variable_t () ZMQ_DEFAULT;
|
||||
|
||||
inline int wait (mutex_t *mutex_, int timeout_)
|
||||
int wait (mutex_t *mutex_, int timeout_)
|
||||
{
|
||||
// this assumes that the mutex mutex_ has been locked by the caller
|
||||
int res = 0;
|
||||
@ -119,7 +119,7 @@ class condition_variable_t
|
||||
return res;
|
||||
}
|
||||
|
||||
inline void broadcast ()
|
||||
void broadcast ()
|
||||
{
|
||||
// this assumes that the mutex associated with _cv has been locked by the caller
|
||||
_cv.notify_all ();
|
||||
|
@ -49,13 +49,13 @@ class curve_server_t ZMQ_FINAL : public zap_client_common_handshake_t,
|
||||
curve_server_t (session_base_t *session_,
|
||||
const std::string &peer_address_,
|
||||
const options_t &options_);
|
||||
~curve_server_t () ZMQ_FINAL;
|
||||
~curve_server_t ();
|
||||
|
||||
// mechanism implementation
|
||||
int next_handshake_command (msg_t *msg_) ZMQ_FINAL;
|
||||
int process_handshake_command (msg_t *msg_) ZMQ_FINAL;
|
||||
int encode (msg_t *msg_) ZMQ_FINAL;
|
||||
int decode (msg_t *msg_) ZMQ_FINAL;
|
||||
int next_handshake_command (msg_t *msg_);
|
||||
int process_handshake_command (msg_t *msg_);
|
||||
int encode (msg_t *msg_);
|
||||
int decode (msg_t *msg_);
|
||||
|
||||
private:
|
||||
// Our secret key (s)
|
||||
|
@ -58,26 +58,21 @@ template <typename T> class dbuffer_t;
|
||||
template <> class dbuffer_t<msg_t>
|
||||
{
|
||||
public:
|
||||
inline dbuffer_t () :
|
||||
_back (&_storage[0]),
|
||||
_front (&_storage[1]),
|
||||
_has_msg (false)
|
||||
dbuffer_t () : _back (&_storage[0]), _front (&_storage[1]), _has_msg (false)
|
||||
{
|
||||
_back->init ();
|
||||
_front->init ();
|
||||
}
|
||||
|
||||
inline ~dbuffer_t ()
|
||||
~dbuffer_t ()
|
||||
{
|
||||
_back->close ();
|
||||
_front->close ();
|
||||
}
|
||||
|
||||
inline void write (const msg_t &value_)
|
||||
void write (const msg_t &value_)
|
||||
{
|
||||
msg_t &xvalue = const_cast<msg_t &> (value_);
|
||||
|
||||
zmq_assert (xvalue.check ());
|
||||
zmq_assert (value_.check ());
|
||||
*_back = value_;
|
||||
|
||||
zmq_assert (_back->check ());
|
||||
@ -90,7 +85,7 @@ template <> class dbuffer_t<msg_t>
|
||||
}
|
||||
}
|
||||
|
||||
inline bool read (msg_t *value_)
|
||||
bool read (msg_t *value_)
|
||||
{
|
||||
if (!value_)
|
||||
return false;
|
||||
@ -111,14 +106,14 @@ template <> class dbuffer_t<msg_t>
|
||||
}
|
||||
|
||||
|
||||
inline bool check_read ()
|
||||
bool check_read ()
|
||||
{
|
||||
scoped_lock_t lock (_sync);
|
||||
|
||||
return _has_msg;
|
||||
}
|
||||
|
||||
inline bool probe (bool (*fn_) (const msg_t &))
|
||||
bool probe (bool (*fn_) (const msg_t &))
|
||||
{
|
||||
scoped_lock_t lock (_sync);
|
||||
return (*fn_) (*_front);
|
||||
|
@ -45,19 +45,19 @@ class dgram_t ZMQ_FINAL : public socket_base_t
|
||||
{
|
||||
public:
|
||||
dgram_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
|
||||
~dgram_t () ZMQ_FINAL;
|
||||
~dgram_t ();
|
||||
|
||||
// Overrides of functions from socket_base_t.
|
||||
void xattach_pipe (zmq::pipe_t *pipe_,
|
||||
bool subscribe_to_all_,
|
||||
bool locally_initiated_) ZMQ_FINAL;
|
||||
int xsend (zmq::msg_t *msg_) ZMQ_FINAL;
|
||||
int xrecv (zmq::msg_t *msg_) ZMQ_FINAL;
|
||||
bool xhas_in () ZMQ_FINAL;
|
||||
bool xhas_out () ZMQ_FINAL;
|
||||
void xread_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
|
||||
void xwrite_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
|
||||
void xpipe_terminated (zmq::pipe_t *pipe_) ZMQ_FINAL;
|
||||
bool locally_initiated_);
|
||||
int xsend (zmq::msg_t *msg_);
|
||||
int xrecv (zmq::msg_t *msg_);
|
||||
bool xhas_in ();
|
||||
bool xhas_out ();
|
||||
void xread_activated (zmq::pipe_t *pipe_);
|
||||
void xwrite_activated (zmq::pipe_t *pipe_);
|
||||
void xpipe_terminated (zmq::pipe_t *pipe_);
|
||||
|
||||
private:
|
||||
zmq::pipe_t *_pipe;
|
||||
|
32
src/dish.hpp
32
src/dish.hpp
@ -48,23 +48,23 @@ class dish_t ZMQ_FINAL : public socket_base_t
|
||||
{
|
||||
public:
|
||||
dish_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
|
||||
~dish_t () ZMQ_FINAL;
|
||||
~dish_t ();
|
||||
|
||||
protected:
|
||||
// Overrides of functions from socket_base_t.
|
||||
void xattach_pipe (zmq::pipe_t *pipe_,
|
||||
bool subscribe_to_all_,
|
||||
bool locally_initiated_) ZMQ_FINAL;
|
||||
int xsend (zmq::msg_t *msg_) ZMQ_FINAL;
|
||||
bool xhas_out () ZMQ_FINAL;
|
||||
int xrecv (zmq::msg_t *msg_) ZMQ_FINAL;
|
||||
bool xhas_in () ZMQ_FINAL;
|
||||
void xread_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
|
||||
void xwrite_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
|
||||
void xhiccuped (pipe_t *pipe_) ZMQ_FINAL;
|
||||
void xpipe_terminated (zmq::pipe_t *pipe_) ZMQ_FINAL;
|
||||
int xjoin (const char *group_) ZMQ_FINAL;
|
||||
int xleave (const char *group_) ZMQ_FINAL;
|
||||
bool locally_initiated_);
|
||||
int xsend (zmq::msg_t *msg_);
|
||||
bool xhas_out ();
|
||||
int xrecv (zmq::msg_t *msg_);
|
||||
bool xhas_in ();
|
||||
void xread_activated (zmq::pipe_t *pipe_);
|
||||
void xwrite_activated (zmq::pipe_t *pipe_);
|
||||
void xhiccuped (pipe_t *pipe_);
|
||||
void xpipe_terminated (zmq::pipe_t *pipe_);
|
||||
int xjoin (const char *group_);
|
||||
int xleave (const char *group_);
|
||||
|
||||
private:
|
||||
int xxrecv (zmq::msg_t *msg_);
|
||||
@ -98,12 +98,12 @@ class dish_session_t ZMQ_FINAL : public session_base_t
|
||||
zmq::socket_base_t *socket_,
|
||||
const options_t &options_,
|
||||
address_t *addr_);
|
||||
~dish_session_t () ZMQ_FINAL;
|
||||
~dish_session_t ();
|
||||
|
||||
// Overrides of the functions from session_base_t.
|
||||
int push_msg (msg_t *msg_) ZMQ_FINAL;
|
||||
int pull_msg (msg_t *msg_) ZMQ_FINAL;
|
||||
void reset () ZMQ_FINAL;
|
||||
int push_msg (msg_t *msg_);
|
||||
int pull_msg (msg_t *msg_);
|
||||
void reset ();
|
||||
|
||||
private:
|
||||
enum
|
||||
|
@ -54,7 +54,7 @@ namespace zmq
|
||||
template <typename T> class encoder_base_t : public i_encoder
|
||||
{
|
||||
public:
|
||||
inline explicit encoder_base_t (size_t bufsize_) :
|
||||
explicit encoder_base_t (size_t bufsize_) :
|
||||
_write_pos (0),
|
||||
_to_write (0),
|
||||
_next (NULL),
|
||||
@ -66,12 +66,12 @@ template <typename T> class encoder_base_t : public i_encoder
|
||||
alloc_assert (_buf);
|
||||
}
|
||||
|
||||
inline ~encoder_base_t () ZMQ_OVERRIDE { free (_buf); }
|
||||
~encoder_base_t () ZMQ_OVERRIDE { free (_buf); }
|
||||
|
||||
// The function returns a batch of binary data. The data
|
||||
// are filled to a supplied buffer. If no buffer is supplied (data_
|
||||
// points to NULL) decoder object will provide buffer of its own.
|
||||
inline size_t encode (unsigned char **data_, size_t size_) ZMQ_FINAL
|
||||
size_t encode (unsigned char **data_, size_t size_) ZMQ_FINAL
|
||||
{
|
||||
unsigned char *buffer = !*data_ ? _buf : *data_;
|
||||
const size_t buffersize = !*data_ ? _buf_size : size_;
|
||||
@ -139,10 +139,10 @@ template <typename T> class encoder_base_t : public i_encoder
|
||||
|
||||
// This function should be called from derived class to write the data
|
||||
// to the buffer and schedule next state machine action.
|
||||
inline void next_step (void *write_pos_,
|
||||
size_t to_write_,
|
||||
step_t next_,
|
||||
bool new_msg_flag_)
|
||||
void next_step (void *write_pos_,
|
||||
size_t to_write_,
|
||||
step_t next_,
|
||||
bool new_msg_flag_)
|
||||
{
|
||||
_write_pos = static_cast<unsigned char *> (write_pos_);
|
||||
_to_write = to_write_;
|
||||
|
@ -189,8 +189,8 @@ void zmq::epoll_t::loop ()
|
||||
}
|
||||
|
||||
for (int i = 0; i < n; i++) {
|
||||
poll_entry_t *pe =
|
||||
(static_cast<poll_entry_t *> (ev_buf[i].data.ptr));
|
||||
const poll_entry_t *const pe =
|
||||
static_cast<const poll_entry_t *> (ev_buf[i].data.ptr);
|
||||
|
||||
if (pe->fd == retired_fd)
|
||||
continue;
|
||||
|
@ -87,7 +87,7 @@ class epoll_t ZMQ_FINAL : public worker_poller_base_t
|
||||
#endif
|
||||
|
||||
// Main event loop.
|
||||
void loop () ZMQ_FINAL;
|
||||
void loop ();
|
||||
|
||||
// Main epoll file descriptor
|
||||
epoll_fd_t _epoll_fd;
|
||||
|
@ -43,17 +43,17 @@ class gather_t ZMQ_FINAL : public socket_base_t
|
||||
{
|
||||
public:
|
||||
gather_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
|
||||
~gather_t () ZMQ_FINAL;
|
||||
~gather_t ();
|
||||
|
||||
protected:
|
||||
// Overrides of functions from socket_base_t.
|
||||
void xattach_pipe (zmq::pipe_t *pipe_,
|
||||
bool subscribe_to_all_,
|
||||
bool locally_initiated_) ZMQ_FINAL;
|
||||
int xrecv (zmq::msg_t *msg_) ZMQ_FINAL;
|
||||
bool xhas_in () ZMQ_FINAL;
|
||||
void xread_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
|
||||
void xpipe_terminated (zmq::pipe_t *pipe_) ZMQ_FINAL;
|
||||
bool locally_initiated_);
|
||||
int xrecv (zmq::msg_t *msg_);
|
||||
bool xhas_in ();
|
||||
void xread_activated (zmq::pipe_t *pipe_);
|
||||
void xpipe_terminated (zmq::pipe_t *pipe_);
|
||||
|
||||
private:
|
||||
// Fair queueing object for inbound pipes.
|
||||
|
@ -50,7 +50,7 @@ class io_thread_t ZMQ_FINAL : public object_t, public i_poll_events
|
||||
|
||||
// Clean-up. If the thread was started, it's necessary to call 'stop'
|
||||
// before invoking destructor. Otherwise the destructor would hang up.
|
||||
~io_thread_t () ZMQ_FINAL;
|
||||
~io_thread_t ();
|
||||
|
||||
// Launch the physical thread.
|
||||
void start ();
|
||||
@ -62,15 +62,15 @@ class io_thread_t ZMQ_FINAL : public object_t, public i_poll_events
|
||||
mailbox_t *get_mailbox ();
|
||||
|
||||
// i_poll_events implementation.
|
||||
void in_event () ZMQ_FINAL;
|
||||
void out_event () ZMQ_FINAL;
|
||||
void timer_event (int id_) ZMQ_FINAL;
|
||||
void in_event ();
|
||||
void out_event ();
|
||||
void timer_event (int id_);
|
||||
|
||||
// Used by io_objects to retrieve the associated poller object.
|
||||
poller_t *get_poller () const;
|
||||
|
||||
// Command handlers.
|
||||
void process_stop () ZMQ_FINAL;
|
||||
void process_stop ();
|
||||
|
||||
// Returns load experienced by the I/O thread.
|
||||
int get_load () const;
|
||||
|
@ -859,7 +859,7 @@ int zmq::create_ipc_wildcard_address (std::string &path_, std::string &file_)
|
||||
// the socket directory there.
|
||||
const char **tmp_env = tmp_env_vars;
|
||||
while (tmp_path.empty () && *tmp_env != 0) {
|
||||
char *tmpdir = getenv (*tmp_env);
|
||||
const char *const tmpdir = getenv (*tmp_env);
|
||||
struct stat statbuf;
|
||||
|
||||
// Confirm it is actually a directory before trying to use
|
||||
|
@ -540,7 +540,7 @@ int zmq::ip_resolver_t::resolve_nic_name (ip_addr_t *ip_addr_, const char *nic_)
|
||||
|
||||
// Find the corresponding network interface.
|
||||
bool found = false;
|
||||
for (ifaddrs *ifp = ifa; ifp != NULL; ifp = ifp->ifa_next) {
|
||||
for (const ifaddrs *ifp = ifa; ifp != NULL; ifp = ifp->ifa_next) {
|
||||
if (ifp->ifa_addr == NULL)
|
||||
continue;
|
||||
|
||||
|
@ -50,10 +50,10 @@ class ipc_connecter_t ZMQ_FINAL : public stream_connecter_base_t
|
||||
|
||||
private:
|
||||
// Handlers for I/O events.
|
||||
void out_event () ZMQ_FINAL;
|
||||
void out_event ();
|
||||
|
||||
// Internal function to start the actual connection establishment.
|
||||
void start_connecting () ZMQ_FINAL;
|
||||
void start_connecting ();
|
||||
|
||||
// Open IPC connecting socket. Returns -1 in case of error,
|
||||
// 0 if connect was successful immediately. Returns -1 with
|
||||
|
@ -248,8 +248,8 @@ bool zmq::ipc_listener_t::filter (fd_t sock_)
|
||||
!= options.ipc_pid_accept_filters.end ())
|
||||
return true;
|
||||
|
||||
struct passwd *pw;
|
||||
struct group *gr;
|
||||
const struct passwd *pw;
|
||||
const struct group *gr;
|
||||
|
||||
if (!(pw = getpwuid (cred.uid)))
|
||||
return false;
|
||||
|
@ -50,12 +50,11 @@ class ipc_listener_t ZMQ_FINAL : public stream_listener_base_t
|
||||
int set_local_address (const char *addr_);
|
||||
|
||||
protected:
|
||||
std::string get_socket_name (fd_t fd_,
|
||||
socket_end_t socket_end_) const ZMQ_FINAL;
|
||||
std::string get_socket_name (fd_t fd_, socket_end_t socket_end_) const;
|
||||
|
||||
private:
|
||||
// Handlers for I/O events.
|
||||
void in_event () ZMQ_FINAL;
|
||||
void in_event ();
|
||||
|
||||
// Filter new connections if the OS provides a mechanism to get
|
||||
// the credentials of the peer process. Called from accept().
|
||||
@ -63,7 +62,7 @@ class ipc_listener_t ZMQ_FINAL : public stream_listener_base_t
|
||||
bool filter (fd_t sock_);
|
||||
#endif
|
||||
|
||||
int close () ZMQ_FINAL;
|
||||
int close ();
|
||||
|
||||
// Accept the new connection. Returns the file descriptor of the
|
||||
// newly created connection. The function may return retired_fd
|
||||
|
@ -46,11 +46,11 @@ class mailbox_t ZMQ_FINAL : public i_mailbox
|
||||
{
|
||||
public:
|
||||
mailbox_t ();
|
||||
~mailbox_t () ZMQ_FINAL;
|
||||
~mailbox_t ();
|
||||
|
||||
fd_t get_fd () const;
|
||||
void send (const command_t &cmd_) ZMQ_FINAL;
|
||||
int recv (command_t *cmd_, int timeout_) ZMQ_FINAL;
|
||||
void send (const command_t &cmd_);
|
||||
int recv (command_t *cmd_, int timeout_);
|
||||
|
||||
bool valid () const;
|
||||
|
||||
|
@ -48,10 +48,10 @@ class mailbox_safe_t ZMQ_FINAL : public i_mailbox
|
||||
{
|
||||
public:
|
||||
mailbox_safe_t (mutex_t *sync_);
|
||||
~mailbox_safe_t () ZMQ_FINAL;
|
||||
~mailbox_safe_t ();
|
||||
|
||||
void send (const command_t &cmd_) ZMQ_FINAL;
|
||||
int recv (command_t *cmd_, int timeout_) ZMQ_FINAL;
|
||||
void send (const command_t &cmd_);
|
||||
int recv (command_t *cmd_, int timeout_);
|
||||
|
||||
// Add signaler to mailbox which will be called when a message is ready
|
||||
void add_signaler (signaler_t *signaler_);
|
||||
|
@ -39,8 +39,7 @@ class msg_t;
|
||||
class mechanism_base_t : public mechanism_t
|
||||
{
|
||||
protected:
|
||||
mechanism_base_t (session_base_t *const session_,
|
||||
const options_t &options_);
|
||||
mechanism_base_t (session_base_t *session_, const options_t &options_);
|
||||
|
||||
session_base_t *const session;
|
||||
|
||||
|
@ -131,11 +131,12 @@ class msg_t
|
||||
|
||||
// These are called on each message received by the session_base class,
|
||||
// so get them inlined to avoid the overhead of 2 function calls per msg
|
||||
inline bool is_subscribe () const
|
||||
bool is_subscribe () const
|
||||
{
|
||||
return (_u.base.flags & CMD_TYPE_MASK) == subscribe;
|
||||
}
|
||||
inline bool is_cancel () const
|
||||
|
||||
bool is_cancel () const
|
||||
{
|
||||
return (_u.base.flags & CMD_TYPE_MASK) == cancel;
|
||||
}
|
||||
|
@ -44,20 +44,17 @@ namespace zmq
|
||||
class mutex_t
|
||||
{
|
||||
public:
|
||||
inline mutex_t () { InitializeCriticalSection (&_cs); }
|
||||
mutex_t () { InitializeCriticalSection (&_cs); }
|
||||
|
||||
inline ~mutex_t () { DeleteCriticalSection (&_cs); }
|
||||
~mutex_t () { DeleteCriticalSection (&_cs); }
|
||||
|
||||
inline void lock () { EnterCriticalSection (&_cs); }
|
||||
void lock () { EnterCriticalSection (&_cs); }
|
||||
|
||||
inline bool try_lock ()
|
||||
{
|
||||
return (TryEnterCriticalSection (&_cs)) ? true : false;
|
||||
}
|
||||
bool try_lock () { return (TryEnterCriticalSection (&_cs)) ? true : false; }
|
||||
|
||||
inline void unlock () { LeaveCriticalSection (&_cs); }
|
||||
void unlock () { LeaveCriticalSection (&_cs); }
|
||||
|
||||
inline CRITICAL_SECTION *get_cs () { return &_cs; }
|
||||
CRITICAL_SECTION *get_cs () { return &_cs; }
|
||||
|
||||
private:
|
||||
CRITICAL_SECTION _cs;
|
||||
|
@ -45,13 +45,13 @@ class null_mechanism_t ZMQ_FINAL : public zap_client_t
|
||||
null_mechanism_t (session_base_t *session_,
|
||||
const std::string &peer_address_,
|
||||
const options_t &options_);
|
||||
~null_mechanism_t () ZMQ_FINAL;
|
||||
~null_mechanism_t ();
|
||||
|
||||
// mechanism implementation
|
||||
int next_handshake_command (msg_t *msg_) ZMQ_FINAL;
|
||||
int process_handshake_command (msg_t *msg_) ZMQ_FINAL;
|
||||
int zap_msg_available () ZMQ_FINAL;
|
||||
status_t status () const ZMQ_FINAL;
|
||||
int next_handshake_command (msg_t *msg_);
|
||||
int process_handshake_command (msg_t *msg_);
|
||||
int zap_msg_available ();
|
||||
status_t status () const;
|
||||
|
||||
private:
|
||||
bool _ready_command_sent;
|
||||
|
@ -304,10 +304,10 @@ inline bool get_effective_conflate_option (const options_t &options)
|
||||
|| options.type == ZMQ_SUB);
|
||||
}
|
||||
|
||||
int do_getsockopt (void *const optval_,
|
||||
size_t *const optvallen_,
|
||||
int do_getsockopt (void *optval_,
|
||||
size_t *optvallen_,
|
||||
const void *value_,
|
||||
const size_t value_len_);
|
||||
size_t value_len_);
|
||||
|
||||
template <typename T>
|
||||
int do_getsockopt (void *const optval_, size_t *const optvallen_, T value_)
|
||||
@ -319,17 +319,17 @@ int do_getsockopt (void *const optval_, size_t *const optvallen_, T value_)
|
||||
return do_getsockopt (optval_, optvallen_, &value_, sizeof (T));
|
||||
}
|
||||
|
||||
int do_getsockopt (void *const optval_,
|
||||
size_t *const optvallen_,
|
||||
int do_getsockopt (void *optval_,
|
||||
size_t *optvallen_,
|
||||
const std::string &value_);
|
||||
|
||||
int do_setsockopt_int_as_bool_strict (const void *const optval_,
|
||||
const size_t optvallen_,
|
||||
bool *const out_value_);
|
||||
int do_setsockopt_int_as_bool_strict (const void *optval_,
|
||||
size_t optvallen_,
|
||||
bool *out_value_);
|
||||
|
||||
int do_setsockopt_int_as_bool_relaxed (const void *const optval_,
|
||||
const size_t optvallen_,
|
||||
bool *const out_value_);
|
||||
int do_setsockopt_int_as_bool_relaxed (const void *optval_,
|
||||
size_t optvallen_,
|
||||
bool *out_value_);
|
||||
}
|
||||
|
||||
#endif
|
||||
|
18
src/pair.hpp
18
src/pair.hpp
@ -45,19 +45,19 @@ class pair_t ZMQ_FINAL : public socket_base_t
|
||||
{
|
||||
public:
|
||||
pair_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
|
||||
~pair_t () ZMQ_FINAL;
|
||||
~pair_t ();
|
||||
|
||||
// Overrides of functions from socket_base_t.
|
||||
void xattach_pipe (zmq::pipe_t *pipe_,
|
||||
bool subscribe_to_all_,
|
||||
bool locally_initiated_) ZMQ_FINAL;
|
||||
int xsend (zmq::msg_t *msg_) ZMQ_FINAL;
|
||||
int xrecv (zmq::msg_t *msg_) ZMQ_FINAL;
|
||||
bool xhas_in () ZMQ_FINAL;
|
||||
bool xhas_out () ZMQ_FINAL;
|
||||
void xread_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
|
||||
void xwrite_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
|
||||
void xpipe_terminated (zmq::pipe_t *pipe_) ZMQ_FINAL;
|
||||
bool locally_initiated_);
|
||||
int xsend (zmq::msg_t *msg_);
|
||||
int xrecv (zmq::msg_t *msg_);
|
||||
bool xhas_in ();
|
||||
bool xhas_out ();
|
||||
void xread_activated (zmq::pipe_t *pipe_);
|
||||
void xwrite_activated (zmq::pipe_t *pipe_);
|
||||
void xpipe_terminated (zmq::pipe_t *pipe_);
|
||||
|
||||
private:
|
||||
zmq::pipe_t *_pipe;
|
||||
|
@ -40,13 +40,13 @@ class msg_t;
|
||||
class plain_client_t ZMQ_FINAL : public mechanism_base_t
|
||||
{
|
||||
public:
|
||||
plain_client_t (session_base_t *const session_, const options_t &options_);
|
||||
~plain_client_t () ZMQ_FINAL;
|
||||
plain_client_t (session_base_t *session_, const options_t &options_);
|
||||
~plain_client_t ();
|
||||
|
||||
// mechanism implementation
|
||||
int next_handshake_command (msg_t *msg_) ZMQ_FINAL;
|
||||
int process_handshake_command (msg_t *msg_) ZMQ_FINAL;
|
||||
status_t status () const ZMQ_FINAL;
|
||||
int next_handshake_command (msg_t *msg_);
|
||||
int process_handshake_command (msg_t *msg_);
|
||||
status_t status () const;
|
||||
|
||||
private:
|
||||
enum state_t
|
||||
|
@ -44,11 +44,11 @@ class plain_server_t ZMQ_FINAL : public zap_client_common_handshake_t
|
||||
plain_server_t (session_base_t *session_,
|
||||
const std::string &peer_address_,
|
||||
const options_t &options_);
|
||||
~plain_server_t () ZMQ_FINAL;
|
||||
~plain_server_t ();
|
||||
|
||||
// mechanism implementation
|
||||
int next_handshake_command (msg_t *msg_) ZMQ_FINAL;
|
||||
int process_handshake_command (msg_t *msg_) ZMQ_FINAL;
|
||||
int next_handshake_command (msg_t *msg_);
|
||||
int process_handshake_command (msg_t *msg_);
|
||||
|
||||
private:
|
||||
static void produce_welcome (msg_t *msg_);
|
||||
|
@ -105,10 +105,8 @@ template <typename T, size_t S> class resizable_fast_vector_t
|
||||
#if defined ZMQ_POLL_BASED_ON_POLL
|
||||
typedef int timeout_t;
|
||||
|
||||
timeout_t compute_timeout (const bool first_pass_,
|
||||
const long timeout_,
|
||||
const uint64_t now_,
|
||||
const uint64_t end_);
|
||||
timeout_t
|
||||
compute_timeout (bool first_pass_, long timeout_, uint64_t now_, uint64_t end_);
|
||||
|
||||
#elif defined ZMQ_POLL_BASED_ON_SELECT
|
||||
inline size_t valid_pollset_bytes (const fd_set &pollset_)
|
||||
|
@ -43,14 +43,14 @@ class pub_t ZMQ_FINAL : public xpub_t
|
||||
{
|
||||
public:
|
||||
pub_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
|
||||
~pub_t () ZMQ_FINAL;
|
||||
~pub_t ();
|
||||
|
||||
// Implementations of virtual functions from socket_base_t.
|
||||
void xattach_pipe (zmq::pipe_t *pipe_,
|
||||
bool subscribe_to_all_ = false,
|
||||
bool locally_initiated_ = false) ZMQ_FINAL;
|
||||
int xrecv (zmq::msg_t *msg_) ZMQ_FINAL;
|
||||
bool xhas_in () ZMQ_FINAL;
|
||||
bool locally_initiated_ = false);
|
||||
int xrecv (zmq::msg_t *msg_);
|
||||
bool xhas_in ();
|
||||
|
||||
ZMQ_NON_COPYABLE_NOR_MOVABLE (pub_t)
|
||||
};
|
||||
|
12
src/pull.hpp
12
src/pull.hpp
@ -45,17 +45,17 @@ class pull_t ZMQ_FINAL : public socket_base_t
|
||||
{
|
||||
public:
|
||||
pull_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
|
||||
~pull_t () ZMQ_FINAL;
|
||||
~pull_t ();
|
||||
|
||||
protected:
|
||||
// Overrides of functions from socket_base_t.
|
||||
void xattach_pipe (zmq::pipe_t *pipe_,
|
||||
bool subscribe_to_all_,
|
||||
bool locally_initiated_) ZMQ_FINAL;
|
||||
int xrecv (zmq::msg_t *msg_) ZMQ_FINAL;
|
||||
bool xhas_in () ZMQ_FINAL;
|
||||
void xread_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
|
||||
void xpipe_terminated (zmq::pipe_t *pipe_) ZMQ_FINAL;
|
||||
bool locally_initiated_);
|
||||
int xrecv (zmq::msg_t *msg_);
|
||||
bool xhas_in ();
|
||||
void xread_activated (zmq::pipe_t *pipe_);
|
||||
void xpipe_terminated (zmq::pipe_t *pipe_);
|
||||
|
||||
private:
|
||||
// Fair queueing object for inbound pipes.
|
||||
|
12
src/push.hpp
12
src/push.hpp
@ -45,17 +45,17 @@ class push_t ZMQ_FINAL : public socket_base_t
|
||||
{
|
||||
public:
|
||||
push_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
|
||||
~push_t () ZMQ_FINAL;
|
||||
~push_t ();
|
||||
|
||||
protected:
|
||||
// Overrides of functions from socket_base_t.
|
||||
void xattach_pipe (zmq::pipe_t *pipe_,
|
||||
bool subscribe_to_all_,
|
||||
bool locally_initiated_) ZMQ_FINAL;
|
||||
int xsend (zmq::msg_t *msg_) ZMQ_FINAL;
|
||||
bool xhas_out () ZMQ_FINAL;
|
||||
void xwrite_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
|
||||
void xpipe_terminated (zmq::pipe_t *pipe_) ZMQ_FINAL;
|
||||
bool locally_initiated_);
|
||||
int xsend (zmq::msg_t *msg_);
|
||||
bool xhas_out ();
|
||||
void xwrite_activated (zmq::pipe_t *pipe_);
|
||||
void xpipe_terminated (zmq::pipe_t *pipe_);
|
||||
|
||||
private:
|
||||
// Load balancer managing the outbound pipes.
|
||||
|
@ -218,7 +218,7 @@ int zmq::radio_session_t::push_msg (msg_t *msg_)
|
||||
const size_t data_size = msg_->size ();
|
||||
|
||||
int group_length;
|
||||
char *group;
|
||||
const char *group;
|
||||
|
||||
msg_t join_leave_msg;
|
||||
int rc;
|
||||
|
@ -49,21 +49,20 @@ class radio_t ZMQ_FINAL : public socket_base_t
|
||||
{
|
||||
public:
|
||||
radio_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
|
||||
~radio_t () ZMQ_FINAL;
|
||||
~radio_t ();
|
||||
|
||||
// Implementations of virtual functions from socket_base_t.
|
||||
void xattach_pipe (zmq::pipe_t *pipe_,
|
||||
bool subscribe_to_all_ = false,
|
||||
bool locally_initiated_ = false) ZMQ_FINAL;
|
||||
int xsend (zmq::msg_t *msg_) ZMQ_FINAL;
|
||||
bool xhas_out () ZMQ_FINAL;
|
||||
int xrecv (zmq::msg_t *msg_) ZMQ_FINAL;
|
||||
bool xhas_in () ZMQ_FINAL;
|
||||
void xread_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
|
||||
void xwrite_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
|
||||
int
|
||||
xsetsockopt (int option_, const void *optval_, size_t optvallen_) ZMQ_FINAL;
|
||||
void xpipe_terminated (zmq::pipe_t *pipe_) ZMQ_FINAL;
|
||||
bool locally_initiated_ = false);
|
||||
int xsend (zmq::msg_t *msg_);
|
||||
bool xhas_out ();
|
||||
int xrecv (zmq::msg_t *msg_);
|
||||
bool xhas_in ();
|
||||
void xread_activated (zmq::pipe_t *pipe_);
|
||||
void xwrite_activated (zmq::pipe_t *pipe_);
|
||||
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
|
||||
void xpipe_terminated (zmq::pipe_t *pipe_);
|
||||
|
||||
private:
|
||||
// List of all subscriptions mapped to corresponding pipes.
|
||||
@ -91,12 +90,12 @@ class radio_session_t ZMQ_FINAL : public session_base_t
|
||||
zmq::socket_base_t *socket_,
|
||||
const options_t &options_,
|
||||
address_t *addr_);
|
||||
~radio_session_t () ZMQ_FINAL;
|
||||
~radio_session_t ();
|
||||
|
||||
// Overrides of the functions from session_base_t.
|
||||
int push_msg (msg_t *msg_) ZMQ_FINAL;
|
||||
int pull_msg (msg_t *msg_) ZMQ_FINAL;
|
||||
void reset () ZMQ_FINAL;
|
||||
int push_msg (msg_t *msg_);
|
||||
int pull_msg (msg_t *msg_);
|
||||
void reset ();
|
||||
|
||||
private:
|
||||
enum
|
||||
|
@ -43,19 +43,17 @@ class raw_decoder_t ZMQ_FINAL : public i_decoder
|
||||
{
|
||||
public:
|
||||
raw_decoder_t (size_t bufsize_);
|
||||
~raw_decoder_t () ZMQ_FINAL;
|
||||
~raw_decoder_t ();
|
||||
|
||||
// i_decoder interface.
|
||||
|
||||
void get_buffer (unsigned char **data_, size_t *size_) ZMQ_FINAL;
|
||||
void get_buffer (unsigned char **data_, size_t *size_);
|
||||
|
||||
int decode (const unsigned char *data_,
|
||||
size_t size_,
|
||||
size_t &bytes_used_) ZMQ_FINAL;
|
||||
int decode (const unsigned char *data_, size_t size_, size_t &bytes_used_);
|
||||
|
||||
msg_t *msg () ZMQ_FINAL { return &_in_progress; }
|
||||
msg_t *msg () { return &_in_progress; }
|
||||
|
||||
void resize_buffer (size_t) ZMQ_FINAL {}
|
||||
void resize_buffer (size_t) {}
|
||||
|
||||
private:
|
||||
msg_t _in_progress;
|
||||
|
@ -44,7 +44,7 @@ class raw_encoder_t ZMQ_FINAL : public encoder_base_t<raw_encoder_t>
|
||||
{
|
||||
public:
|
||||
raw_encoder_t (size_t bufsize_);
|
||||
~raw_encoder_t () ZMQ_FINAL;
|
||||
~raw_encoder_t ();
|
||||
|
||||
private:
|
||||
void raw_message_ready ();
|
||||
|
@ -60,12 +60,12 @@ class raw_engine_t ZMQ_FINAL : public stream_engine_base_t
|
||||
raw_engine_t (fd_t fd_,
|
||||
const options_t &options_,
|
||||
const endpoint_uri_pair_t &endpoint_uri_pair_);
|
||||
~raw_engine_t () ZMQ_FINAL;
|
||||
~raw_engine_t ();
|
||||
|
||||
protected:
|
||||
void error (error_reason_t reason_) ZMQ_FINAL;
|
||||
void plug_internal () ZMQ_FINAL;
|
||||
bool handshake () ZMQ_FINAL;
|
||||
void error (error_reason_t reason_);
|
||||
void plug_internal ();
|
||||
bool handshake ();
|
||||
|
||||
private:
|
||||
int push_raw_msg_to_session (msg_t *msg_);
|
||||
|
@ -44,7 +44,7 @@ class reaper_t ZMQ_FINAL : public object_t, public i_poll_events
|
||||
{
|
||||
public:
|
||||
reaper_t (zmq::ctx_t *ctx_, uint32_t tid_);
|
||||
~reaper_t () ZMQ_FINAL;
|
||||
~reaper_t ();
|
||||
|
||||
mailbox_t *get_mailbox ();
|
||||
|
||||
@ -52,15 +52,15 @@ class reaper_t ZMQ_FINAL : public object_t, public i_poll_events
|
||||
void stop ();
|
||||
|
||||
// i_poll_events implementation.
|
||||
void in_event () ZMQ_FINAL;
|
||||
void out_event () ZMQ_FINAL;
|
||||
void timer_event (int id_) ZMQ_FINAL;
|
||||
void in_event ();
|
||||
void out_event ();
|
||||
void timer_event (int id_);
|
||||
|
||||
private:
|
||||
// Command handlers.
|
||||
void process_stop () ZMQ_FINAL;
|
||||
void process_reap (zmq::socket_base_t *socket_) ZMQ_FINAL;
|
||||
void process_reaped () ZMQ_FINAL;
|
||||
void process_stop ();
|
||||
void process_reap (zmq::socket_base_t *socket_);
|
||||
void process_reaped ();
|
||||
|
||||
// Reaper thread accesses incoming commands via this mailbox.
|
||||
mailbox_t _mailbox;
|
||||
|
10
src/rep.hpp
10
src/rep.hpp
@ -43,13 +43,13 @@ class rep_t ZMQ_FINAL : public router_t
|
||||
{
|
||||
public:
|
||||
rep_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
|
||||
~rep_t () ZMQ_FINAL;
|
||||
~rep_t ();
|
||||
|
||||
// Overrides of functions from socket_base_t.
|
||||
int xsend (zmq::msg_t *msg_) ZMQ_FINAL;
|
||||
int xrecv (zmq::msg_t *msg_) ZMQ_FINAL;
|
||||
bool xhas_in () ZMQ_FINAL;
|
||||
bool xhas_out () ZMQ_FINAL;
|
||||
int xsend (zmq::msg_t *msg_);
|
||||
int xrecv (zmq::msg_t *msg_);
|
||||
bool xhas_in ();
|
||||
bool xhas_out ();
|
||||
|
||||
private:
|
||||
// If true, we are in process of sending the reply. If false we are
|
||||
|
21
src/req.hpp
21
src/req.hpp
@ -44,16 +44,15 @@ class req_t ZMQ_FINAL : public dealer_t
|
||||
{
|
||||
public:
|
||||
req_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
|
||||
~req_t () ZMQ_FINAL;
|
||||
~req_t ();
|
||||
|
||||
// Overrides of functions from socket_base_t.
|
||||
int xsend (zmq::msg_t *msg_) ZMQ_FINAL;
|
||||
int xrecv (zmq::msg_t *msg_) ZMQ_FINAL;
|
||||
bool xhas_in () ZMQ_FINAL;
|
||||
bool xhas_out () ZMQ_FINAL;
|
||||
int
|
||||
xsetsockopt (int option_, const void *optval_, size_t optvallen_) ZMQ_FINAL;
|
||||
void xpipe_terminated (zmq::pipe_t *pipe_) ZMQ_FINAL;
|
||||
int xsend (zmq::msg_t *msg_);
|
||||
int xrecv (zmq::msg_t *msg_);
|
||||
bool xhas_in ();
|
||||
bool xhas_out ();
|
||||
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
|
||||
void xpipe_terminated (zmq::pipe_t *pipe_);
|
||||
|
||||
protected:
|
||||
// Receive only from the pipe the request was sent to, discarding
|
||||
@ -95,11 +94,11 @@ class req_session_t ZMQ_FINAL : public session_base_t
|
||||
zmq::socket_base_t *socket_,
|
||||
const options_t &options_,
|
||||
address_t *addr_);
|
||||
~req_session_t () ZMQ_FINAL;
|
||||
~req_session_t ();
|
||||
|
||||
// Overrides of the functions from session_base_t.
|
||||
int push_msg (msg_t *msg_) ZMQ_FINAL;
|
||||
void reset () ZMQ_FINAL;
|
||||
int push_msg (msg_t *msg_);
|
||||
void reset ();
|
||||
|
||||
private:
|
||||
enum
|
||||
|
@ -487,7 +487,8 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_, bool locally_initiated_)
|
||||
|
||||
// Try to remove an existing routing id entry to allow the new
|
||||
// connection to take the routing id.
|
||||
out_pipe_t *existing_outpipe = lookup_out_pipe (routing_id);
|
||||
const out_pipe_t *const existing_outpipe =
|
||||
lookup_out_pipe (routing_id);
|
||||
|
||||
if (existing_outpipe) {
|
||||
if (!_handover)
|
||||
|
@ -45,17 +45,17 @@ class scatter_t ZMQ_FINAL : public socket_base_t
|
||||
{
|
||||
public:
|
||||
scatter_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
|
||||
~scatter_t () ZMQ_FINAL;
|
||||
~scatter_t ();
|
||||
|
||||
protected:
|
||||
// Overrides of functions from socket_base_t.
|
||||
void xattach_pipe (zmq::pipe_t *pipe_,
|
||||
bool subscribe_to_all_,
|
||||
bool locally_initiated_) ZMQ_FINAL;
|
||||
int xsend (zmq::msg_t *msg_) ZMQ_FINAL;
|
||||
bool xhas_out () ZMQ_FINAL;
|
||||
void xwrite_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
|
||||
void xpipe_terminated (zmq::pipe_t *pipe_) ZMQ_FINAL;
|
||||
bool locally_initiated_);
|
||||
int xsend (zmq::msg_t *msg_);
|
||||
bool xhas_out ();
|
||||
void xwrite_activated (zmq::pipe_t *pipe_);
|
||||
void xpipe_terminated (zmq::pipe_t *pipe_);
|
||||
|
||||
private:
|
||||
// Load balancer managing the outbound pipes.
|
||||
|
@ -49,19 +49,19 @@ class server_t ZMQ_FINAL : public socket_base_t
|
||||
{
|
||||
public:
|
||||
server_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
|
||||
~server_t () ZMQ_FINAL;
|
||||
~server_t ();
|
||||
|
||||
// Overrides of functions from socket_base_t.
|
||||
void xattach_pipe (zmq::pipe_t *pipe_,
|
||||
bool subscribe_to_all_,
|
||||
bool locally_initiated_) ZMQ_FINAL;
|
||||
int xsend (zmq::msg_t *msg_) ZMQ_FINAL;
|
||||
int xrecv (zmq::msg_t *msg_) ZMQ_FINAL;
|
||||
bool xhas_in () ZMQ_FINAL;
|
||||
bool xhas_out () ZMQ_FINAL;
|
||||
void xread_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
|
||||
void xwrite_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
|
||||
void xpipe_terminated (zmq::pipe_t *pipe_) ZMQ_FINAL;
|
||||
bool locally_initiated_);
|
||||
int xsend (zmq::msg_t *msg_);
|
||||
int xrecv (zmq::msg_t *msg_);
|
||||
bool xhas_in ();
|
||||
bool xhas_out ();
|
||||
void xread_activated (zmq::pipe_t *pipe_);
|
||||
void xwrite_activated (zmq::pipe_t *pipe_);
|
||||
void xpipe_terminated (zmq::pipe_t *pipe_);
|
||||
|
||||
private:
|
||||
// Fair queueing object for inbound pipes.
|
||||
|
@ -981,8 +981,9 @@ int zmq::socket_base_t::connect (const char *endpoint_uri_)
|
||||
LIBZMQ_DELETE (paddr);
|
||||
return -1;
|
||||
}
|
||||
sockaddr_tipc *saddr =
|
||||
(sockaddr_tipc *) paddr->resolved.tipc_addr->addr ();
|
||||
const sockaddr_tipc *const saddr =
|
||||
reinterpret_cast<const sockaddr_tipc *> (
|
||||
paddr->resolved.tipc_addr->addr ());
|
||||
// Cannot connect to random Port Identity
|
||||
if (saddr->addrtype == TIPC_ADDR_ID
|
||||
&& paddr->resolved.tipc_addr->is_random ()) {
|
||||
|
@ -80,7 +80,7 @@ class socket_poller_t
|
||||
|
||||
int wait (event_t *events_, int n_events_, long timeout_);
|
||||
|
||||
inline int size () const { return static_cast<int> (_items.size ()); };
|
||||
int size () const { return static_cast<int> (_items.size ()); };
|
||||
|
||||
// Return false if object is not a socket.
|
||||
bool check_tag () const;
|
||||
|
@ -52,7 +52,7 @@ class socks_connecter_t ZMQ_FINAL : public stream_connecter_base_t
|
||||
address_t *addr_,
|
||||
address_t *proxy_addr_,
|
||||
bool delayed_start_);
|
||||
~socks_connecter_t () ZMQ_FINAL;
|
||||
~socks_connecter_t ();
|
||||
|
||||
void set_auth_method_basic (const std::string &username,
|
||||
const std::string &password);
|
||||
@ -82,11 +82,11 @@ class socks_connecter_t ZMQ_FINAL : public stream_connecter_base_t
|
||||
};
|
||||
|
||||
// Handlers for I/O events.
|
||||
void in_event () ZMQ_FINAL;
|
||||
void out_event () ZMQ_FINAL;
|
||||
void in_event ();
|
||||
void out_event ();
|
||||
|
||||
// Internal function to start the actual connection establishment.
|
||||
void start_connecting () ZMQ_FINAL;
|
||||
void start_connecting ();
|
||||
|
||||
static int process_server_response (const socks_choice_t &response_);
|
||||
static int process_server_response (const socks_response_t &response_);
|
||||
|
@ -43,20 +43,19 @@ class stream_t ZMQ_FINAL : public routing_socket_base_t
|
||||
{
|
||||
public:
|
||||
stream_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
|
||||
~stream_t () ZMQ_FINAL;
|
||||
~stream_t ();
|
||||
|
||||
// Overrides of functions from socket_base_t.
|
||||
void xattach_pipe (zmq::pipe_t *pipe_,
|
||||
bool subscribe_to_all_,
|
||||
bool locally_initiated_) ZMQ_FINAL;
|
||||
int xsend (zmq::msg_t *msg_) ZMQ_FINAL;
|
||||
int xrecv (zmq::msg_t *msg_) ZMQ_FINAL;
|
||||
bool xhas_in () ZMQ_FINAL;
|
||||
bool xhas_out () ZMQ_FINAL;
|
||||
void xread_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
|
||||
void xpipe_terminated (zmq::pipe_t *pipe_) ZMQ_FINAL;
|
||||
int
|
||||
xsetsockopt (int option_, const void *optval_, size_t optvallen_) ZMQ_FINAL;
|
||||
bool locally_initiated_);
|
||||
int xsend (zmq::msg_t *msg_);
|
||||
int xrecv (zmq::msg_t *msg_);
|
||||
bool xhas_in ();
|
||||
bool xhas_out ();
|
||||
void xread_activated (zmq::pipe_t *pipe_);
|
||||
void xpipe_terminated (zmq::pipe_t *pipe_);
|
||||
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
|
||||
|
||||
private:
|
||||
// Generate peer's id and update lookup map
|
||||
|
@ -43,13 +43,12 @@ class sub_t ZMQ_FINAL : public xsub_t
|
||||
{
|
||||
public:
|
||||
sub_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
|
||||
~sub_t () ZMQ_FINAL;
|
||||
~sub_t ();
|
||||
|
||||
protected:
|
||||
int
|
||||
xsetsockopt (int option_, const void *optval_, size_t optvallen_) ZMQ_FINAL;
|
||||
int xsend (zmq::msg_t *msg_) ZMQ_FINAL;
|
||||
bool xhas_out () ZMQ_FINAL;
|
||||
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
|
||||
int xsend (zmq::msg_t *msg_);
|
||||
bool xhas_out ();
|
||||
|
||||
ZMQ_NON_COPYABLE_NOR_MOVABLE (sub_t)
|
||||
};
|
||||
|
@ -66,7 +66,7 @@ int tcp_write (fd_t s_, const void *data_, size_t size_);
|
||||
// Zero indicates the peer has closed the connection.
|
||||
int tcp_read (fd_t s_, void *data_, size_t size_);
|
||||
|
||||
void tcp_tune_loopback_fast_path (const fd_t socket_);
|
||||
void tcp_tune_loopback_fast_path (fd_t socket_);
|
||||
|
||||
// Resolves the given address_ string, opens a socket and sets socket options
|
||||
// according to the passed options_. On success, returns the socket
|
||||
|
@ -87,8 +87,7 @@ class tcp_address_mask_t
|
||||
|
||||
int mask () const;
|
||||
|
||||
bool match_address (const struct sockaddr *ss_,
|
||||
const socklen_t ss_len_) const;
|
||||
bool match_address (const struct sockaddr *ss_, socklen_t ss_len_) const;
|
||||
|
||||
private:
|
||||
ip_addr_t _network_address;
|
||||
|
@ -46,7 +46,7 @@ class tcp_connecter_t ZMQ_FINAL : public stream_connecter_base_t
|
||||
const options_t &options_,
|
||||
address_t *addr_,
|
||||
bool delayed_start_);
|
||||
~tcp_connecter_t () ZMQ_FINAL;
|
||||
~tcp_connecter_t ();
|
||||
|
||||
private:
|
||||
// ID of the timer used to check the connect timeout, must be different from stream_connecter_base_t::reconnect_timer_id.
|
||||
@ -56,14 +56,14 @@ class tcp_connecter_t ZMQ_FINAL : public stream_connecter_base_t
|
||||
};
|
||||
|
||||
// Handlers for incoming commands.
|
||||
void process_term (int linger_) ZMQ_FINAL;
|
||||
void process_term (int linger_);
|
||||
|
||||
// Handlers for I/O events.
|
||||
void out_event () ZMQ_FINAL;
|
||||
void timer_event (int id_) ZMQ_FINAL;
|
||||
void out_event ();
|
||||
void timer_event (int id_);
|
||||
|
||||
// Internal function to start the actual connection establishment.
|
||||
void start_connecting () ZMQ_FINAL;
|
||||
void start_connecting ();
|
||||
|
||||
// Internal function to add a connect timer
|
||||
void add_connect_timer ();
|
||||
|
@ -47,12 +47,11 @@ class tcp_listener_t ZMQ_FINAL : public stream_listener_base_t
|
||||
int set_local_address (const char *addr_);
|
||||
|
||||
protected:
|
||||
std::string get_socket_name (fd_t fd_,
|
||||
socket_end_t socket_end_) const ZMQ_FINAL;
|
||||
std::string get_socket_name (fd_t fd_, socket_end_t socket_end_) const;
|
||||
|
||||
private:
|
||||
// Handlers for I/O events.
|
||||
void in_event () ZMQ_FINAL;
|
||||
void in_event ();
|
||||
|
||||
// Accept the new connection. Returns the file descriptor of the
|
||||
// newly created connection. The function may return retired_fd
|
||||
|
@ -53,7 +53,7 @@ typedef void(thread_fn) (void *);
|
||||
class thread_t
|
||||
{
|
||||
public:
|
||||
inline thread_t () :
|
||||
thread_t () :
|
||||
_tfn (NULL),
|
||||
_arg (NULL),
|
||||
_started (false),
|
||||
|
@ -93,7 +93,8 @@ int zmq::tipc_listener_t::set_local_address (const char *addr_)
|
||||
return -1;
|
||||
|
||||
// Cannot bind non-random Port Identity
|
||||
struct sockaddr_tipc *a = (sockaddr_tipc *) _address.addr ();
|
||||
const sockaddr_tipc *const a =
|
||||
reinterpret_cast<const sockaddr_tipc *> (_address.addr ());
|
||||
if (!_address.is_random () && a->addrtype == TIPC_ADDR_ID) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
|
@ -236,11 +236,11 @@ bool zmq::trie_t::rm (unsigned char *prefix_, size_t size_)
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool zmq::trie_t::check (const unsigned char *data_, size_t size_)
|
||||
bool zmq::trie_t::check (const unsigned char *data_, size_t size_) const
|
||||
{
|
||||
// This function is on critical path. It deliberately doesn't use
|
||||
// recursion to get a bit better performance.
|
||||
trie_t *current = this;
|
||||
const trie_t *current = this;
|
||||
while (true) {
|
||||
// We've found a corresponding subscription!
|
||||
if (current->_refcnt)
|
||||
|
@ -52,7 +52,7 @@ class trie_t
|
||||
bool rm (unsigned char *prefix_, size_t size_);
|
||||
|
||||
// Check whether particular key is in the trie.
|
||||
bool check (const unsigned char *data_, size_t size_);
|
||||
bool check (const unsigned char *data_, size_t size_) const;
|
||||
|
||||
// Apply the function supplied to each subscription in the trie.
|
||||
void apply (void (*func_) (unsigned char *data_, size_t size_, void *arg_),
|
||||
|
@ -18,33 +18,32 @@ class udp_engine_t ZMQ_FINAL : public io_object_t, public i_engine
|
||||
{
|
||||
public:
|
||||
udp_engine_t (const options_t &options_);
|
||||
~udp_engine_t () ZMQ_FINAL;
|
||||
~udp_engine_t ();
|
||||
|
||||
int init (address_t *address_, bool send_, bool recv_);
|
||||
|
||||
// i_engine interface implementation.
|
||||
// Plug the engine to the session.
|
||||
void plug (zmq::io_thread_t *io_thread_,
|
||||
class session_base_t *session_) ZMQ_FINAL;
|
||||
void plug (zmq::io_thread_t *io_thread_, class session_base_t *session_);
|
||||
|
||||
// Terminate and deallocate the engine. Note that 'detached'
|
||||
// events are not fired on termination.
|
||||
void terminate () ZMQ_FINAL;
|
||||
void terminate ();
|
||||
|
||||
// This method is called by the session to signalise that more
|
||||
// messages can be written to the pipe.
|
||||
bool restart_input () ZMQ_FINAL;
|
||||
bool restart_input ();
|
||||
|
||||
// This method is called by the session to signalise that there
|
||||
// are messages to send available.
|
||||
void restart_output () ZMQ_FINAL;
|
||||
void restart_output ();
|
||||
|
||||
void zap_msg_available () ZMQ_FINAL{};
|
||||
void zap_msg_available (){};
|
||||
|
||||
void in_event () ZMQ_FINAL;
|
||||
void out_event () ZMQ_FINAL;
|
||||
void in_event ();
|
||||
void out_event ();
|
||||
|
||||
const endpoint_uri_pair_t &get_endpoint () const ZMQ_FINAL;
|
||||
const endpoint_uri_pair_t &get_endpoint () const;
|
||||
|
||||
private:
|
||||
int resolve_raw_address (const char *name_, size_t length_);
|
||||
|
@ -40,9 +40,9 @@ class v1_decoder_t ZMQ_FINAL : public decoder_base_t<v1_decoder_t>
|
||||
{
|
||||
public:
|
||||
v1_decoder_t (size_t bufsize_, int64_t maxmsgsize_);
|
||||
~v1_decoder_t () ZMQ_FINAL;
|
||||
~v1_decoder_t ();
|
||||
|
||||
msg_t *msg () ZMQ_FINAL { return &_in_progress; }
|
||||
msg_t *msg () { return &_in_progress; }
|
||||
|
||||
private:
|
||||
int one_byte_size_ready (unsigned char const *);
|
||||
|
@ -40,7 +40,7 @@ class v1_encoder_t ZMQ_FINAL : public encoder_base_t<v1_encoder_t>
|
||||
{
|
||||
public:
|
||||
v1_encoder_t (size_t bufsize_);
|
||||
~v1_encoder_t () ZMQ_FINAL;
|
||||
~v1_encoder_t ();
|
||||
|
||||
private:
|
||||
void size_ready ();
|
||||
|
@ -43,10 +43,10 @@ class v2_decoder_t ZMQ_FINAL
|
||||
{
|
||||
public:
|
||||
v2_decoder_t (size_t bufsize_, int64_t maxmsgsize_, bool zero_copy_);
|
||||
~v2_decoder_t () ZMQ_FINAL;
|
||||
~v2_decoder_t ();
|
||||
|
||||
// i_decoder interface.
|
||||
msg_t *msg () ZMQ_FINAL { return &_in_progress; }
|
||||
msg_t *msg () { return &_in_progress; }
|
||||
|
||||
private:
|
||||
int flags_ready (unsigned char const *);
|
||||
|
@ -40,7 +40,7 @@ class v2_encoder_t ZMQ_FINAL : public encoder_base_t<v2_encoder_t>
|
||||
{
|
||||
public:
|
||||
v2_encoder_t (size_t bufsize_);
|
||||
~v2_encoder_t () ZMQ_FINAL;
|
||||
~v2_encoder_t ();
|
||||
|
||||
private:
|
||||
void size_ready ();
|
||||
|
@ -48,10 +48,10 @@ class ws_connecter_t ZMQ_FINAL : public stream_connecter_base_t
|
||||
bool delayed_start_,
|
||||
bool wss_,
|
||||
const std::string &tls_hostname_);
|
||||
~ws_connecter_t () ZMQ_FINAL;
|
||||
~ws_connecter_t ();
|
||||
|
||||
protected:
|
||||
void create_engine (fd_t fd, const std::string &local_address_) ZMQ_FINAL;
|
||||
void create_engine (fd_t fd, const std::string &local_address_);
|
||||
|
||||
private:
|
||||
// ID of the timer used to check the connect timeout, must be different from stream_connecter_base_t::reconnect_timer_id.
|
||||
@ -61,14 +61,14 @@ class ws_connecter_t ZMQ_FINAL : public stream_connecter_base_t
|
||||
};
|
||||
|
||||
// Handlers for incoming commands.
|
||||
void process_term (int linger_) ZMQ_FINAL;
|
||||
void process_term (int linger_);
|
||||
|
||||
// Handlers for I/O events.
|
||||
void out_event () ZMQ_FINAL;
|
||||
void timer_event (int id_) ZMQ_FINAL;
|
||||
void out_event ();
|
||||
void timer_event (int id_);
|
||||
|
||||
// Internal function to start the actual connection establishment.
|
||||
void start_connecting () ZMQ_FINAL;
|
||||
void start_connecting ();
|
||||
|
||||
// Internal function to add a connect timer
|
||||
void add_connect_timer ();
|
||||
|
@ -47,10 +47,10 @@ class ws_decoder_t ZMQ_FINAL
|
||||
int64_t maxmsgsize_,
|
||||
bool zero_copy_,
|
||||
bool must_mask_);
|
||||
~ws_decoder_t () ZMQ_FINAL;
|
||||
~ws_decoder_t ();
|
||||
|
||||
// i_decoder interface.
|
||||
msg_t *msg () ZMQ_FINAL { return &_in_progress; }
|
||||
msg_t *msg () { return &_in_progress; }
|
||||
|
||||
private:
|
||||
int opcode_ready (unsigned char const *);
|
||||
|
@ -40,7 +40,7 @@ class ws_encoder_t ZMQ_FINAL : public encoder_base_t<ws_encoder_t>
|
||||
{
|
||||
public:
|
||||
ws_encoder_t (size_t bufsize_, bool must_mask_);
|
||||
~ws_encoder_t () ZMQ_FINAL;
|
||||
~ws_encoder_t ();
|
||||
|
||||
private:
|
||||
void size_ready ();
|
||||
|
@ -132,7 +132,7 @@ class ws_engine_t ZMQ_FINAL : public stream_engine_base_t
|
||||
const endpoint_uri_pair_t &endpoint_uri_pair_,
|
||||
const ws_address_t &address_,
|
||||
bool client_);
|
||||
~ws_engine_t () ZMQ_FINAL;
|
||||
~ws_engine_t ();
|
||||
|
||||
protected:
|
||||
int decode_and_push (msg_t *msg_);
|
||||
|
@ -48,19 +48,18 @@ class ws_listener_t ZMQ_FINAL : public stream_listener_base_t
|
||||
const options_t &options_,
|
||||
bool wss_);
|
||||
|
||||
~ws_listener_t () ZMQ_FINAL;
|
||||
~ws_listener_t ();
|
||||
|
||||
// Set address to listen on.
|
||||
int set_local_address (const char *addr_);
|
||||
|
||||
protected:
|
||||
std::string get_socket_name (fd_t fd_,
|
||||
socket_end_t socket_end_) const ZMQ_FINAL;
|
||||
void create_engine (fd_t fd) ZMQ_FINAL;
|
||||
std::string get_socket_name (fd_t fd_, socket_end_t socket_end_) const;
|
||||
void create_engine (fd_t fd);
|
||||
|
||||
private:
|
||||
// Handlers for I/O events.
|
||||
void in_event () ZMQ_FINAL;
|
||||
void in_event ();
|
||||
|
||||
// Accept the new connection. Returns the file descriptor of the
|
||||
// newly created connection. The function may return retired_fd
|
||||
|
@ -47,7 +47,7 @@ template <typename T, int N> class ypipe_t ZMQ_FINAL : public ypipe_base_t<T>
|
||||
{
|
||||
public:
|
||||
// Initialises the pipe.
|
||||
inline ypipe_t ()
|
||||
ypipe_t ()
|
||||
{
|
||||
// Insert terminator element into the queue.
|
||||
_queue.push ();
|
||||
@ -71,7 +71,7 @@ template <typename T, int N> class ypipe_t ZMQ_FINAL : public ypipe_base_t<T>
|
||||
// set to true the item is assumed to be continued by items
|
||||
// subsequently written to the pipe. Incomplete items are never
|
||||
// flushed down the stream.
|
||||
inline void write (const T &value_, bool incomplete_) ZMQ_FINAL
|
||||
void write (const T &value_, bool incomplete_)
|
||||
{
|
||||
// Place the value to the queue, add new terminator element.
|
||||
_queue.back () = value_;
|
||||
@ -88,7 +88,7 @@ template <typename T, int N> class ypipe_t ZMQ_FINAL : public ypipe_base_t<T>
|
||||
|
||||
// Pop an incomplete item from the pipe. Returns true if such
|
||||
// item exists, false otherwise.
|
||||
inline bool unwrite (T *value_) ZMQ_FINAL
|
||||
bool unwrite (T *value_)
|
||||
{
|
||||
if (_f == &_queue.back ())
|
||||
return false;
|
||||
@ -100,7 +100,7 @@ template <typename T, int N> class ypipe_t ZMQ_FINAL : public ypipe_base_t<T>
|
||||
// Flush all the completed items into the pipe. Returns false if
|
||||
// the reader thread is sleeping. In that case, caller is obliged to
|
||||
// wake the reader up before using the pipe again.
|
||||
inline bool flush () ZMQ_FINAL
|
||||
bool flush ()
|
||||
{
|
||||
// If there are no un-flushed items, do nothing.
|
||||
if (_w == _f)
|
||||
@ -125,7 +125,7 @@ template <typename T, int N> class ypipe_t ZMQ_FINAL : public ypipe_base_t<T>
|
||||
}
|
||||
|
||||
// Check whether item is available for reading.
|
||||
inline bool check_read () ZMQ_FINAL
|
||||
bool check_read ()
|
||||
{
|
||||
// Was the value prefetched already? If so, return.
|
||||
if (&_queue.front () != _r && _r)
|
||||
@ -150,7 +150,7 @@ template <typename T, int N> class ypipe_t ZMQ_FINAL : public ypipe_base_t<T>
|
||||
|
||||
// Reads an item from the pipe. Returns false if there is no value.
|
||||
// available.
|
||||
inline bool read (T *value_) ZMQ_FINAL
|
||||
bool read (T *value_)
|
||||
{
|
||||
// Try to prefetch a value.
|
||||
if (!check_read ())
|
||||
@ -166,7 +166,7 @@ template <typename T, int N> class ypipe_t ZMQ_FINAL : public ypipe_base_t<T>
|
||||
// Applies the function fn to the first elemenent in the pipe
|
||||
// and returns the value returned by the fn.
|
||||
// The pipe mustn't be empty or the function crashes.
|
||||
inline bool probe (bool (*fn_) (const T &)) ZMQ_FINAL
|
||||
bool probe (bool (*fn_) (const T &))
|
||||
{
|
||||
const bool rc = check_read ();
|
||||
zmq_assert (rc);
|
||||
|
@ -47,7 +47,7 @@ template <typename T> class ypipe_conflate_t ZMQ_FINAL : public ypipe_base_t<T>
|
||||
{
|
||||
public:
|
||||
// Initialises the pipe.
|
||||
inline ypipe_conflate_t () : reader_awake (false) {}
|
||||
ypipe_conflate_t () : reader_awake (false) {}
|
||||
|
||||
// Following function (write) deliberately copies uninitialised data
|
||||
// when used with zmq_msg. Initialising the VSM body for
|
||||
@ -57,7 +57,7 @@ template <typename T> class ypipe_conflate_t ZMQ_FINAL : public ypipe_base_t<T>
|
||||
#pragma message save
|
||||
#pragma message disable(UNINIT)
|
||||
#endif
|
||||
inline void write (const T &value_, bool incomplete_) ZMQ_FINAL
|
||||
void write (const T &value_, bool incomplete_)
|
||||
{
|
||||
(void) incomplete_;
|
||||
|
||||
@ -69,16 +69,16 @@ template <typename T> class ypipe_conflate_t ZMQ_FINAL : public ypipe_base_t<T>
|
||||
#endif
|
||||
|
||||
// There are no incomplete items for conflate ypipe
|
||||
inline bool unwrite (T *) ZMQ_FINAL { return false; }
|
||||
bool unwrite (T *) { return false; }
|
||||
|
||||
// Flush is no-op for conflate ypipe. Reader asleep behaviour
|
||||
// is as of the usual ypipe.
|
||||
// Returns false if the reader thread is sleeping. In that case,
|
||||
// caller is obliged to wake the reader up before using the pipe again.
|
||||
inline bool flush () ZMQ_FINAL { return reader_awake; }
|
||||
bool flush () { return reader_awake; }
|
||||
|
||||
// Check whether item is available for reading.
|
||||
inline bool check_read () ZMQ_FINAL
|
||||
bool check_read ()
|
||||
{
|
||||
const bool res = dbuffer.check_read ();
|
||||
if (!res)
|
||||
@ -89,7 +89,7 @@ template <typename T> class ypipe_conflate_t ZMQ_FINAL : public ypipe_base_t<T>
|
||||
|
||||
// Reads an item from the pipe. Returns false if there is no value.
|
||||
// available.
|
||||
inline bool read (T *value_) ZMQ_FINAL
|
||||
bool read (T *value_)
|
||||
{
|
||||
if (!check_read ())
|
||||
return false;
|
||||
@ -100,10 +100,7 @@ template <typename T> class ypipe_conflate_t ZMQ_FINAL : public ypipe_base_t<T>
|
||||
// Applies the function fn to the first elemenent in the pipe
|
||||
// and returns the value returned by the fn.
|
||||
// The pipe mustn't be empty or the function crashes.
|
||||
inline bool probe (bool (*fn_) (const T &)) ZMQ_FINAL
|
||||
{
|
||||
return dbuffer.probe (fn_);
|
||||
}
|
||||
bool probe (bool (*fn_) (const T &)) { return dbuffer.probe (fn_); }
|
||||
|
||||
protected:
|
||||
dbuffer_t<T> dbuffer;
|
||||
|
@ -37,7 +37,7 @@ namespace zmq
|
||||
class zap_client_t : public virtual mechanism_base_t
|
||||
{
|
||||
public:
|
||||
zap_client_t (session_base_t *const session_,
|
||||
zap_client_t (session_base_t *session_,
|
||||
const std::string &peer_address_,
|
||||
const options_t &options_);
|
||||
|
||||
@ -77,7 +77,7 @@ class zap_client_common_handshake_t : public zap_client_t
|
||||
ready
|
||||
};
|
||||
|
||||
zap_client_common_handshake_t (session_base_t *const session_,
|
||||
zap_client_common_handshake_t (session_base_t *session_,
|
||||
const std::string &peer_address_,
|
||||
const options_t &options_,
|
||||
state_t zap_reply_ok_state_);
|
||||
|
@ -1135,8 +1135,8 @@ void *zmq_poller_new (void)
|
||||
int zmq_poller_destroy (void **poller_p_)
|
||||
{
|
||||
if (poller_p_) {
|
||||
zmq::socket_poller_t *const poller =
|
||||
static_cast<zmq::socket_poller_t *> (*poller_p_);
|
||||
const zmq::socket_poller_t *const poller =
|
||||
static_cast<const zmq::socket_poller_t *> (*poller_p_);
|
||||
if (poller && poller->check_tag ()) {
|
||||
delete poller;
|
||||
*poller_p_ = NULL;
|
||||
@ -1227,7 +1227,8 @@ int zmq_poller_modify (void *poller_, void *s_, short events_)
|
||||
|| -1 == check_events (events_))
|
||||
return -1;
|
||||
|
||||
zmq::socket_base_t *socket = static_cast<zmq::socket_base_t *> (s_);
|
||||
const zmq::socket_base_t *const socket =
|
||||
static_cast<const zmq::socket_base_t *> (s_);
|
||||
|
||||
return (static_cast<zmq::socket_poller_t *> (poller_))
|
||||
->modify (socket, events_);
|
||||
|
@ -545,7 +545,8 @@ int zmq::zmtp_engine_t::process_command_message (msg_t *msg_)
|
||||
if (unlikely (msg_->size () < cmd_name_size + sizeof (cmd_name_size)))
|
||||
return -1;
|
||||
|
||||
uint8_t *cmd_name = (static_cast<uint8_t *> (msg_->data ())) + 1;
|
||||
const uint8_t *const cmd_name =
|
||||
static_cast<const uint8_t *> (msg_->data ()) + 1;
|
||||
if (cmd_name_size == ping_name_size
|
||||
&& memcmp (cmd_name, "PING", cmd_name_size) == 0)
|
||||
msg_->set_flags (zmq::msg_t::ping);
|
||||
|
@ -65,18 +65,18 @@ class zmtp_engine_t ZMQ_FINAL : public stream_engine_base_t
|
||||
zmtp_engine_t (fd_t fd_,
|
||||
const options_t &options_,
|
||||
const endpoint_uri_pair_t &endpoint_uri_pair_);
|
||||
~zmtp_engine_t () ZMQ_FINAL;
|
||||
~zmtp_engine_t ();
|
||||
|
||||
protected:
|
||||
// Detects the protocol used by the peer.
|
||||
bool handshake () ZMQ_FINAL;
|
||||
bool handshake ();
|
||||
|
||||
void plug_internal () ZMQ_FINAL;
|
||||
void plug_internal ();
|
||||
|
||||
int process_command_message (msg_t *msg_) ZMQ_FINAL;
|
||||
int produce_ping_message (msg_t *msg_) ZMQ_FINAL;
|
||||
int process_heartbeat_message (msg_t *msg_) ZMQ_FINAL;
|
||||
int produce_pong_message (msg_t *msg_) ZMQ_FINAL;
|
||||
int process_command_message (msg_t *msg_);
|
||||
int produce_ping_message (msg_t *msg_);
|
||||
int process_heartbeat_message (msg_t *msg_);
|
||||
int produce_pong_message (msg_t *msg_);
|
||||
|
||||
private:
|
||||
// Receive the greeting from the peer.
|
||||
|
@ -123,10 +123,10 @@ void test_app_meta_reqrep ()
|
||||
|
||||
TEST_ASSERT_EQUAL_STRING ("hello", zmq_msg_gets (&msg, "X-hello"));
|
||||
TEST_ASSERT_EQUAL_STRING ("primary", zmq_msg_gets (&msg, "X-connection"));
|
||||
char *bindata = const_cast<char *> (zmq_msg_gets (&msg, "X-bin"));
|
||||
const char *const bindata = zmq_msg_gets (&msg, "X-bin");
|
||||
TEST_ASSERT_NOT_NULL (bindata);
|
||||
uint8_t rawdata[4];
|
||||
void *ret = zmq_z85_decode (rawdata, bindata);
|
||||
const uint8_t *const ret = zmq_z85_decode (rawdata, bindata);
|
||||
TEST_ASSERT_NOT_NULL (ret);
|
||||
TEST_ASSERT_EQUAL_UINT8 (0, rawdata[0]);
|
||||
TEST_ASSERT_EQUAL_UINT8 (1, rawdata[1]);
|
||||
|
@ -94,11 +94,12 @@ void test__zmq_z85_encode__zmq_z85_decode__roundtrip (
|
||||
const uint8_t (&test_data_)[SIZE])
|
||||
{
|
||||
char test_data_z85[SIZE * 5 / 4 + 1];
|
||||
char *res1 = zmq_z85_encode (test_data_z85, test_data_, SIZE);
|
||||
const char *const res1 = zmq_z85_encode (test_data_z85, test_data_, SIZE);
|
||||
TEST_ASSERT_NOT_NULL (res1);
|
||||
|
||||
uint8_t test_data_decoded[SIZE];
|
||||
uint8_t *res2 = zmq_z85_decode (test_data_decoded, test_data_z85);
|
||||
const uint8_t *const res2 =
|
||||
zmq_z85_decode (test_data_decoded, test_data_z85);
|
||||
TEST_ASSERT_NOT_NULL (res2);
|
||||
|
||||
TEST_ASSERT_EQUAL_UINT8_ARRAY (test_data_, test_data_decoded, SIZE);
|
||||
@ -111,11 +112,11 @@ void test__zmq_z85_decode__zmq_z85_encode__roundtrip (
|
||||
{
|
||||
const size_t decoded_size = (SIZE - 1) * 4 / 5;
|
||||
uint8_t test_data_decoded[decoded_size];
|
||||
uint8_t *res1 = zmq_z85_decode (test_data_decoded, test_data_);
|
||||
const uint8_t *const res1 = zmq_z85_decode (test_data_decoded, test_data_);
|
||||
TEST_ASSERT_NOT_NULL (res1);
|
||||
|
||||
char test_data_z85[SIZE];
|
||||
char *res2 =
|
||||
const char *const res2 =
|
||||
zmq_z85_encode (test_data_z85, test_data_decoded, decoded_size);
|
||||
TEST_ASSERT_NOT_NULL (res2);
|
||||
|
||||
|
@ -162,8 +162,8 @@ struct poller_test_data_t
|
||||
|
||||
void run_poller (void *data_)
|
||||
{
|
||||
struct poller_test_data_t *poller_test_data =
|
||||
static_cast<struct poller_test_data_t *> (data_);
|
||||
const poller_test_data_t *const poller_test_data =
|
||||
static_cast<const poller_test_data_t *> (data_);
|
||||
|
||||
void *socket =
|
||||
zmq_socket (poller_test_data->ctx, poller_test_data->socket_type);
|
||||
|
@ -68,7 +68,8 @@ void test_disconnect_inproc ()
|
||||
zmq_msg_t msg;
|
||||
zmq_msg_init (&msg);
|
||||
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, pub_socket, 0));
|
||||
char *buffer = static_cast<char *> (zmq_msg_data (&msg));
|
||||
const char *const buffer =
|
||||
static_cast<const char *> (zmq_msg_data (&msg));
|
||||
|
||||
if (buffer[0] == 0) {
|
||||
TEST_ASSERT_TRUE (isSubscribed);
|
||||
|
@ -56,8 +56,7 @@ void test_system_max ()
|
||||
|
||||
// System is out of resources, further calls to zmq_socket should return NULL
|
||||
for (unsigned int i = 0; i < 10; ++i) {
|
||||
void *socket = zmq_socket (get_test_context (), ZMQ_PAIR);
|
||||
TEST_ASSERT_NULL (socket);
|
||||
TEST_ASSERT_NULL (zmq_socket (get_test_context (), ZMQ_PAIR));
|
||||
}
|
||||
// Clean up.
|
||||
for (unsigned int i = 0; i < sockets.size (); ++i)
|
||||
@ -80,8 +79,7 @@ void test_zmq_default_max ()
|
||||
|
||||
// Further calls to zmq_socket should return NULL
|
||||
for (unsigned int i = 0; i < 10; ++i) {
|
||||
void *socket = zmq_socket (get_test_context (), ZMQ_PAIR);
|
||||
TEST_ASSERT_NULL (socket);
|
||||
TEST_ASSERT_NULL (zmq_socket (get_test_context (), ZMQ_PAIR));
|
||||
}
|
||||
|
||||
// Clean up
|
||||
|
@ -80,7 +80,7 @@ void *g_workers_pkts_out = NULL;
|
||||
|
||||
static void client_task (void *db_)
|
||||
{
|
||||
struct thread_data *databag = static_cast<struct thread_data *> (db_);
|
||||
const thread_data *const databag = static_cast<const thread_data *> (db_);
|
||||
// Endpoint socket gets random port to avoid test failing when port in use
|
||||
void *endpoint = zmq_socket (get_test_context (), ZMQ_PAIR);
|
||||
TEST_ASSERT_NOT_NULL (endpoint);
|
||||
|
@ -83,7 +83,8 @@ static void lower_hwm (void *skt_)
|
||||
|
||||
static void publisher_thread_main (void *pvoid_)
|
||||
{
|
||||
proxy_hwm_cfg_t *cfg = static_cast<proxy_hwm_cfg_t *> (pvoid_);
|
||||
const proxy_hwm_cfg_t *const cfg =
|
||||
static_cast<const proxy_hwm_cfg_t *> (pvoid_);
|
||||
|
||||
void *pubsocket = zmq_socket (cfg->context, ZMQ_XPUB);
|
||||
assert (pubsocket);
|
||||
@ -138,7 +139,8 @@ static void publisher_thread_main (void *pvoid_)
|
||||
|
||||
static void subscriber_thread_main (void *pvoid_)
|
||||
{
|
||||
proxy_hwm_cfg_t *cfg = static_cast<proxy_hwm_cfg_t *> (pvoid_);
|
||||
const proxy_hwm_cfg_t *const cfg =
|
||||
static_cast<const proxy_hwm_cfg_t *> (pvoid_);
|
||||
|
||||
void *subsocket = zmq_socket (cfg->context, ZMQ_SUB);
|
||||
assert (subsocket);
|
||||
@ -266,8 +268,8 @@ bool check_proxy_stats (void *control_proxy_)
|
||||
|
||||
static void proxy_stats_asker_thread_main (void *pvoid_)
|
||||
{
|
||||
proxy_hwm_cfg_t *cfg = static_cast<proxy_hwm_cfg_t *> (pvoid_);
|
||||
|
||||
const proxy_hwm_cfg_t *const cfg =
|
||||
static_cast<const proxy_hwm_cfg_t *> (pvoid_);
|
||||
|
||||
// CONTROL REQ
|
||||
|
||||
@ -318,10 +320,10 @@ static void proxy_stats_asker_thread_main (void *pvoid_)
|
||||
|
||||
static void proxy_thread_main (void *pvoid_)
|
||||
{
|
||||
proxy_hwm_cfg_t *cfg = static_cast<proxy_hwm_cfg_t *> (pvoid_);
|
||||
const proxy_hwm_cfg_t *const cfg =
|
||||
static_cast<const proxy_hwm_cfg_t *> (pvoid_);
|
||||
int rc;
|
||||
|
||||
|
||||
// FRONTEND SUB
|
||||
|
||||
void *frontend_xsub = zmq_socket (
|
||||
|
@ -350,7 +350,7 @@ static bool is_multicast_available (int ipv6_)
|
||||
|
||||
if (ipv6_) {
|
||||
struct ipv6_mreq mreq;
|
||||
struct sockaddr_in6 *mcast_ipv6 = &mcast.ipv6;
|
||||
const sockaddr_in6 *const mcast_ipv6 = &mcast.ipv6;
|
||||
|
||||
mreq.ipv6mr_multiaddr = mcast_ipv6->sin6_addr;
|
||||
mreq.ipv6mr_interface = 0;
|
||||
@ -369,7 +369,7 @@ static bool is_multicast_available (int ipv6_)
|
||||
}
|
||||
} else {
|
||||
struct ip_mreq mreq;
|
||||
struct sockaddr_in *mcast_ipv4 = &mcast.ipv4;
|
||||
const sockaddr_in *const mcast_ipv4 = &mcast.ipv4;
|
||||
|
||||
mreq.imr_multiaddr = mcast_ipv4->sin_addr;
|
||||
mreq.imr_interface.s_addr = htonl (INADDR_ANY);
|
||||
|
@ -28,6 +28,7 @@
|
||||
*/
|
||||
|
||||
#include "testutil.hpp"
|
||||
#include "testutil_monitoring.hpp"
|
||||
#include "testutil_unity.hpp"
|
||||
#if defined(ZMQ_HAVE_WINDOWS)
|
||||
#include <winsock2.h>
|
||||
@ -178,42 +179,6 @@ void tearDown ()
|
||||
zmq_threadclose (zap_thread);
|
||||
}
|
||||
|
||||
// Read one event off the monitor socket; return value and address
|
||||
// by reference, if not null, and event number by value. Returns -1
|
||||
// in case of error.
|
||||
static int get_monitor_event (void *monitor_, int *value_, char **address_)
|
||||
{
|
||||
// First frame in message contains event number and value
|
||||
zmq_msg_t msg;
|
||||
zmq_msg_init (&msg);
|
||||
if (zmq_msg_recv (&msg, monitor_, 0) == -1)
|
||||
return -1; // Interruped, presumably
|
||||
TEST_ASSERT_TRUE (zmq_msg_more (&msg));
|
||||
|
||||
uint8_t *data = static_cast<uint8_t *> (zmq_msg_data (&msg));
|
||||
uint16_t event = *reinterpret_cast<uint16_t *> (data);
|
||||
if (value_)
|
||||
*value_ = *reinterpret_cast<uint32_t *> (data + 2);
|
||||
zmq_msg_close (&msg);
|
||||
|
||||
// Second frame in message contains event address
|
||||
zmq_msg_init (&msg);
|
||||
if (zmq_msg_recv (&msg, monitor_, 0) == -1)
|
||||
return -1; // Interruped, presumably
|
||||
TEST_ASSERT_FALSE (zmq_msg_more (&msg));
|
||||
|
||||
if (address_) {
|
||||
uint8_t *data = static_cast<uint8_t *> (zmq_msg_data (&msg));
|
||||
size_t size = zmq_msg_size (&msg);
|
||||
*address_ = static_cast<char *> (malloc (size + 1));
|
||||
memcpy (*address_, data, size);
|
||||
*address_[size] = 0;
|
||||
}
|
||||
zmq_msg_close (&msg);
|
||||
|
||||
return event;
|
||||
}
|
||||
|
||||
void test_valid_creds ()
|
||||
{
|
||||
void *client = test_context_socket (ZMQ_DEALER);
|
||||
|
@ -48,7 +48,7 @@ struct thread_data
|
||||
extern "C" {
|
||||
static void worker (void *data_)
|
||||
{
|
||||
struct thread_data *tdata = (struct thread_data *) data_;
|
||||
const thread_data *const tdata = static_cast<const thread_data *> (data_);
|
||||
|
||||
void *socket = zmq_socket (get_test_context (), ZMQ_SUB);
|
||||
|
||||
|
@ -42,8 +42,7 @@ void tearDown ()
|
||||
// tests all socket-related functions with a NULL socket argument
|
||||
void test_zmq_socket_null_context ()
|
||||
{
|
||||
void *s = zmq_socket (NULL, ZMQ_PAIR);
|
||||
TEST_ASSERT_NULL (s);
|
||||
TEST_ASSERT_NULL (zmq_socket (NULL, ZMQ_PAIR));
|
||||
TEST_ASSERT_EQUAL_INT (EFAULT, errno); // TODO use EINVAL instead?
|
||||
}
|
||||
|
||||
|
@ -28,6 +28,7 @@
|
||||
*/
|
||||
|
||||
#include "testutil.hpp"
|
||||
#include "testutil_monitoring.hpp"
|
||||
#include "testutil_unity.hpp"
|
||||
|
||||
#include <stdlib.h>
|
||||
@ -35,40 +36,6 @@
|
||||
|
||||
SETUP_TEARDOWN_TESTCONTEXT
|
||||
|
||||
// Read one event off the monitor socket; return value and address
|
||||
// by reference, if not null, and event number by value. Returns -1
|
||||
// in case of error.
|
||||
|
||||
static int get_monitor_event (void *monitor_, int *value_, char **address_)
|
||||
{
|
||||
// First frame in message contains event number and value
|
||||
zmq_msg_t msg;
|
||||
zmq_msg_init (&msg);
|
||||
if (zmq_msg_recv (&msg, monitor_, 0) == -1)
|
||||
return -1; // Interruped, presumably
|
||||
TEST_ASSERT_TRUE (zmq_msg_more (&msg));
|
||||
|
||||
uint8_t *data = static_cast<uint8_t *> (zmq_msg_data (&msg));
|
||||
uint16_t event = *reinterpret_cast<uint16_t *> (data);
|
||||
if (value_)
|
||||
*value_ = *reinterpret_cast<uint32_t *> (data + 2);
|
||||
|
||||
// Second frame in message contains event address
|
||||
zmq_msg_init (&msg);
|
||||
if (zmq_msg_recv (&msg, monitor_, 0) == -1)
|
||||
return -1; // Interruped, presumably
|
||||
TEST_ASSERT_TRUE (!zmq_msg_more (&msg));
|
||||
|
||||
if (address_) {
|
||||
uint8_t *data = static_cast<uint8_t *> (zmq_msg_data (&msg));
|
||||
size_t size = zmq_msg_size (&msg);
|
||||
*address_ = static_cast<char *> (malloc (size + 1));
|
||||
memcpy (*address_, data, size);
|
||||
*address_[size] = 0;
|
||||
}
|
||||
return event;
|
||||
}
|
||||
|
||||
static void test_stream_handshake_timeout_accept ()
|
||||
{
|
||||
char my_endpoint[MAX_SOCKET_STRING];
|
||||
|
@ -32,10 +32,32 @@
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
|
||||
static int
|
||||
receive_monitor_address (void *monitor_, char **address_, bool expect_more_)
|
||||
{
|
||||
zmq_msg_t msg;
|
||||
|
||||
zmq_msg_init (&msg);
|
||||
if (zmq_msg_recv (&msg, monitor_, 0) == -1)
|
||||
return -1; // Interrupted, presumably
|
||||
TEST_ASSERT_EQUAL (expect_more_, zmq_msg_more (&msg));
|
||||
|
||||
if (address_) {
|
||||
const uint8_t *const data =
|
||||
static_cast<const uint8_t *> (zmq_msg_data (&msg));
|
||||
const size_t size = zmq_msg_size (&msg);
|
||||
*address_ = static_cast<char *> (malloc (size + 1));
|
||||
memcpy (*address_, data, size);
|
||||
(*address_)[size] = 0;
|
||||
}
|
||||
zmq_msg_close (&msg);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Read one event off the monitor socket; return value and address
|
||||
// by reference, if not null, and event number by value. Returns -1
|
||||
// in case of error.
|
||||
|
||||
static int get_monitor_event_internal (void *monitor_,
|
||||
int *value_,
|
||||
char **address_,
|
||||
@ -56,17 +78,9 @@ static int get_monitor_event_internal (void *monitor_,
|
||||
memcpy (value_, data + 2, sizeof (uint32_t));
|
||||
|
||||
// Second frame in message contains event address
|
||||
zmq_msg_init (&msg);
|
||||
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, monitor_, recv_flag_));
|
||||
TEST_ASSERT_FALSE (zmq_msg_more (&msg));
|
||||
TEST_ASSERT_SUCCESS_ERRNO (
|
||||
receive_monitor_address (monitor_, address_, false));
|
||||
|
||||
if (address_) {
|
||||
uint8_t *data = static_cast<uint8_t *> (zmq_msg_data (&msg));
|
||||
size_t size = zmq_msg_size (&msg);
|
||||
*address_ = static_cast<char *> (malloc (size + 1));
|
||||
memcpy (*address_, data, size);
|
||||
(*address_)[size] = 0;
|
||||
}
|
||||
return event;
|
||||
}
|
||||
|
||||
@ -241,32 +255,13 @@ static int64_t get_monitor_event_internal_v2 (void *monitor_,
|
||||
}
|
||||
|
||||
// Second-to-last frame in message contains local address
|
||||
zmq_msg_init (&msg);
|
||||
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, monitor_, recv_flag_));
|
||||
TEST_ASSERT_TRUE (zmq_msg_more (&msg));
|
||||
|
||||
if (local_address_) {
|
||||
uint8_t *data = static_cast<uint8_t *> (zmq_msg_data (&msg));
|
||||
size_t size = zmq_msg_size (&msg);
|
||||
*local_address_ = static_cast<char *> (malloc (size + 1));
|
||||
memcpy (*local_address_, data, size);
|
||||
(*local_address_)[size] = 0;
|
||||
}
|
||||
zmq_msg_close (&msg);
|
||||
TEST_ASSERT_SUCCESS_ERRNO (
|
||||
receive_monitor_address (monitor_, local_address_, true));
|
||||
|
||||
// Last frame in message contains remote address
|
||||
zmq_msg_init (&msg);
|
||||
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, monitor_, recv_flag_));
|
||||
TEST_ASSERT_TRUE (!zmq_msg_more (&msg));
|
||||
TEST_ASSERT_SUCCESS_ERRNO (
|
||||
receive_monitor_address (monitor_, remote_address_, false));
|
||||
|
||||
if (remote_address_) {
|
||||
uint8_t *data = static_cast<uint8_t *> (zmq_msg_data (&msg));
|
||||
size_t size = zmq_msg_size (&msg);
|
||||
*remote_address_ = static_cast<char *> (malloc (size + 1));
|
||||
memcpy (*remote_address_, data, size);
|
||||
(*remote_address_)[size] = 0;
|
||||
}
|
||||
zmq_msg_close (&msg);
|
||||
return event;
|
||||
}
|
||||
|
||||
|
@ -42,6 +42,9 @@ int get_monitor_event_with_timeout (void *monitor_,
|
||||
char **address_,
|
||||
int timeout_);
|
||||
|
||||
// Read one event off the monitor socket; return value and address
|
||||
// by reference, if not null, and event number by value. Returns -1
|
||||
// in case of error.
|
||||
int get_monitor_event (void *monitor_, int *value_, char **address_);
|
||||
|
||||
void expect_monitor_event (void *monitor_, int expected_event_);
|
||||
@ -72,4 +75,5 @@ void expect_monitor_event_v2 (void *monitor_,
|
||||
int64_t expected_event_,
|
||||
const char *expected_local_address_ = NULL,
|
||||
const char *expected_remote_address_ = NULL);
|
||||
|
||||
#endif
|
||||
|
@ -109,8 +109,8 @@ void socket_config_curve_server (void *server_, void *server_secret_)
|
||||
|
||||
void socket_config_curve_client (void *client_, void *data_)
|
||||
{
|
||||
curve_client_data_t *curve_client_data =
|
||||
static_cast<curve_client_data_t *> (data_);
|
||||
const curve_client_data_t *const curve_client_data =
|
||||
static_cast<const curve_client_data_t *> (data_);
|
||||
|
||||
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
|
||||
client_, ZMQ_CURVE_SERVERKEY, curve_client_data->server_public, 41));
|
||||
|
Loading…
x
Reference in New Issue
Block a user