Merge pull request #3795 from sigiesec/final-override-analyze

Code style improvements
This commit is contained in:
Luca Boccassi
2020-01-27 11:42:57 +00:00
committed by GitHub
164 changed files with 1033 additions and 963 deletions

View File

@@ -420,3 +420,7 @@ CheckOptions:
# value: '0'
# - key: readability-simplify-boolean-expr.ChainedConditionalReturn
# value: '0'
- key: modernize-use-override.OverrideSpelling
value: 'ZMQ_OVERRIDE'
- key: modernize-use-override.FinalSpelling
value: 'ZMQ_FINAL'

View File

@@ -61,7 +61,7 @@ template <int ID = 0> class array_item_t
inline void set_array_index (int index_) { _array_index = index_; }
inline int get_array_index () { return _array_index; }
inline int get_array_index () const { return _array_index; }
private:
int _array_index;
@@ -89,20 +89,23 @@ template <typename T, int ID = 0> class array_t
inline void push_back (T *item_)
{
if (item_)
((item_t *) item_)->set_array_index ((int) _items.size ());
static_cast<item_t *> (item_)->set_array_index (
static_cast<int> (_items.size ()));
_items.push_back (item_);
}
inline void erase (T *item_)
{
erase (((item_t *) item_)->get_array_index ());
erase (static_cast<item_t *> (item_)->get_array_index ());
}
inline void erase (size_type index_)
{
if (_items.empty ())
return;
((item_t *) _items.back ())->set_array_index ((int) index_);
static_cast<item_t *> (_items.back ())
->set_array_index (static_cast<int> (index_));
_items[index_] = _items.back ();
_items.pop_back ();
}
@@ -110,17 +113,20 @@ template <typename T, int ID = 0> class array_t
inline void swap (size_type index1_, size_type index2_)
{
if (_items[index1_])
((item_t *) _items[index1_])->set_array_index ((int) index2_);
static_cast<item_t *> (_items[index1_])
->set_array_index (static_cast<int> (index2_));
if (_items[index2_])
((item_t *) _items[index2_])->set_array_index ((int) index1_);
static_cast<item_t *> (_items[index2_])
->set_array_index (static_cast<int> (index1_));
std::swap (_items[index1_], _items[index2_]);
}
inline void clear () { _items.clear (); }
inline size_type index (T *item_)
static inline size_type index (T *item_)
{
return (size_type) ((item_t *) item_)->get_array_index ();
return static_cast<size_type> (
static_cast<item_t *> (item_)->get_array_index ());
}
private:

View File

@@ -156,7 +156,7 @@ class atomic_counter_t
__atomic_sub_fetch (&_value, decrement_, __ATOMIC_ACQ_REL);
return nv != 0;
#elif defined ZMQ_ATOMIC_COUNTER_CXX11
integer_t old =
const integer_t old =
_value.fetch_sub (decrement_, std::memory_order_acq_rel);
return old - decrement_ != 0;
#elif defined ZMQ_ATOMIC_COUNTER_ATOMIC_H

View File

@@ -41,24 +41,24 @@ class msg_t;
class pipe_t;
class io_thread_t;
class client_t : public socket_base_t
class client_t ZMQ_FINAL : public socket_base_t
{
public:
client_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
~client_t ();
~client_t () ZMQ_FINAL;
protected:
// Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_);
bool xhas_in ();
bool xhas_out ();
void xread_activated (zmq::pipe_t *pipe_);
void xwrite_activated (zmq::pipe_t *pipe_);
void xpipe_terminated (zmq::pipe_t *pipe_);
bool locally_initiated_) ZMQ_FINAL;
int xsend (zmq::msg_t *msg_) ZMQ_FINAL;
int xrecv (zmq::msg_t *msg_) ZMQ_FINAL;
bool xhas_in () ZMQ_FINAL;
bool xhas_out () ZMQ_FINAL;
void xread_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
void xwrite_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
void xpipe_terminated (zmq::pipe_t *pipe_) ZMQ_FINAL;
private:
// Messages are fair-queued from inbound pipes. And load-balanced to

View File

@@ -103,7 +103,7 @@ f_compatible_get_tick_count64 init_compatible_get_tick_count64 ()
f_compatible_get_tick_count64 func = NULL;
#if !defined ZMQ_HAVE_WINDOWS_UWP
HMODULE module = ::LoadLibraryA ("Kernel32.dll");
const HMODULE module = ::LoadLibraryA ("Kernel32.dll");
if (module != NULL)
func = reinterpret_cast<f_compatible_get_tick_count64> (
::GetProcAddress (module, "GetTickCount64"));
@@ -200,7 +200,7 @@ uint64_t zmq::clock_t::now_us ()
uint64_t zmq::clock_t::now_ms ()
{
uint64_t tsc = rdtsc ();
const uint64_t tsc = rdtsc ();
// If TSC is not supported, get precise time and chop off the microseconds.
if (!tsc) {

View File

@@ -105,7 +105,7 @@ zmq::ctx_t::ctx_t () :
#endif
}
bool zmq::ctx_t::check_tag ()
bool zmq::ctx_t::check_tag () const
{
return _tag == ZMQ_CTX_TAG_VALUE_GOOD;
}
@@ -117,12 +117,13 @@ zmq::ctx_t::~ctx_t ()
// Ask I/O threads to terminate. If stop signal wasn't sent to I/O
// thread subsequent invocation of destructor would hang-up.
for (io_threads_t::size_type i = 0; i != _io_threads.size (); i++) {
const io_threads_t::size_type io_threads_size = _io_threads.size ();
for (io_threads_t::size_type i = 0; i != io_threads_size; i++) {
_io_threads[i]->stop ();
}
// Wait till I/O threads actually terminate.
for (io_threads_t::size_type i = 0; i != _io_threads.size (); i++) {
for (io_threads_t::size_type i = 0; i != io_threads_size; i++) {
LIBZMQ_DELETE (_io_threads[i]);
}
@@ -156,7 +157,7 @@ int zmq::ctx_t::terminate ()
{
_slot_sync.lock ();
bool save_terminating = _terminating;
const bool save_terminating = _terminating;
_terminating = false;
// Connect up any pending inproc connections, otherwise we will hang
@@ -176,16 +177,17 @@ int zmq::ctx_t::terminate ()
if (_pid != getpid ()) {
// we are a forked child process. Close all file descriptors
// inherited from the parent.
for (sockets_t::size_type i = 0; i != _sockets.size (); i++)
for (sockets_t::size_type i = 0, size = _sockets.size (); i != size;
i++) {
_sockets[i]->get_mailbox ()->forked ();
}
_term_mailbox.forked ();
}
#endif
// Check whether termination was already underway, but interrupted and now
// restarted.
bool restarted = _terminating;
const bool restarted = _terminating;
_terminating = true;
// First attempt to terminate the context.
@@ -193,8 +195,10 @@ int zmq::ctx_t::terminate ()
// First send stop command to sockets so that any blocking calls
// can be interrupted. If there are no sockets we can ask reaper
// thread to stop.
for (sockets_t::size_type i = 0; i != _sockets.size (); i++)
for (sockets_t::size_type i = 0, size = _sockets.size (); i != size;
i++) {
_sockets[i]->stop ();
}
if (_sockets.empty ())
_reaper->stop ();
}
@@ -202,7 +206,7 @@ int zmq::ctx_t::terminate ()
// Wait till reaper thread closes all the sockets.
command_t cmd;
int rc = _term_mailbox.recv (&cmd, -1);
const int rc = _term_mailbox.recv (&cmd, -1);
if (rc == -1 && errno == EINTR)
return -1;
errno_assert (rc == 0);
@@ -239,8 +243,10 @@ int zmq::ctx_t::shutdown ()
// Send stop command to sockets so that any blocking calls
// can be interrupted. If there are no sockets we can ask reaper
// thread to stop.
for (sockets_t::size_type i = 0; i != _sockets.size (); i++)
for (sockets_t::size_type i = 0, size = _sockets.size (); i != size;
i++) {
_sockets[i]->stop ();
}
if (_sockets.empty ())
_reaper->stop ();
}
@@ -251,7 +257,7 @@ int zmq::ctx_t::shutdown ()
int zmq::ctx_t::set (int option_, const void *optval_, size_t optvallen_)
{
bool is_int = (optvallen_ == sizeof (int));
const bool is_int = (optvallen_ == sizeof (int));
int value = 0;
if (is_int)
memcpy (&value, optval_, sizeof (int));
@@ -314,7 +320,7 @@ int zmq::ctx_t::set (int option_, const void *optval_, size_t optvallen_)
return -1;
}
int zmq::ctx_t::get (int option_, void *optval_, size_t *optvallen_)
int zmq::ctx_t::get (int option_, void *optval_, const size_t *optvallen_)
{
const bool is_int = (*optvallen_ == sizeof (int));
int *value = static_cast<int *> (optval_);
@@ -406,7 +412,7 @@ bool zmq::ctx_t::start ()
const int mazmq = _max_sockets;
const int ios = _io_thread_count;
_opt_sync.unlock ();
int slot_count = mazmq + ios + term_and_reaper_threads_count;
const int slot_count = mazmq + ios + term_and_reaper_threads_count;
try {
_slots.reserve (slot_count);
_empty_slots.reserve (slot_count - term_and_reaper_threads_count);
@@ -492,11 +498,11 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
}
// Choose a slot for the socket.
uint32_t slot = _empty_slots.back ();
const uint32_t slot = _empty_slots.back ();
_empty_slots.pop_back ();
// Generate new unique socket ID.
int sid = (static_cast<int> (max_socket_id.add (1))) + 1;
const int sid = (static_cast<int> (max_socket_id.add (1))) + 1;
// Create the socket and register its mailbox.
socket_base_t *s = socket_base_t::create (type_, this, slot, sid);
@@ -515,7 +521,7 @@ void zmq::ctx_t::destroy_socket (class socket_base_t *socket_)
scoped_lock_t locker (_slot_sync);
// Free the associated thread slot.
uint32_t tid = socket_->get_tid ();
const uint32_t tid = socket_->get_tid ();
_empty_slots.push_back (tid);
_slots[tid] = NULL;
@@ -528,7 +534,7 @@ void zmq::ctx_t::destroy_socket (class socket_base_t *socket_)
_reaper->stop ();
}
zmq::object_t *zmq::ctx_t::get_reaper ()
zmq::object_t *zmq::ctx_t::get_reaper () const
{
return _reaper;
}
@@ -557,7 +563,7 @@ void zmq::thread_ctx_t::start_thread (thread_t &thread_,
int zmq::thread_ctx_t::set (int option_, const void *optval_, size_t optvallen_)
{
bool is_int = (optvallen_ == sizeof (int));
const bool is_int = (optvallen_ == sizeof (int));
int value = 0;
if (is_int)
memcpy (&value, optval_, sizeof (int));
@@ -666,9 +672,10 @@ zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
// Find the I/O thread with minimum load.
int min_load = -1;
io_thread_t *selected_io_thread = NULL;
for (io_threads_t::size_type i = 0; i != _io_threads.size (); i++) {
for (io_threads_t::size_type i = 0, size = _io_threads.size (); i != size;
i++) {
if (!affinity_ || (affinity_ & (uint64_t (1) << i))) {
int load = _io_threads[i]->get_load ();
const int load = _io_threads[i]->get_load ();
if (selected_io_thread == NULL || load < min_load) {
min_load = load;
selected_io_thread = _io_threads[i];
@@ -694,7 +701,7 @@ int zmq::ctx_t::register_endpoint (const char *addr_,
}
int zmq::ctx_t::unregister_endpoint (const std::string &addr_,
socket_base_t *socket_)
const socket_base_t *const socket_)
{
scoped_lock_t locker (_endpoints_sync);
@@ -710,7 +717,7 @@ int zmq::ctx_t::unregister_endpoint (const std::string &addr_,
return 0;
}
void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_)
void zmq::ctx_t::unregister_endpoints (const socket_base_t *const socket_)
{
scoped_lock_t locker (_endpoints_sync);
@@ -758,7 +765,7 @@ void zmq::ctx_t::pend_connection (const std::string &addr_,
const pending_connection_t pending_connection = {endpoint_, pipes_[0],
pipes_[1]};
endpoints_t::iterator it = _endpoints.find (addr_);
const endpoints_t::iterator it = _endpoints.find (addr_);
if (it == _endpoints.end ()) {
// Still no bind.
endpoint_.socket->inc_seqnum ();
@@ -776,7 +783,8 @@ void zmq::ctx_t::connect_pending (const char *addr_,
{
scoped_lock_t locker (_endpoints_sync);
std::pair<pending_connections_t::iterator, pending_connections_t::iterator>
const std::pair<pending_connections_t::iterator,
pending_connections_t::iterator>
pending = _pending_connections.equal_range (addr_);
for (pending_connections_t::iterator p = pending.first; p != pending.second;
++p)
@@ -788,7 +796,7 @@ void zmq::ctx_t::connect_pending (const char *addr_,
void zmq::ctx_t::connect_inproc_sockets (
zmq::socket_base_t *bind_socket_,
options_t &bind_options_,
const options_t &bind_options_,
const pending_connection_t &pending_connection_,
side side_)
{

View File

@@ -90,14 +90,14 @@ class thread_ctx_t
// Context object encapsulates all the global state associated with
// the library.
class ctx_t : public thread_ctx_t
class ctx_t ZMQ_FINAL : public thread_ctx_t
{
public:
// Create the context object.
ctx_t ();
// Returns false if object is not a context.
bool check_tag ();
bool check_tag () const;
// This function is called when user invokes zmq_ctx_term. If there are
// no more sockets open it'll cause all the infrastructure to be shut
@@ -116,7 +116,7 @@ class ctx_t : public thread_ctx_t
// Set and get context properties.
int set (int option_, const void *optval_, size_t optvallen_);
int get (int option_, void *optval_, size_t *optvallen_);
int get (int option_, void *optval_, const size_t *optvallen_);
int get (int option_);
// Create and destroy a socket.
@@ -132,12 +132,13 @@ class ctx_t : public thread_ctx_t
zmq::io_thread_t *choose_io_thread (uint64_t affinity_);
// Returns reaper thread object.
zmq::object_t *get_reaper ();
zmq::object_t *get_reaper () const;
// Management of inproc endpoints.
int register_endpoint (const char *addr_, const endpoint_t &endpoint_);
int unregister_endpoint (const std::string &addr_, socket_base_t *socket_);
void unregister_endpoints (zmq::socket_base_t *socket_);
int unregister_endpoint (const std::string &addr_,
const socket_base_t *socket_);
void unregister_endpoints (const zmq::socket_base_t *socket_);
endpoint_t find_endpoint (const char *addr_);
void pend_connection (const std::string &addr_,
const endpoint_t &endpoint_,
@@ -252,9 +253,10 @@ class ctx_t : public thread_ctx_t
connect_side,
bind_side
};
void
static void
connect_inproc_sockets (zmq::socket_base_t *bind_socket_,
options_t &bind_options_,
const options_t &bind_options_,
const pending_connection_t &pending_connection_,
side side_);

View File

@@ -158,7 +158,7 @@ int zmq::curve_client_t::produce_hello (msg_t *msg_)
int zmq::curve_client_t::process_welcome (const uint8_t *msg_data_,
size_t msg_size_)
{
int rc = _tools.process_welcome (msg_data_, msg_size_, cn_precom);
const int rc = _tools.process_welcome (msg_data_, msg_size_, cn_precom);
if (rc == -1) {
session->get_socket ()->event_handshake_failed_protocol (

View File

@@ -41,18 +41,18 @@ namespace zmq
class msg_t;
class session_base_t;
class curve_client_t : public curve_mechanism_base_t
class curve_client_t ZMQ_FINAL : public curve_mechanism_base_t
{
public:
curve_client_t (session_base_t *session_, const options_t &options_);
virtual ~curve_client_t ();
~curve_client_t () ZMQ_FINAL;
// mechanism implementation
virtual int next_handshake_command (msg_t *msg_);
virtual int process_handshake_command (msg_t *msg_);
virtual int encode (msg_t *msg_);
virtual int decode (msg_t *msg_);
virtual status_t status () const;
int next_handshake_command (msg_t *msg_) ZMQ_FINAL;
int process_handshake_command (msg_t *msg_) ZMQ_FINAL;
int encode (msg_t *msg_) ZMQ_FINAL;
int decode (msg_t *msg_) ZMQ_FINAL;
status_t status () const ZMQ_FINAL;
private:
enum state_t

View File

@@ -70,7 +70,7 @@ struct curve_client_tools_t
put_uint64 (hello_nonce + 16, cn_nonce_);
// Create Box [64 * %x0](C'->S)
int rc =
const int rc =
crypto_box (hello_box, &hello_plaintext[0], hello_plaintext.size (),
hello_nonce, server_key_, cn_secret_);
if (rc == -1)
@@ -268,7 +268,7 @@ struct curve_client_tools_t
size_t size_,
const uint64_t cn_nonce_,
const uint8_t *metadata_plaintext_,
const size_t metadata_length_)
const size_t metadata_length_) const
{
return produce_initiate (data_, size_, cn_nonce_, server_key,
public_key, secret_key, cn_public, cn_secret,

View File

@@ -125,7 +125,7 @@ int zmq::curve_mechanism_base_t::decode (msg_t *msg_)
uint8_t message_nonce[crypto_box_NONCEBYTES];
memcpy (message_nonce, decode_nonce_prefix, 16);
memcpy (message_nonce + 16, message + 8, 8);
uint64_t nonce = get_uint64 (message + 8);
const uint64_t nonce = get_uint64 (message + 8);
if (nonce <= cn_peer_nonce) {
session->get_socket ()->event_handshake_failed_protocol (
session->get_endpoint (), ZMQ_PROTOCOL_ERROR_ZMTP_INVALID_SEQUENCE);

View File

@@ -61,8 +61,8 @@ class curve_mechanism_base_t : public virtual mechanism_base_t
const char *decode_nonce_prefix_);
// mechanism implementation
virtual int encode (msg_t *msg_);
virtual int decode (msg_t *msg_);
int encode (msg_t *msg_) ZMQ_OVERRIDE;
int decode (msg_t *msg_) ZMQ_OVERRIDE;
protected:
const char *encode_nonce_prefix;

View File

@@ -42,20 +42,20 @@ namespace zmq
#pragma warning(push)
#pragma warning(disable : 4250)
#endif
class curve_server_t : public zap_client_common_handshake_t,
public curve_mechanism_base_t
class curve_server_t ZMQ_FINAL : public zap_client_common_handshake_t,
public curve_mechanism_base_t
{
public:
curve_server_t (session_base_t *session_,
const std::string &peer_address_,
const options_t &options_);
virtual ~curve_server_t ();
~curve_server_t () ZMQ_FINAL;
// mechanism implementation
virtual int next_handshake_command (msg_t *msg_);
virtual int process_handshake_command (msg_t *msg_);
virtual int encode (msg_t *msg_);
virtual int decode (msg_t *msg_);
int next_handshake_command (msg_t *msg_) ZMQ_FINAL;
int process_handshake_command (msg_t *msg_) ZMQ_FINAL;
int encode (msg_t *msg_) ZMQ_FINAL;
int decode (msg_t *msg_) ZMQ_FINAL;
private:
// Our secret key (s)

View File

@@ -76,7 +76,7 @@ int zmq::dealer_t::xsetsockopt (int option_,
const void *optval_,
size_t optvallen_)
{
bool is_int = (optvallen_ == sizeof (int));
const bool is_int = (optvallen_ == sizeof (int));
int value = 0;
if (is_int)
memcpy (&value, optval_, sizeof (int));

View File

@@ -47,21 +47,23 @@ class dealer_t : public socket_base_t
{
public:
dealer_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
~dealer_t ();
~dealer_t () ZMQ_OVERRIDE;
protected:
// Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_);
bool xhas_in ();
bool xhas_out ();
void xread_activated (zmq::pipe_t *pipe_);
void xwrite_activated (zmq::pipe_t *pipe_);
void xpipe_terminated (zmq::pipe_t *pipe_);
bool locally_initiated_) ZMQ_FINAL;
int xsetsockopt (int option_,
const void *optval_,
size_t optvallen_) ZMQ_OVERRIDE;
int xsend (zmq::msg_t *msg_) ZMQ_OVERRIDE;
int xrecv (zmq::msg_t *msg_) ZMQ_OVERRIDE;
bool xhas_in () ZMQ_OVERRIDE;
bool xhas_out () ZMQ_OVERRIDE;
void xread_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
void xwrite_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
void xpipe_terminated (zmq::pipe_t *pipe_) ZMQ_OVERRIDE;
// Send and recv - knowing which pipe was used.
int sendpipe (zmq::msg_t *msg_, zmq::pipe_t **pipe_);

View File

@@ -66,12 +66,10 @@ class decoder_base_t : public i_decoder
_buf = _allocator.allocate ();
}
// The destructor doesn't have to be virtual. It is made virtual
// just to keep ICC and code checking tools from complaining.
virtual ~decoder_base_t () { _allocator.deallocate (); }
~decoder_base_t () ZMQ_OVERRIDE { _allocator.deallocate (); }
// Returns a buffer to be filled with binary data.
void get_buffer (unsigned char **data_, std::size_t *size_)
void get_buffer (unsigned char **data_, std::size_t *size_) ZMQ_FINAL
{
_buf = _allocator.allocate ();
@@ -101,7 +99,7 @@ class decoder_base_t : public i_decoder
// Number of bytes processed is returned in bytes_used_.
int decode (const unsigned char *data_,
std::size_t size_,
std::size_t &bytes_used_)
std::size_t &bytes_used_) ZMQ_FINAL
{
bytes_used_ = 0;
@@ -149,7 +147,7 @@ class decoder_base_t : public i_decoder
return 0;
}
virtual void resize_buffer (std::size_t new_size_)
void resize_buffer (std::size_t new_size_) ZMQ_FINAL
{
_allocator.resize (new_size_);
}

View File

@@ -47,13 +47,13 @@ struct i_poll_events;
// Implements socket polling mechanism using the "/dev/poll" interface.
class devpoll_t : public worker_poller_base_t
class devpoll_t ZMQ_FINAL : public worker_poller_base_t
{
public:
typedef fd_t handle_t;
devpoll_t (const thread_ctx_t &ctx_);
~devpoll_t ();
~devpoll_t () ZMQ_FINAL;
// "poller" concept.
handle_t add_fd (fd_t fd_, zmq::i_poll_events *events_);
@@ -68,7 +68,7 @@ class devpoll_t : public worker_poller_base_t
private:
// Main event loop.
void loop ();
void loop () ZMQ_FINAL;
// File descriptor referring to "/dev/poll" pseudo-device.
fd_t devpoll_fd;

View File

@@ -94,7 +94,7 @@ int zmq::dgram_t::xsend (msg_t *msg_)
{
// If there's no out pipe, just drop it.
if (!_pipe) {
int rc = msg_->close ();
const int rc = msg_->close ();
errno_assert (rc == 0);
return -1;
}
@@ -127,7 +127,7 @@ int zmq::dgram_t::xsend (msg_t *msg_)
_more_out = !_more_out;
// Detach the message from the data buffer.
int rc = msg_->init ();
const int rc = msg_->init ();
errno_assert (rc == 0);
return 0;

View File

@@ -41,23 +41,23 @@ class msg_t;
class pipe_t;
class io_thread_t;
class dgram_t : public socket_base_t
class dgram_t ZMQ_FINAL : public socket_base_t
{
public:
dgram_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
~dgram_t ();
~dgram_t () ZMQ_FINAL;
// Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_);
bool xhas_in ();
bool xhas_out ();
void xread_activated (zmq::pipe_t *pipe_);
void xwrite_activated (zmq::pipe_t *pipe_);
void xpipe_terminated (zmq::pipe_t *pipe_);
bool locally_initiated_) ZMQ_FINAL;
int xsend (zmq::msg_t *msg_) ZMQ_FINAL;
int xrecv (zmq::msg_t *msg_) ZMQ_FINAL;
bool xhas_in () ZMQ_FINAL;
bool xhas_out () ZMQ_FINAL;
void xread_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
void xwrite_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
void xpipe_terminated (zmq::pipe_t *pipe_) ZMQ_FINAL;
private:
zmq::pipe_t *_pipe;

View File

@@ -44,13 +44,13 @@ zmq::dish_t::dish_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
// subscription commands are sent to the wire.
options.linger.store (0);
int rc = _message.init ();
const int rc = _message.init ();
errno_assert (rc == 0);
}
zmq::dish_t::~dish_t ()
{
int rc = _message.close ();
const int rc = _message.close ();
errno_assert (rc == 0);
}
@@ -93,7 +93,7 @@ void zmq::dish_t::xhiccuped (pipe_t *pipe_)
int zmq::dish_t::xjoin (const char *group_)
{
std::string group = std::string (group_);
const std::string group = std::string (group_);
if (group.length () > ZMQ_GROUP_MAX_LENGTH) {
errno = EINVAL;
@@ -117,7 +117,7 @@ int zmq::dish_t::xjoin (const char *group_)
rc = _dist.send_to_all (&msg);
if (rc != 0)
err = errno;
int rc2 = msg.close ();
const int rc2 = msg.close ();
errno_assert (rc2 == 0);
if (rc != 0)
errno = err;
@@ -126,7 +126,7 @@ int zmq::dish_t::xjoin (const char *group_)
int zmq::dish_t::xleave (const char *group_)
{
std::string group = std::string (group_);
const std::string group = std::string (group_);
if (group.length () > ZMQ_GROUP_MAX_LENGTH) {
errno = EINVAL;
@@ -149,7 +149,7 @@ int zmq::dish_t::xleave (const char *group_)
rc = _dist.send_to_all (&msg);
if (rc != 0)
err = errno;
int rc2 = msg.close ();
const int rc2 = msg.close ();
errno_assert (rc2 == 0);
if (rc != 0)
errno = err;
@@ -269,7 +269,7 @@ int zmq::dish_session_t::push_msg (msg_t *msg_)
_group_msg = *msg_;
_state = body;
int rc = msg_->init ();
const int rc = msg_->init ();
errno_assert (rc == 0);
return 0;
}
@@ -312,7 +312,7 @@ int zmq::dish_session_t::pull_msg (msg_t *msg_)
if (!msg_->is_join () && !msg_->is_leave ())
return rc;
int group_length = static_cast<int> (strlen (msg_->group ()));
const int group_length = static_cast<int> (strlen (msg_->group ()));
msg_t command;
int offset;

View File

@@ -44,27 +44,27 @@ class ctx_t;
class pipe_t;
class io_thread_t;
class dish_t : public socket_base_t
class dish_t ZMQ_FINAL : public socket_base_t
{
public:
dish_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
~dish_t ();
~dish_t () ZMQ_FINAL;
protected:
// Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xsend (zmq::msg_t *msg_);
bool xhas_out ();
int xrecv (zmq::msg_t *msg_);
bool xhas_in ();
void xread_activated (zmq::pipe_t *pipe_);
void xwrite_activated (zmq::pipe_t *pipe_);
void xhiccuped (pipe_t *pipe_);
void xpipe_terminated (zmq::pipe_t *pipe_);
int xjoin (const char *group_);
int xleave (const char *group_);
bool locally_initiated_) ZMQ_FINAL;
int xsend (zmq::msg_t *msg_) ZMQ_FINAL;
bool xhas_out () ZMQ_FINAL;
int xrecv (zmq::msg_t *msg_) ZMQ_FINAL;
bool xhas_in () ZMQ_FINAL;
void xread_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
void xwrite_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
void xhiccuped (pipe_t *pipe_) ZMQ_FINAL;
void xpipe_terminated (zmq::pipe_t *pipe_) ZMQ_FINAL;
int xjoin (const char *group_) ZMQ_FINAL;
int xleave (const char *group_) ZMQ_FINAL;
private:
int xxrecv (zmq::msg_t *msg_);
@@ -90,7 +90,7 @@ class dish_t : public socket_base_t
ZMQ_NON_COPYABLE_NOR_MOVABLE (dish_t)
};
class dish_session_t : public session_base_t
class dish_session_t ZMQ_FINAL : public session_base_t
{
public:
dish_session_t (zmq::io_thread_t *io_thread_,
@@ -98,12 +98,12 @@ class dish_session_t : public session_base_t
zmq::socket_base_t *socket_,
const options_t &options_,
address_t *addr_);
~dish_session_t ();
~dish_session_t () ZMQ_FINAL;
// Overrides of the functions from session_base_t.
int push_msg (msg_t *msg_);
int pull_msg (msg_t *msg_);
void reset ();
int push_msg (msg_t *msg_) ZMQ_FINAL;
int pull_msg (msg_t *msg_) ZMQ_FINAL;
void reset () ZMQ_FINAL;
private:
enum

View File

@@ -81,7 +81,7 @@ void zmq::dist_t::match (pipe_t *pipe_)
void zmq::dist_t::reverse_match ()
{
pipes_t::size_type prev_matching = _matching;
const pipes_t::size_type prev_matching = _matching;
// Reset matching to 0
unmatch ();
@@ -145,7 +145,7 @@ int zmq::dist_t::send_to_all (msg_t *msg_)
int zmq::dist_t::send_to_matching (msg_t *msg_)
{
// Is this end of a multipart message?
bool msg_more = (msg_->flags () & msg_t::more) != 0;
const bool msg_more = (msg_->flags () & msg_t::more) != 0;
// Push the message to matching pipes.
distribute (msg_);
@@ -204,7 +204,7 @@ void zmq::dist_t::distribute (msg_t *msg_)
// Detach the original message from the data buffer. Note that we don't
// close the message. That's because we've already used all the references.
int rc = msg_->init ();
const int rc = msg_->init ();
errno_assert (rc == 0);
}

View File

@@ -73,7 +73,7 @@ class dist_t
// Send the message to all the outbound pipes.
int send_to_all (zmq::msg_t *msg_);
bool has_out ();
static bool has_out ();
// check HWM of all pipes matching
bool check_hwm ();

View File

@@ -66,17 +66,15 @@ template <typename T> class encoder_base_t : public i_encoder
alloc_assert (_buf);
}
// The destructor doesn't have to be virtual. It is made virtual
// just to keep ICC and code checking tools from complaining.
inline virtual ~encoder_base_t () { free (_buf); }
inline ~encoder_base_t () ZMQ_OVERRIDE { free (_buf); }
// The function returns a batch of binary data. The data
// are filled to a supplied buffer. If no buffer is supplied (data_
// points to NULL) decoder object will provide buffer of its own.
inline size_t encode (unsigned char **data_, size_t size_)
inline size_t encode (unsigned char **data_, size_t size_) ZMQ_FINAL
{
unsigned char *buffer = !*data_ ? _buf : *data_;
size_t buffersize = !*data_ ? _buf_size : size_;
const size_t buffersize = !*data_ ? _buf_size : size_;
if (in_progress () == NULL)
return 0;
@@ -117,7 +115,7 @@ template <typename T> class encoder_base_t : public i_encoder
}
// Copy data to the buffer. If the buffer is full, return.
size_t to_copy = std::min (_to_write, buffersize - pos);
const size_t to_copy = std::min (_to_write, buffersize - pos);
memcpy (buffer + pos, _write_pos, to_copy);
pos += to_copy;
_write_pos += to_copy;
@@ -128,7 +126,7 @@ template <typename T> class encoder_base_t : public i_encoder
return pos;
}
void load_msg (msg_t *msg_)
void load_msg (msg_t *msg_) ZMQ_FINAL
{
zmq_assert (in_progress () == NULL);
_in_progress = msg_;

View File

@@ -96,7 +96,7 @@ zmq::epoll_t::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_)
pe->ev.data.ptr = pe;
pe->events = events_;
int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_ADD, fd_, &pe->ev);
const int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_ADD, fd_, &pe->ev);
errno_assert (rc != -1);
// Increase the load metric of the thread.
@@ -109,7 +109,7 @@ void zmq::epoll_t::rm_fd (handle_t handle_)
{
check_thread ();
poll_entry_t *pe = static_cast<poll_entry_t *> (handle_);
int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_DEL, pe->fd, &pe->ev);
const int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_DEL, pe->fd, &pe->ev);
errno_assert (rc != -1);
pe->fd = retired_fd;
_retired.push_back (pe);
@@ -123,7 +123,7 @@ void zmq::epoll_t::set_pollin (handle_t handle_)
check_thread ();
poll_entry_t *pe = static_cast<poll_entry_t *> (handle_);
pe->ev.events |= EPOLLIN;
int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
const int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
errno_assert (rc != -1);
}
@@ -132,7 +132,7 @@ void zmq::epoll_t::reset_pollin (handle_t handle_)
check_thread ();
poll_entry_t *pe = static_cast<poll_entry_t *> (handle_);
pe->ev.events &= ~(static_cast<short> (EPOLLIN));
int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
const int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
errno_assert (rc != -1);
}
@@ -141,7 +141,7 @@ void zmq::epoll_t::set_pollout (handle_t handle_)
check_thread ();
poll_entry_t *pe = static_cast<poll_entry_t *> (handle_);
pe->ev.events |= EPOLLOUT;
int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
const int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
errno_assert (rc != -1);
}
@@ -150,7 +150,7 @@ void zmq::epoll_t::reset_pollout (handle_t handle_)
check_thread ();
poll_entry_t *pe = static_cast<poll_entry_t *> (handle_);
pe->ev.events &= ~(static_cast<short> (EPOLLOUT));
int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
const int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
errno_assert (rc != -1);
}
@@ -170,7 +170,7 @@ void zmq::epoll_t::loop ()
while (true) {
// Execute any due timers.
int timeout = static_cast<int> (execute_timers ());
const int timeout = static_cast<int> (execute_timers ());
if (get_load () == 0) {
if (timeout == 0)
@@ -181,8 +181,8 @@ void zmq::epoll_t::loop ()
}
// Wait for events.
int n = epoll_wait (_epoll_fd, &ev_buf[0], max_io_events,
timeout ? timeout : -1);
const int n = epoll_wait (_epoll_fd, &ev_buf[0], max_io_events,
timeout ? timeout : -1);
if (n == -1) {
errno_assert (errno == EINTR);
continue;

View File

@@ -55,13 +55,13 @@ struct i_poll_events;
// This class implements socket polling mechanism using the Linux-specific
// epoll mechanism.
class epoll_t : public worker_poller_base_t
class epoll_t ZMQ_FINAL : public worker_poller_base_t
{
public:
typedef void *handle_t;
epoll_t (const thread_ctx_t &ctx_);
~epoll_t ();
~epoll_t () ZMQ_OVERRIDE;
// "poller" concept.
handle_t add_fd (fd_t fd_, zmq::i_poll_events *events_);
@@ -87,7 +87,7 @@ class epoll_t : public worker_poller_base_t
#endif
// Main event loop.
void loop ();
void loop () ZMQ_FINAL;
// Main epoll file descriptor
epoll_fd_t _epoll_fd;

View File

@@ -211,14 +211,14 @@ const char *zmq::wsa_error_no (int no_, const char *wsae_wouldblock_string_)
void zmq::win_error (char *buffer_, size_t buffer_size_)
{
DWORD errcode = GetLastError ();
const DWORD errcode = GetLastError ();
#if defined _WIN32_WCE
DWORD rc = FormatMessageW (
FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, errcode,
MAKELANGID (LANG_NEUTRAL, SUBLANG_DEFAULT), (LPWSTR) buffer_,
buffer_size_ / sizeof (wchar_t), NULL);
#else
DWORD rc = FormatMessageA (
const DWORD rc = FormatMessageA (
FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, errcode,
MAKELANGID (LANG_NEUTRAL, SUBLANG_DEFAULT), buffer_,
static_cast<DWORD> (buffer_size_), NULL);

View File

@@ -90,7 +90,7 @@ int zmq::fq_t::recvpipe (msg_t *msg_, pipe_t **pipe_)
while (_active > 0) {
// Try to fetch new message. If we've already read part of the message
// subsequent part should be immediately available.
bool fetched = _pipes[_current]->read (msg_);
const bool fetched = _pipes[_current]->read (msg_);
// Note that when message is not fetched, current pipe is deactivated
// and replaced by another active pipe. Thus we don't have to increase

View File

@@ -39,21 +39,21 @@ class ctx_t;
class pipe_t;
class msg_t;
class gather_t : public socket_base_t
class gather_t ZMQ_FINAL : public socket_base_t
{
public:
gather_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
~gather_t ();
~gather_t () ZMQ_FINAL;
protected:
// Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xrecv (zmq::msg_t *msg_);
bool xhas_in ();
void xread_activated (zmq::pipe_t *pipe_);
void xpipe_terminated (zmq::pipe_t *pipe_);
bool locally_initiated_) ZMQ_FINAL;
int xrecv (zmq::msg_t *msg_) ZMQ_FINAL;
bool xhas_in () ZMQ_FINAL;
void xread_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
void xpipe_terminated (zmq::pipe_t *pipe_) ZMQ_FINAL;
private:
// Fair queueing object for inbound pipes.

View File

@@ -427,13 +427,14 @@ void zmq::generic_mtrie_t<T>::match (prefix_t data_,
void (*func_) (value_t *pipe_, Arg arg_),
Arg arg_)
{
generic_mtrie_t *current = this;
while (true) {
for (generic_mtrie_t *current = this; current; data_++, size_--) {
// Signal the pipes attached to this node.
if (current->_pipes) {
for (typename pipes_t::iterator it = current->_pipes->begin ();
it != current->_pipes->end (); ++it)
for (typename pipes_t::iterator it = current->_pipes->begin (),
end = current->_pipes->end ();
it != end; ++it) {
func_ (*it, arg_);
}
}
// If we are at the end of the message, there's nothing more to match.
@@ -444,25 +445,20 @@ void zmq::generic_mtrie_t<T>::match (prefix_t data_,
if (current->_count == 0)
break;
// If there's one subnode (optimisation).
if (current->_count == 1) {
if (data_[0] != current->_min)
// If there's one subnode (optimisation).
if (data_[0] != current->_min) {
break;
}
current = current->_next.node;
data_++;
size_--;
continue;
} else {
// If there are multiple subnodes.
if (data_[0] < current->_min
|| data_[0] >= current->_min + current->_count) {
break;
}
current = current->_next.table[data_[0] - current->_min];
}
// If there are multiple subnodes.
if (data_[0] < current->_min
|| data_[0] >= current->_min + current->_count)
break;
if (!current->_next.table[data_[0] - current->_min])
break;
current = current->_next.table[data_[0] - current->_min];
data_++;
size_--;
}
}

View File

@@ -39,18 +39,18 @@ namespace zmq
class msg_t;
class session_base_t;
class gssapi_client_t : public gssapi_mechanism_base_t
class gssapi_client_t ZMQ_FINAL : public gssapi_mechanism_base_t
{
public:
gssapi_client_t (session_base_t *session_, const options_t &options_);
virtual ~gssapi_client_t ();
~gssapi_client_t () ZMQ_FINAL;
// mechanism implementation
virtual int next_handshake_command (msg_t *msg_);
virtual int process_handshake_command (msg_t *msg_);
virtual int encode (msg_t *msg_);
virtual int decode (msg_t *msg_);
virtual status_t status () const;
int next_handshake_command (msg_t *msg_) ZMQ_FINAL;
int process_handshake_command (msg_t *msg_) ZMQ_FINAL;
int encode (msg_t *msg_) ZMQ_FINAL;
int decode (msg_t *msg_) ZMQ_FINAL;
status_t status () const ZMQ_FINAL;
private:
enum state_t

View File

@@ -53,7 +53,7 @@ class gssapi_mechanism_base_t : public virtual mechanism_base_t
public:
gssapi_mechanism_base_t (session_base_t *session_,
const options_t &options_);
virtual ~gssapi_mechanism_base_t () = 0;
~gssapi_mechanism_base_t () ZMQ_OVERRIDE = 0;
protected:
// Produce a context-level GSSAPI token (INITIATE command)

View File

@@ -40,21 +40,22 @@ namespace zmq
class msg_t;
class session_base_t;
class gssapi_server_t : public gssapi_mechanism_base_t, public zap_client_t
class gssapi_server_t ZMQ_FINAL : public gssapi_mechanism_base_t,
public zap_client_t
{
public:
gssapi_server_t (session_base_t *session_,
const std::string &peer_address,
const options_t &options_);
virtual ~gssapi_server_t ();
~gssapi_server_t () ZMQ_FINAL;
// mechanism implementation
virtual int next_handshake_command (msg_t *msg_);
virtual int process_handshake_command (msg_t *msg_);
virtual int encode (msg_t *msg_);
virtual int decode (msg_t *msg_);
virtual int zap_msg_available ();
virtual status_t status () const;
int next_handshake_command (msg_t *msg_) ZMQ_FINAL;
int process_handshake_command (msg_t *msg_) ZMQ_FINAL;
int encode (msg_t *msg_) ZMQ_FINAL;
int decode (msg_t *msg_) ZMQ_FINAL;
int zap_msg_available () ZMQ_FINAL;
status_t status () const ZMQ_FINAL;
private:
enum state_t

View File

@@ -48,7 +48,7 @@ class io_object_t : public i_poll_events
{
public:
io_object_t (zmq::io_thread_t *io_thread_ = NULL);
~io_object_t ();
~io_object_t () ZMQ_OVERRIDE;
// When migrating an object from one I/O thread to another, first
// unplug it, then migrate it, then plug it to the new thread.
@@ -69,9 +69,9 @@ class io_object_t : public i_poll_events
void cancel_timer (int id_);
// i_poll_events interface implementation.
void in_event ();
void out_event ();
void timer_event (int id_);
void in_event () ZMQ_OVERRIDE;
void out_event () ZMQ_OVERRIDE;
void timer_event (int id_) ZMQ_OVERRIDE;
private:
poller_t *_poller;

View File

@@ -73,7 +73,7 @@ zmq::mailbox_t *zmq::io_thread_t::get_mailbox ()
return &_mailbox;
}
int zmq::io_thread_t::get_load ()
int zmq::io_thread_t::get_load () const
{
return _poller->get_load ();
}
@@ -107,7 +107,7 @@ void zmq::io_thread_t::timer_event (int)
zmq_assert (false);
}
zmq::poller_t *zmq::io_thread_t::get_poller ()
zmq::poller_t *zmq::io_thread_t::get_poller () const
{
zmq_assert (_poller);
return _poller;

View File

@@ -43,14 +43,14 @@ class ctx_t;
// Generic part of the I/O thread. Polling-mechanism-specific features
// are implemented in separate "polling objects".
class io_thread_t : public object_t, public i_poll_events
class io_thread_t ZMQ_FINAL : public object_t, public i_poll_events
{
public:
io_thread_t (zmq::ctx_t *ctx_, uint32_t tid_);
// Clean-up. If the thread was started, it's necessary to call 'stop'
// before invoking destructor. Otherwise the destructor would hang up.
~io_thread_t ();
~io_thread_t () ZMQ_FINAL;
// Launch the physical thread.
void start ();
@@ -62,18 +62,18 @@ class io_thread_t : public object_t, public i_poll_events
mailbox_t *get_mailbox ();
// i_poll_events implementation.
void in_event ();
void out_event ();
void timer_event (int id_);
void in_event () ZMQ_FINAL;
void out_event () ZMQ_FINAL;
void timer_event (int id_) ZMQ_FINAL;
// Used by io_objects to retrieve the associated poller object.
poller_t *get_poller ();
poller_t *get_poller () const;
// Command handlers.
void process_stop ();
void process_stop () ZMQ_FINAL;
// Returns load experienced by the I/O thread.
int get_load ();
int get_load () const;
private:
// I/O thread accesses incoming commands via this mailbox.

View File

@@ -128,7 +128,7 @@ void zmq::unblock_socket (fd_t s_)
{
#if defined ZMQ_HAVE_WINDOWS
u_long nonblock = 1;
int rc = ioctlsocket (s_, FIONBIO, &nonblock);
const int rc = ioctlsocket (s_, FIONBIO, &nonblock);
wsa_assert (rc != SOCKET_ERROR);
#elif defined ZMQ_HAVE_OPENVMS || defined ZMQ_HAVE_VXWORKS
int nonblock = 1;
@@ -154,8 +154,8 @@ void zmq::enable_ipv4_mapping (fd_t s_)
#else
int flag = 0;
#endif
int rc = setsockopt (s_, IPPROTO_IPV6, IPV6_V6ONLY,
reinterpret_cast<char *> (&flag), sizeof (flag));
const int rc = setsockopt (s_, IPPROTO_IPV6, IPV6_V6ONLY,
reinterpret_cast<char *> (&flag), sizeof (flag));
#ifdef ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
@@ -168,7 +168,7 @@ int zmq::get_peer_ip_address (fd_t sockfd_, std::string &ip_addr_)
{
struct sockaddr_storage ss;
zmq_socklen_t addrlen =
const zmq_socklen_t addrlen =
get_socket_address (sockfd_, socket_end_remote, &ss);
if (addrlen == 0) {
@@ -186,8 +186,9 @@ int zmq::get_peer_ip_address (fd_t sockfd_, std::string &ip_addr_)
}
char host[NI_MAXHOST];
int rc = getnameinfo (reinterpret_cast<struct sockaddr *> (&ss), addrlen,
host, sizeof host, NULL, 0, NI_NUMERICHOST);
const int rc =
getnameinfo (reinterpret_cast<struct sockaddr *> (&ss), addrlen, host,
sizeof host, NULL, 0, NI_NUMERICHOST);
if (rc != 0)
return 0;
@@ -297,9 +298,9 @@ bool zmq::initialize_network ()
// Intialise Windows sockets. Note that WSAStartup can be called multiple
// times given that WSACleanup will be called for each WSAStartup.
WORD version_requested = MAKEWORD (2, 2);
const WORD version_requested = MAKEWORD (2, 2);
WSADATA wsa_data;
int rc = WSAStartup (version_requested, &wsa_data);
const int rc = WSAStartup (version_requested, &wsa_data);
zmq_assert (rc == 0);
zmq_assert (LOBYTE (wsa_data.wVersion) == 2
&& HIBYTE (wsa_data.wVersion) == 2);
@@ -312,7 +313,7 @@ void zmq::shutdown_network ()
{
#ifdef ZMQ_HAVE_WINDOWS
// On Windows, uninitialise socket layer.
int rc = WSACleanup ();
const int rc = WSACleanup ();
wsa_assert (rc != SOCKET_ERROR);
#endif
@@ -327,7 +328,7 @@ void zmq::shutdown_network ()
static void tune_socket (const SOCKET socket_)
{
BOOL tcp_nodelay = 1;
int rc =
const int rc =
setsockopt (socket_, IPPROTO_TCP, TCP_NODELAY,
reinterpret_cast<char *> (&tcp_nodelay), sizeof tcp_nodelay);
wsa_assert (rc != SOCKET_ERROR);
@@ -363,7 +364,7 @@ static int make_fdpair_tcpip (zmq::fd_t *r_, zmq::fd_t *w_)
// Create critical section only if using fixed signaler port
// Use problematic Event implementation for compatibility if using old port 5905.
// Otherwise use Mutex implementation.
int event_signaler_port = 5905;
const int event_signaler_port = 5905;
if (zmq::signaler_port == event_signaler_port) {
#if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP
@@ -431,7 +432,7 @@ static int make_fdpair_tcpip (zmq::fd_t *r_, zmq::fd_t *w_)
if (sync != NULL) {
// Enter the critical section.
DWORD dwrc = WaitForSingleObject (sync, INFINITE);
const DWORD dwrc = WaitForSingleObject (sync, INFINITE);
zmq_assert (dwrc == WAIT_OBJECT_0 || dwrc == WAIT_ABANDONED);
}
@@ -468,7 +469,7 @@ static int make_fdpair_tcpip (zmq::fd_t *r_, zmq::fd_t *w_)
// Send/receive large chunk to work around TCP slow start
// This code is a workaround for #1608
if (*r_ != INVALID_SOCKET) {
size_t dummy_size =
const size_t dummy_size =
1024 * 1024; // 1M to overload default receive buffer
unsigned char *dummy =
static_cast<unsigned char *> (malloc (dummy_size));
@@ -561,7 +562,7 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_)
std::string dirname, filename;
// Create a listening socket.
SOCKET listener = open_socket (AF_UNIX, SOCK_STREAM, 0);
const SOCKET listener = open_socket (AF_UNIX, SOCK_STREAM, 0);
if (listener == retired_fd) {
// This may happen if the library was built on a system supporting AF_UNIX, but the system running doesn't support it.
goto try_tcpip;
@@ -801,8 +802,8 @@ void zmq::assert_success_or_recoverable (zmq::fd_t s_, int rc_)
socklen_t len = sizeof err;
#endif
int rc = getsockopt (s_, SOL_SOCKET, SO_ERROR,
reinterpret_cast<char *> (&err), &len);
const int rc = getsockopt (s_, SOL_SOCKET, SO_ERROR,
reinterpret_cast<char *> (&err), &len);
// Assert if the error was caused by 0MQ bug.
// Networking problems are OK. No need to assert.

View File

@@ -200,7 +200,7 @@ int zmq::ip_resolver_t::resolve (ip_addr_t *ip_addr_, const char *name_)
}
addr = std::string (name_, delim - name_);
std::string port_str = std::string (delim + 1);
const std::string port_str = std::string (delim + 1);
if (port_str == "*") {
if (_options.bindable ()) {
@@ -229,7 +229,7 @@ int zmq::ip_resolver_t::resolve (ip_addr_t *ip_addr_, const char *name_)
// Check if path is allowed in ip address, if allowed it must be truncated
if (_options.allow_path ()) {
size_t pos = addr.find ('/');
const size_t pos = addr.find ('/');
if (pos != std::string::npos)
addr = addr.substr (0, pos);
}
@@ -247,7 +247,7 @@ int zmq::ip_resolver_t::resolve (ip_addr_t *ip_addr_, const char *name_)
// Look for an interface name / zone_id in the address
// Reference: https://tools.ietf.org/html/rfc4007
std::size_t pos = addr.rfind ('%');
const std::size_t pos = addr.rfind ('%');
uint32_t zone_id = 0;
if (pos != std::string::npos) {
@@ -277,7 +277,7 @@ int zmq::ip_resolver_t::resolve (ip_addr_t *ip_addr_, const char *name_)
if (!resolved && _options.allow_nic_name ()) {
// Try to resolve the string as a NIC name.
int rc = resolve_nic_name (ip_addr_, addr_str);
const int rc = resolve_nic_name (ip_addr_, addr_str);
if (rc == 0) {
resolved = true;
@@ -287,7 +287,7 @@ int zmq::ip_resolver_t::resolve (ip_addr_t *ip_addr_, const char *name_)
}
if (!resolved) {
int rc = resolve_getaddrinfo (ip_addr_, addr_str);
const int rc = resolve_getaddrinfo (ip_addr_, addr_str);
if (rc != 0) {
return rc;
@@ -388,7 +388,7 @@ int zmq::ip_resolver_t::resolve_getaddrinfo (ip_addr_t *ip_addr_,
// Use the first result.
zmq_assert (res != NULL);
zmq_assert ((size_t) res->ai_addrlen <= sizeof (*ip_addr_));
zmq_assert (static_cast<size_t> (res->ai_addrlen) <= sizeof (*ip_addr_));
memcpy (ip_addr_, res->ai_addr, res->ai_addrlen);
// Cleanup getaddrinfo after copying the possibly referenced result.
@@ -597,7 +597,7 @@ int zmq::ip_resolver_t::get_interface_name (unsigned long index_,
int zmq::ip_resolver_t::wchar_to_utf8 (const WCHAR *src_, char **dest_) const
{
int rc;
int buffer_len =
const int buffer_len =
WideCharToMultiByte (CP_UTF8, 0, src_, -1, NULL, 0, NULL, 0);
char *buffer = static_cast<char *> (malloc (buffer_len));

View File

@@ -65,7 +65,7 @@ zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_,
void zmq::ipc_connecter_t::out_event ()
{
fd_t fd = connect ();
const fd_t fd = connect ();
rm_handle ();
// Handle the error condition by attempt to reconnect.
@@ -81,7 +81,7 @@ void zmq::ipc_connecter_t::out_event ()
void zmq::ipc_connecter_t::start_connecting ()
{
// Open the connecting socket.
int rc = open ();
const int rc = open ();
// Connect may succeed in synchronous manner.
if (rc == 0) {
@@ -122,8 +122,8 @@ int zmq::ipc_connecter_t::open ()
unblock_socket (_s);
// Connect to the remote peer.
int rc = ::connect (_s, _addr->resolved.ipc_addr->addr (),
_addr->resolved.ipc_addr->addrlen ());
const int rc = ::connect (_s, _addr->resolved.ipc_addr->addr (),
_addr->resolved.ipc_addr->addrlen ());
// Connect was successful immediately.
if (rc == 0)
@@ -153,8 +153,8 @@ zmq::fd_t zmq::ipc_connecter_t::connect ()
// implementations and Solaris.
int err = 0;
zmq_socklen_t len = static_cast<zmq_socklen_t> (sizeof (err));
int rc = getsockopt (_s, SOL_SOCKET, SO_ERROR,
reinterpret_cast<char *> (&err), &len);
const int rc = getsockopt (_s, SOL_SOCKET, SO_ERROR,
reinterpret_cast<char *> (&err), &len);
if (rc == -1) {
if (errno == ENOPROTOOPT)
errno = 0;
@@ -171,7 +171,7 @@ zmq::fd_t zmq::ipc_connecter_t::connect ()
return retired_fd;
}
fd_t result = _s;
const fd_t result = _s;
_s = retired_fd;
return result;
}

View File

@@ -37,7 +37,7 @@
namespace zmq
{
class ipc_connecter_t : public stream_connecter_base_t
class ipc_connecter_t ZMQ_FINAL : public stream_connecter_base_t
{
public:
// If 'delayed_start' is true connecter first waits for a while,
@@ -50,10 +50,10 @@ class ipc_connecter_t : public stream_connecter_base_t
private:
// Handlers for I/O events.
void out_event ();
void out_event () ZMQ_FINAL;
// Internal function to start the actual connection establishment.
void start_connecting ();
void start_connecting () ZMQ_FINAL;
// Open IPC connecting socket. Returns -1 in case of error,
// 0 if connect was successful immediately. Returns -1 with

View File

@@ -85,7 +85,7 @@ zmq::ipc_listener_t::ipc_listener_t (io_thread_t *io_thread_,
void zmq::ipc_listener_t::in_event ()
{
fd_t fd = accept ();
const fd_t fd = accept ();
// If connection was reset by the peer in the meantime, just ignore it.
// TODO: Handle specific errors like ENFILE/EMFILE etc.
@@ -134,7 +134,7 @@ int zmq::ipc_listener_t::set_local_address (const char *addr_)
if (rc != 0) {
if (!_tmp_socket_dirname.empty ()) {
// We need to preserve errno to return to the user
int tmp_errno = errno;
const int tmp_errno = errno;
::rmdir (_tmp_socket_dirname.c_str ());
_tmp_socket_dirname.clear ();
errno = tmp_errno;
@@ -152,7 +152,7 @@ int zmq::ipc_listener_t::set_local_address (const char *addr_)
if (_s == retired_fd) {
if (!_tmp_socket_dirname.empty ()) {
// We need to preserve errno to return to the user
int tmp_errno = errno;
const int tmp_errno = errno;
::rmdir (_tmp_socket_dirname.c_str ());
_tmp_socket_dirname.clear ();
errno = tmp_errno;
@@ -180,7 +180,7 @@ int zmq::ipc_listener_t::set_local_address (const char *addr_)
return 0;
error:
int err = errno;
const int err = errno;
close ();
errno = err;
return -1;
@@ -189,7 +189,7 @@ error:
int zmq::ipc_listener_t::close ()
{
zmq_assert (_s != retired_fd);
fd_t fd_for_event = _s;
const fd_t fd_for_event = _s;
#ifdef ZMQ_HAVE_WINDOWS
int rc = closesocket (_s);
wsa_assert (rc != SOCKET_ERROR);
@@ -253,9 +253,10 @@ bool zmq::ipc_listener_t::filter (fd_t sock_)
if (!(pw = getpwuid (cred.uid)))
return false;
for (options_t::ipc_gid_accept_filters_t::const_iterator it =
options.ipc_gid_accept_filters.begin ();
it != options.ipc_gid_accept_filters.end (); it++) {
for (options_t::ipc_gid_accept_filters_t::const_iterator
it = options.ipc_gid_accept_filters.begin (),
end = options.ipc_gid_accept_filters.end ();
it != end; it++) {
if (!(gr = getgrgid (*it)))
continue;
for (char **mem = gr->gr_mem; *mem; mem++) {
@@ -312,7 +313,7 @@ zmq::fd_t zmq::ipc_listener_t::accept ()
socklen_t ss_len = sizeof (ss);
#endif
fd_t sock =
const fd_t sock =
::accept (_s, reinterpret_cast<struct sockaddr *> (&ss), &ss_len);
#endif
if (sock == retired_fd) {
@@ -341,7 +342,7 @@ zmq::fd_t zmq::ipc_listener_t::accept ()
if (zmq::set_nosigpipe (sock)) {
#ifdef ZMQ_HAVE_WINDOWS
int rc = closesocket (sock);
const int rc = closesocket (sock);
wsa_assert (rc != SOCKET_ERROR);
#else
int rc = ::close (sock);

View File

@@ -39,7 +39,7 @@
namespace zmq
{
class ipc_listener_t : public stream_listener_base_t
class ipc_listener_t ZMQ_FINAL : public stream_listener_base_t
{
public:
ipc_listener_t (zmq::io_thread_t *io_thread_,
@@ -50,11 +50,12 @@ class ipc_listener_t : public stream_listener_base_t
int set_local_address (const char *addr_);
protected:
std::string get_socket_name (fd_t fd_, socket_end_t socket_end_) const;
std::string get_socket_name (fd_t fd_,
socket_end_t socket_end_) const ZMQ_FINAL;
private:
// Handlers for I/O events.
void in_event ();
void in_event () ZMQ_FINAL;
// Filter new connections if the OS provides a mechanism to get
// the credentials of the peer process. Called from accept().
@@ -62,7 +63,7 @@ class ipc_listener_t : public stream_listener_base_t
bool filter (fd_t sock_);
#endif
int close ();
int close () ZMQ_FINAL;
// Accept the new connection. Returns the file descriptor of the
// newly created connection. The function may return retired_fd

View File

@@ -49,13 +49,13 @@ struct i_poll_events;
// Implements socket polling mechanism using the BSD-specific
// kqueue interface.
class kqueue_t : public worker_poller_base_t
class kqueue_t ZMQ_FINAL : public worker_poller_base_t
{
public:
typedef void *handle_t;
kqueue_t (const thread_ctx_t &ctx_);
~kqueue_t ();
~kqueue_t () ZMQ_FINAL;
// "poller" concept.
handle_t add_fd (fd_t fd_, zmq::i_poll_events *events_);
@@ -70,7 +70,7 @@ class kqueue_t : public worker_poller_base_t
private:
// Main event loop.
void loop ();
void loop () ZMQ_FINAL;
// File descriptor referring to the kernel event queue.
fd_t kqueue_fd;

View File

@@ -50,7 +50,7 @@ void zmq::lb_t::attach (pipe_t *pipe_)
void zmq::lb_t::pipe_terminated (pipe_t *pipe_)
{
pipes_t::size_type index = _pipes.index (pipe_);
const pipes_t::size_type index = _pipes.index (pipe_);
// If we are in the middle of multipart message and current pipe
// have disconnected, we have to drop the remainder of the message.
@@ -151,7 +151,7 @@ int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_)
}
// Detach the message from the data buffer.
int rc = msg_->init ();
const int rc = msg_->init ();
errno_assert (rc == 0);
return 0;

View File

@@ -20,6 +20,22 @@
#endif
#endif
#if !defined ZMQ_OVERRIDE
#if defined ZMQ_HAVE_NOEXCEPT
#define ZMQ_OVERRIDE override
#else
#define ZMQ_OVERRIDE
#endif
#endif
#if !defined ZMQ_FINAL
#if defined ZMQ_HAVE_NOEXCEPT
#define ZMQ_FINAL final
#else
#define ZMQ_FINAL
#endif
#endif
#if !defined ZMQ_DEFAULT
#if defined ZMQ_HAVE_NOEXCEPT
#define ZMQ_DEFAULT = default;

View File

@@ -42,15 +42,15 @@
namespace zmq
{
class mailbox_t : public i_mailbox
class mailbox_t ZMQ_FINAL : public i_mailbox
{
public:
mailbox_t ();
~mailbox_t ();
~mailbox_t () ZMQ_FINAL;
fd_t get_fd () const;
void send (const command_t &cmd_);
int recv (command_t *cmd_, int timeout_);
void send (const command_t &cmd_) ZMQ_FINAL;
int recv (command_t *cmd_, int timeout_) ZMQ_FINAL;
bool valid () const;
@@ -58,7 +58,7 @@ class mailbox_t : public i_mailbox
// close the file descriptors in the signaller. This is used in a forked
// child process to close the file descriptors so that they do not interfere
// with the context in the parent process.
void forked () { _signaler.forked (); }
void forked () ZMQ_FINAL { _signaler.forked (); }
#endif
private:

View File

@@ -62,7 +62,7 @@ void zmq::mailbox_safe_t::remove_signaler (signaler_t *signaler_)
{
// TODO: make a copy of array and signal outside the lock
const std::vector<zmq::signaler_t *>::iterator end = _signalers.end ();
std::vector<signaler_t *>::iterator it =
const std::vector<signaler_t *>::iterator it =
std::find (_signalers.begin (), end, signaler_);
if (it != end)
@@ -106,7 +106,7 @@ int zmq::mailbox_safe_t::recv (command_t *cmd_, int timeout_)
_sync->lock ();
} else {
// Wait for signal from the command sender.
int rc = _cond_var.wait (_sync, timeout_);
const int rc = _cond_var.wait (_sync, timeout_);
if (rc == -1) {
errno_assert (errno == EAGAIN || errno == EINTR);
return -1;

View File

@@ -44,14 +44,14 @@
namespace zmq
{
class mailbox_safe_t : public i_mailbox
class mailbox_safe_t ZMQ_FINAL : public i_mailbox
{
public:
mailbox_safe_t (mutex_t *sync_);
~mailbox_safe_t ();
~mailbox_safe_t () ZMQ_FINAL;
void send (const command_t &cmd_);
int recv (command_t *cmd_, int timeout_);
void send (const command_t &cmd_) ZMQ_FINAL;
int recv (command_t *cmd_, int timeout_) ZMQ_FINAL;
// Add signaler to mailbox which will be called when a message is ready
void add_signaler (signaler_t *signaler_);
@@ -62,7 +62,7 @@ class mailbox_safe_t : public i_mailbox
// close the file descriptors in the signaller. This is used in a forked
// child process to close the file descriptors so that they do not interfere
// with the context in the parent process.
void forked ()
void forked () ZMQ_FINAL
{
// TODO: call fork on the condition variable
}

View File

@@ -95,7 +95,7 @@ const char socket_type_scatter[] = "SCATTER";
const char socket_type_dgram[] = "DGRAM";
#endif
const char *zmq::mechanism_t::socket_type_string (int socket_type_) const
const char *zmq::mechanism_t::socket_type_string (int socket_type_)
{
// TODO the order must of the names must correspond to the values resp. order of ZMQ_* socket type definitions in zmq.h!
static const char *names[] = {
@@ -110,7 +110,8 @@ const char *zmq::mechanism_t::socket_type_string (int socket_type_) const
#endif
};
static const size_t names_count = sizeof (names) / sizeof (names[0]);
zmq_assert (socket_type_ >= 0 && socket_type_ < (int) names_count);
zmq_assert (socket_type_ >= 0
&& socket_type_ < static_cast<int> (names_count));
return names[socket_type_];
}

View File

@@ -81,17 +81,20 @@ class mechanism_t
const blob_t &get_user_id () const;
const metadata_t::dict_t &get_zmtp_properties ()
const metadata_t::dict_t &get_zmtp_properties () const
{
return _zmtp_properties;
}
const metadata_t::dict_t &get_zap_properties () { return _zap_properties; }
const metadata_t::dict_t &get_zap_properties () const
{
return _zap_properties;
}
protected:
// Only used to identify the socket for the Socket-Type
// property in the wire protocol.
const char *socket_type_string (int socket_type_) const;
static const char *socket_type_string (int socket_type_);
static size_t add_property (unsigned char *ptr_,
size_t ptr_capacity_,

View File

@@ -39,7 +39,7 @@ zmq::mechanism_base_t::mechanism_base_t (session_base_t *const session_,
{
}
int zmq::mechanism_base_t::check_basic_command_structure (msg_t *msg_)
int zmq::mechanism_base_t::check_basic_command_structure (msg_t *msg_) const
{
if (msg_->size () <= 1
|| msg_->size () <= (static_cast<uint8_t *> (msg_->data ()))[0]) {

View File

@@ -44,7 +44,7 @@ class mechanism_base_t : public mechanism_t
session_base_t *const session;
int check_basic_command_structure (msg_t *msg_);
int check_basic_command_structure (msg_t *msg_) const;
void handle_error_reason (const char *error_reason_,
size_t error_reason_len_);

View File

@@ -36,7 +36,7 @@ zmq::metadata_t::metadata_t (const dict_t &dict_) : _ref_cnt (1), _dict (dict_)
const char *zmq::metadata_t::get (const std::string &property_) const
{
dict_t::const_iterator it = _dict.find (property_);
const dict_t::const_iterator it = _dict.find (property_);
if (it == _dict.end ()) {
/** \todo remove this when support for the deprecated name "Identity" is dropped */
if (property_ == "Identity")

View File

@@ -294,7 +294,7 @@ int zmq::msg_t::copy (msg_t &src_)
return -1;
}
int rc = close ();
const int rc = close ();
if (unlikely (rc < 0))
return rc;
@@ -553,7 +553,7 @@ bool zmq::msg_t::rm_refs (int refs_)
return true;
}
uint32_t zmq::msg_t::get_routing_id ()
uint32_t zmq::msg_t::get_routing_id () const
{
return _u.base.routing_id;
}
@@ -574,7 +574,7 @@ int zmq::msg_t::reset_routing_id ()
return 0;
}
const char *zmq::msg_t::group ()
const char *zmq::msg_t::group () const
{
return _u.base.group;
}

View File

@@ -146,10 +146,10 @@ class msg_t
bool is_cmsg () const;
bool is_lmsg () const;
bool is_zcmsg () const;
uint32_t get_routing_id ();
uint32_t get_routing_id () const;
int set_routing_id (uint32_t routing_id_);
int reset_routing_id ();
const char *group ();
const char *group () const;
int set_group (const char *group_);
int set_group (const char *, size_t length_);
@@ -289,7 +289,7 @@ class msg_t
inline int close_and_return (zmq::msg_t *msg_, int echo_)
{
// Since we abort on close failure we preserve errno for success case.
int err = errno;
const int err = errno;
const int rc = msg_->close ();
errno_assert (rc == 0);
errno = err;

View File

@@ -18,11 +18,11 @@ class io_thread_t;
class msg_t;
class session_base_t;
class norm_engine_t : public io_object_t, public i_engine
class norm_engine_t ZMQ_FINAL : public io_object_t, public i_engine
{
public:
norm_engine_t (zmq::io_thread_t *parent_, const options_t &options_);
~norm_engine_t ();
~norm_engine_t () ZMQ_FINAL;
// create NORM instance, session, etc
int init (const char *network_, bool send, bool recv);
@@ -30,24 +30,24 @@ class norm_engine_t : public io_object_t, public i_engine
// i_engine interface implementation.
// Plug the engine to the session.
virtual void plug (zmq::io_thread_t *io_thread_,
class session_base_t *session_);
void plug (zmq::io_thread_t *io_thread_,
class session_base_t *session_) ZMQ_FINAL;
// Terminate and deallocate the engine. Note that 'detached'
// events are not fired on termination.
virtual void terminate ();
void terminate () ZMQ_FINAL;
// This method is called by the session to signalise that more
// messages can be written to the pipe.
virtual bool restart_input ();
bool restart_input () ZMQ_FINAL;
// This method is called by the session to signalise that there
// are messages to send available.
virtual void restart_output ();
void restart_output () ZMQ_FINAL;
virtual void zap_msg_available (){};
void zap_msg_available () ZMQ_FINAL {}
virtual const endpoint_uri_pair_t &get_endpoint () const;
const endpoint_uri_pair_t &get_endpoint () const ZMQ_FINAL;
// i_poll_events interface implementation.
// (we only need in_event() for NormEvent notification)

View File

@@ -39,19 +39,19 @@ namespace zmq
class msg_t;
class session_base_t;
class null_mechanism_t : public zap_client_t
class null_mechanism_t ZMQ_FINAL : public zap_client_t
{
public:
null_mechanism_t (session_base_t *session_,
const std::string &peer_address_,
const options_t &options_);
virtual ~null_mechanism_t ();
~null_mechanism_t () ZMQ_FINAL;
// mechanism implementation
virtual int next_handshake_command (msg_t *msg_);
virtual int process_handshake_command (msg_t *msg_);
virtual int zap_msg_available ();
virtual status_t status () const;
int next_handshake_command (msg_t *msg_) ZMQ_FINAL;
int process_handshake_command (msg_t *msg_) ZMQ_FINAL;
int zap_msg_available () ZMQ_FINAL;
status_t status () const ZMQ_FINAL;
private:
bool _ready_command_sent;

View File

@@ -53,7 +53,7 @@ zmq::object_t::~object_t ()
{
}
uint32_t zmq::object_t::get_tid ()
uint32_t zmq::object_t::get_tid () const
{
return _tid;
}
@@ -63,12 +63,12 @@ void zmq::object_t::set_tid (uint32_t id_)
_tid = id_;
}
zmq::ctx_t *zmq::object_t::get_ctx ()
zmq::ctx_t *zmq::object_t::get_ctx () const
{
return _ctx;
}
void zmq::object_t::process_command (command_t &cmd_)
void zmq::object_t::process_command (const command_t &cmd_)
{
switch (cmd_.type) {
case command_t::activate_read:
@@ -184,7 +184,7 @@ void zmq::object_t::unregister_endpoints (socket_base_t *socket_)
return _ctx->unregister_endpoints (socket_);
}
zmq::endpoint_t zmq::object_t::find_endpoint (const char *addr_)
zmq::endpoint_t zmq::object_t::find_endpoint (const char *addr_) const
{
return _ctx->find_endpoint (addr_);
}
@@ -207,7 +207,7 @@ void zmq::object_t::destroy_socket (socket_base_t *socket_)
_ctx->destroy_socket (socket_);
}
zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t affinity_)
zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t affinity_) const
{
return _ctx->choose_io_thread (affinity_);
}
@@ -528,7 +528,7 @@ void zmq::object_t::process_seqnum ()
zmq_assert (false);
}
void zmq::object_t::send_command (command_t &cmd_)
void zmq::object_t::send_command (const command_t &cmd_)
{
_ctx->send_command (cmd_.destination->get_tid (), cmd_);
}

View File

@@ -59,10 +59,10 @@ class object_t
object_t (object_t *parent_);
virtual ~object_t ();
uint32_t get_tid ();
uint32_t get_tid () const;
void set_tid (uint32_t id_);
ctx_t *get_ctx ();
void process_command (zmq::command_t &cmd_);
ctx_t *get_ctx () const;
void process_command (const zmq::command_t &cmd_);
void send_inproc_connected (zmq::socket_base_t *socket_);
void send_bind (zmq::own_t *destination_,
zmq::pipe_t *pipe_,
@@ -74,7 +74,7 @@ class object_t
int register_endpoint (const char *addr_, const zmq::endpoint_t &endpoint_);
int unregister_endpoint (const std::string &addr_, socket_base_t *socket_);
void unregister_endpoints (zmq::socket_base_t *socket_);
zmq::endpoint_t find_endpoint (const char *addr_);
zmq::endpoint_t find_endpoint (const char *addr_) const;
void pend_connection (const std::string &addr_,
const endpoint_t &endpoint_,
pipe_t **pipes_);
@@ -86,7 +86,7 @@ class object_t
void log (const char *format_, ...);
// Chooses least loaded I/O thread.
zmq::io_thread_t *choose_io_thread (uint64_t affinity_);
zmq::io_thread_t *choose_io_thread (uint64_t affinity_) const;
// Derived object can use these functions to send commands
// to other objects.
@@ -157,7 +157,7 @@ class object_t
// Thread ID of the thread the object belongs to.
uint32_t _tid;
void send_command (command_t &cmd_);
void send_command (const command_t &cmd_);
ZMQ_NON_COPYABLE_NOR_MOVABLE (object_t)
};

View File

@@ -302,7 +302,7 @@ int zmq::options_t::setsockopt (int option_,
const void *optval_,
size_t optvallen_)
{
bool is_int = (optvallen_ == sizeof (int));
const bool is_int = (optvallen_ == sizeof (int));
int value = 0;
if (is_int)
memcpy (&value, optval_, sizeof (int));
@@ -448,7 +448,7 @@ int zmq::options_t::setsockopt (int option_,
/* Deprecated in favor of ZMQ_IPV6 */
case ZMQ_IPV4ONLY: {
bool value;
int rc =
const int rc =
do_setsockopt_int_as_bool_strict (optval_, optvallen_, &value);
if (rc == 0)
ipv6 = !value;

View File

@@ -147,7 +147,7 @@ void zmq::own_t::terminate ()
send_term_req (_owner, this);
}
bool zmq::own_t::is_terminating ()
bool zmq::own_t::is_terminating () const
{
return _terminating;
}

View File

@@ -84,18 +84,18 @@ class own_t : public object_t
void terminate ();
// Returns true if the object is in process of termination.
bool is_terminating ();
bool is_terminating () const;
// Derived object destroys own_t. There's no point in allowing
// others to invoke the destructor. At the same time, it has to be
// virtual so that generic own_t deallocation mechanism destroys
// specific type of the owned object correctly.
virtual ~own_t ();
~own_t () ZMQ_OVERRIDE;
// Term handler is protected rather than private so that it can
// be intercepted by the derived class. This is useful to add custom
// steps to the beginning of the termination process.
void process_term (int linger_);
void process_term (int linger_) ZMQ_OVERRIDE;
// A place to hook in when physical destruction of the object
// is to be delayed.
@@ -109,10 +109,10 @@ class own_t : public object_t
void set_owner (own_t *owner_);
// Handlers for incoming commands.
void process_own (own_t *object_);
void process_term_req (own_t *object_);
void process_term_ack ();
void process_seqnum ();
void process_own (own_t *object_) ZMQ_OVERRIDE;
void process_term_req (own_t *object_) ZMQ_OVERRIDE;
void process_term_ack () ZMQ_OVERRIDE;
void process_seqnum () ZMQ_OVERRIDE;
// Check whether all the pending term acks were delivered.
// If so, deallocate this object.

View File

@@ -97,7 +97,7 @@ int zmq::pair_t::xsend (msg_t *msg_)
_pipe->flush ();
// Detach the original message from the data buffer.
int rc = msg_->init ();
const int rc = msg_->init ();
errno_assert (rc == 0);
return 0;

View File

@@ -41,23 +41,23 @@ class msg_t;
class pipe_t;
class io_thread_t;
class pair_t : public socket_base_t
class pair_t ZMQ_FINAL : public socket_base_t
{
public:
pair_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
~pair_t ();
~pair_t () ZMQ_FINAL;
// Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_);
bool xhas_in ();
bool xhas_out ();
void xread_activated (zmq::pipe_t *pipe_);
void xwrite_activated (zmq::pipe_t *pipe_);
void xpipe_terminated (zmq::pipe_t *pipe_);
bool locally_initiated_) ZMQ_FINAL;
int xsend (zmq::msg_t *msg_) ZMQ_FINAL;
int xrecv (zmq::msg_t *msg_) ZMQ_FINAL;
bool xhas_in () ZMQ_FINAL;
bool xhas_out () ZMQ_FINAL;
void xread_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
void xwrite_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
void xpipe_terminated (zmq::pipe_t *pipe_) ZMQ_FINAL;
private:
zmq::pipe_t *_pipe;

View File

@@ -46,7 +46,7 @@ namespace zmq
class io_thread_t;
class session_base_t;
class pgm_receiver_t : public io_object_t, public i_engine
class pgm_receiver_t ZMQ_FINAL : public io_object_t, public i_engine
{
public:
pgm_receiver_t (zmq::io_thread_t *parent_, const options_t &options_);

View File

@@ -45,7 +45,7 @@ namespace zmq
class io_thread_t;
class session_base_t;
class pgm_sender_t : public io_object_t, public i_engine
class pgm_sender_t ZMQ_FINAL : public io_object_t, public i_engine
{
public:
pgm_sender_t (zmq::io_thread_t *parent_, const options_t &options_);

View File

@@ -38,10 +38,10 @@
#include "ypipe.hpp"
#include "ypipe_conflate.hpp"
int zmq::pipepair (class object_t *parents_[2],
class pipe_t *pipes_[2],
int hwms_[2],
bool conflate_[2])
int zmq::pipepair (object_t *parents_[2],
pipe_t *pipes_[2],
const int hwms_[2],
const bool conflate_[2])
{
// Creates two pipe objects. These objects are connected by two ypipes,
// each to pass messages in one direction.
@@ -188,7 +188,7 @@ bool zmq::pipe_t::read (msg_t *msg_)
if (unlikely (_state != active && _state != waiting_for_delimiter))
return false;
for (bool payload_read = false; !payload_read;) {
while (true) {
if (!_in_pipe->read (msg_)) {
_in_active = false;
return false;
@@ -198,8 +198,9 @@ bool zmq::pipe_t::read (msg_t *msg_)
if (unlikely (msg_->is_credential ())) {
const int rc = msg_->close ();
zmq_assert (rc == 0);
} else
payload_read = true;
} else {
break;
}
}
// If delimiter was read, start termination process of the pipe.
@@ -232,7 +233,7 @@ bool zmq::pipe_t::check_write ()
return true;
}
bool zmq::pipe_t::write (msg_t *msg_)
bool zmq::pipe_t::write (const msg_t *msg_)
{
if (unlikely (!check_write ()))
return false;

View File

@@ -54,8 +54,8 @@ class pipe_t;
// read (older messages are discarded)
int pipepair (zmq::object_t *parents_[2],
zmq::pipe_t *pipes_[2],
int hwms_[2],
bool conflate_[2]);
const int hwms_[2],
const bool conflate_[2]);
struct i_pipe_events
{
@@ -71,16 +71,16 @@ struct i_pipe_events
// The array of inbound pipes (1), the array of outbound pipes (2) and
// the generic array of pipes to be deallocated (3).
class pipe_t : public object_t,
public array_item_t<1>,
public array_item_t<2>,
public array_item_t<3>
class pipe_t ZMQ_FINAL : public object_t,
public array_item_t<1>,
public array_item_t<2>,
public array_item_t<3>
{
// This allows pipepair to create pipe objects.
friend int pipepair (zmq::object_t *parents_[2],
zmq::pipe_t *pipes_[2],
int hwms_[2],
bool conflate_[2]);
const int hwms_[2],
const bool conflate_[2]);
public:
// Specifies the object to send events to.
@@ -108,7 +108,7 @@ class pipe_t : public object_t,
// Writes a message to the underlying pipe. Returns false if the
// message does not pass check_write. If false, the message object
// retains ownership of its message buffer.
bool write (msg_t *msg_);
bool write (const msg_t *msg_);
// Remove unfinished parts of the outbound message from the pipe.
void rollback () const;
@@ -152,15 +152,16 @@ class pipe_t : public object_t,
typedef ypipe_base_t<msg_t> upipe_t;
// Command handlers.
void process_activate_read ();
void process_activate_write (uint64_t msgs_read_);
void process_hiccup (void *pipe_);
void process_pipe_peer_stats (uint64_t queue_count_,
own_t *socket_base_,
endpoint_uri_pair_t *endpoint_pair_);
void process_pipe_term ();
void process_pipe_term_ack ();
void process_pipe_hwm (int inhwm_, int outhwm_);
void process_activate_read () ZMQ_OVERRIDE;
void process_activate_write (uint64_t msgs_read_) ZMQ_OVERRIDE;
void process_hiccup (void *pipe_) ZMQ_OVERRIDE;
void
process_pipe_peer_stats (uint64_t queue_count_,
own_t *socket_base_,
endpoint_uri_pair_t *endpoint_pair_) ZMQ_OVERRIDE;
void process_pipe_term () ZMQ_OVERRIDE;
void process_pipe_term_ack () ZMQ_OVERRIDE;
void process_pipe_hwm (int inhwm_, int outhwm_) ZMQ_OVERRIDE;
// Handler for delimiter read from the pipe.
void process_delimiter ();
@@ -179,7 +180,7 @@ class pipe_t : public object_t,
void set_peer (pipe_t *peer_);
// Destructor is private. Pipe objects destroy themselves.
~pipe_t ();
~pipe_t () ZMQ_OVERRIDE;
// Underlying pipes for both directions.
upipe_t *_in_pipe;

View File

@@ -37,16 +37,16 @@ namespace zmq
{
class msg_t;
class plain_client_t : public mechanism_base_t
class plain_client_t ZMQ_FINAL : public mechanism_base_t
{
public:
plain_client_t (session_base_t *const session_, const options_t &options_);
virtual ~plain_client_t ();
~plain_client_t () ZMQ_FINAL;
// mechanism implementation
virtual int next_handshake_command (msg_t *msg_);
virtual int process_handshake_command (msg_t *msg_);
virtual status_t status () const;
int next_handshake_command (msg_t *msg_) ZMQ_FINAL;
int process_handshake_command (msg_t *msg_) ZMQ_FINAL;
status_t status () const ZMQ_FINAL;
private:
enum state_t

View File

@@ -192,7 +192,7 @@ int zmq::plain_server_t::process_hello (msg_t *msg_)
return receive_and_process_zap_reply () == -1 ? -1 : 0;
}
void zmq::plain_server_t::produce_welcome (msg_t *msg_) const
void zmq::plain_server_t::produce_welcome (msg_t *msg_)
{
const int rc = msg_->init_size (welcome_prefix_len);
errno_assert (rc == 0);

View File

@@ -38,20 +38,20 @@ namespace zmq
class msg_t;
class session_base_t;
class plain_server_t : public zap_client_common_handshake_t
class plain_server_t ZMQ_FINAL : public zap_client_common_handshake_t
{
public:
plain_server_t (session_base_t *session_,
const std::string &peer_address_,
const options_t &options_);
virtual ~plain_server_t ();
~plain_server_t () ZMQ_FINAL;
// mechanism implementation
virtual int next_handshake_command (msg_t *msg_);
virtual int process_handshake_command (msg_t *msg_);
int next_handshake_command (msg_t *msg_) ZMQ_FINAL;
int process_handshake_command (msg_t *msg_) ZMQ_FINAL;
private:
void produce_welcome (msg_t *msg_) const;
static void produce_welcome (msg_t *msg_);
void produce_ready (msg_t *msg_) const;
void produce_error (msg_t *msg_) const;

View File

@@ -56,7 +56,7 @@ struct i_poll_events;
// Implements socket polling mechanism using the POSIX.1-2001
// poll() system call.
class poll_t : public worker_poller_base_t
class poll_t ZMQ_FINAL : public worker_poller_base_t
{
public:
typedef fd_t handle_t;
@@ -78,7 +78,7 @@ class poll_t : public worker_poller_base_t
private:
// Main event loop.
virtual void loop ();
void loop () ZMQ_FINAL;
void cleanup_retired ();

View File

@@ -32,10 +32,6 @@
#include "i_poll_events.hpp"
#include "err.hpp"
zmq::poller_base_t::poller_base_t ()
{
}
zmq::poller_base_t::~poller_base_t ()
{
// Make sure there is no more load on the shutdown.
@@ -127,7 +123,7 @@ void zmq::worker_poller_base_t::start (const char *name_)
_ctx.start_thread (_worker, worker_routine, this, name_);
}
void zmq::worker_poller_base_t::check_thread ()
void zmq::worker_poller_base_t::check_thread () const
{
#ifdef _DEBUG
zmq_assert (!_worker.get_started () || _worker.is_current_thread ());

View File

@@ -114,7 +114,7 @@ struct i_poll_events;
// a container that is being iterated by the poller.
// A class that can be used as a base class for implementations of the poller
// A class that can be used as abase class for implementations of the poller
// concept.
//
// For documentation of the public methods, see the description of the poller_t
@@ -122,7 +122,7 @@ struct i_poll_events;
class poller_base_t
{
public:
poller_base_t ();
poller_base_t () ZMQ_DEFAULT;
virtual ~poller_base_t ();
// Methods from the poller concept.
@@ -172,7 +172,7 @@ class worker_poller_base_t : public poller_base_t
// via an assertion.
// Should be called by the add_fd, removed_fd, set_*, reset_* functions
// to ensure correct usage.
void check_thread ();
void check_thread () const;
// Stops the worker thread. Should be called from the destructor of the
// leaf class.

View File

@@ -50,13 +50,13 @@ struct i_poll_events;
// This class implements socket polling mechanism using the AIX-specific
// pollset mechanism.
class pollset_t : public poller_base_t
class pollset_t ZMQ_FINAL : public poller_base_t
{
public:
typedef void *handle_t;
pollset_t (const thread_ctx_t &ctx_);
~pollset_t ();
~pollset_t () ZMQ_FINAL;
// "poller" concept.
handle_t add_fd (fd_t fd_, zmq::i_poll_events *events_);
@@ -75,7 +75,7 @@ class pollset_t : public poller_base_t
static void worker_routine (void *arg_);
// Main event loop.
void loop ();
void loop () ZMQ_FINAL;
// Reference to ZMQ context.
const thread_ctx_t &ctx;

View File

@@ -183,9 +183,9 @@ static int loop_and_send_multipart_stat (zmq::socket_base_t *control_,
return rc;
}
static int reply_stats (class zmq::socket_base_t *control_,
zmq_socket_stats_t *frontend_stats_,
zmq_socket_stats_t *backend_stats_)
static int reply_stats (zmq::socket_base_t *control_,
const zmq_socket_stats_t *frontend_stats_,
const zmq_socket_stats_t *backend_stats_)
{
// first part: frontend stats - the first send might fail due to HWM
if (loop_and_send_multipart_stat (control_, frontend_stats_->msg_in, true,

View File

@@ -39,18 +39,18 @@ class io_thread_t;
class socket_base_t;
class msg_t;
class pub_t : public xpub_t
class pub_t ZMQ_FINAL : public xpub_t
{
public:
pub_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
~pub_t ();
~pub_t () ZMQ_FINAL;
// Implementations of virtual functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_ = false,
bool locally_initiated_ = false);
int xrecv (zmq::msg_t *msg_);
bool xhas_in ();
bool locally_initiated_ = false) ZMQ_FINAL;
int xrecv (zmq::msg_t *msg_) ZMQ_FINAL;
bool xhas_in () ZMQ_FINAL;
ZMQ_NON_COPYABLE_NOR_MOVABLE (pub_t)
};

View File

@@ -41,21 +41,21 @@ class pipe_t;
class msg_t;
class io_thread_t;
class pull_t : public socket_base_t
class pull_t ZMQ_FINAL : public socket_base_t
{
public:
pull_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
~pull_t ();
~pull_t () ZMQ_FINAL;
protected:
// Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xrecv (zmq::msg_t *msg_);
bool xhas_in ();
void xread_activated (zmq::pipe_t *pipe_);
void xpipe_terminated (zmq::pipe_t *pipe_);
bool locally_initiated_) ZMQ_FINAL;
int xrecv (zmq::msg_t *msg_) ZMQ_FINAL;
bool xhas_in () ZMQ_FINAL;
void xread_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
void xpipe_terminated (zmq::pipe_t *pipe_) ZMQ_FINAL;
private:
// Fair queueing object for inbound pipes.

View File

@@ -41,21 +41,21 @@ class pipe_t;
class msg_t;
class io_thread_t;
class push_t : public socket_base_t
class push_t ZMQ_FINAL : public socket_base_t
{
public:
push_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
~push_t ();
~push_t () ZMQ_FINAL;
protected:
// Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xsend (zmq::msg_t *msg_);
bool xhas_out ();
void xwrite_activated (zmq::pipe_t *pipe_);
void xpipe_terminated (zmq::pipe_t *pipe_);
bool locally_initiated_) ZMQ_FINAL;
int xsend (zmq::msg_t *msg_) ZMQ_FINAL;
bool xhas_out () ZMQ_FINAL;
void xwrite_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
void xpipe_terminated (zmq::pipe_t *pipe_) ZMQ_FINAL;
private:
// Load balancer managing the outbound pipes.

View File

@@ -157,8 +157,8 @@ int zmq::radio_t::xsend (msg_t *msg_)
_dist.unmatch ();
std::pair<subscriptions_t::iterator, subscriptions_t::iterator> range =
_subscriptions.equal_range (std::string (msg_->group ()));
const std::pair<subscriptions_t::iterator, subscriptions_t::iterator>
range = _subscriptions.equal_range (std::string (msg_->group ()));
for (subscriptions_t::iterator it = range.first; it != range.second; ++it)
_dist.match (it->second);
@@ -262,7 +262,7 @@ int zmq::radio_session_t::pull_msg (msg_t *msg_)
return rc;
const char *group = _pending_msg.group ();
int length = static_cast<int> (strlen (group));
const int length = static_cast<int> (strlen (group));
// First frame is the group
rc = msg_->init_size (length);

View File

@@ -45,24 +45,25 @@ class ctx_t;
class pipe_t;
class io_thread_t;
class radio_t : public socket_base_t
class radio_t ZMQ_FINAL : public socket_base_t
{
public:
radio_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
~radio_t ();
~radio_t () ZMQ_FINAL;
// Implementations of virtual functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_ = false,
bool locally_initiated_ = false);
int xsend (zmq::msg_t *msg_);
bool xhas_out ();
int xrecv (zmq::msg_t *msg_);
bool xhas_in ();
void xread_activated (zmq::pipe_t *pipe_);
void xwrite_activated (zmq::pipe_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xpipe_terminated (zmq::pipe_t *pipe_);
bool locally_initiated_ = false) ZMQ_FINAL;
int xsend (zmq::msg_t *msg_) ZMQ_FINAL;
bool xhas_out () ZMQ_FINAL;
int xrecv (zmq::msg_t *msg_) ZMQ_FINAL;
bool xhas_in () ZMQ_FINAL;
void xread_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
void xwrite_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
int
xsetsockopt (int option_, const void *optval_, size_t optvallen_) ZMQ_FINAL;
void xpipe_terminated (zmq::pipe_t *pipe_) ZMQ_FINAL;
private:
// List of all subscriptions mapped to corresponding pipes.
@@ -82,7 +83,7 @@ class radio_t : public socket_base_t
ZMQ_NON_COPYABLE_NOR_MOVABLE (radio_t)
};
class radio_session_t : public session_base_t
class radio_session_t ZMQ_FINAL : public session_base_t
{
public:
radio_session_t (zmq::io_thread_t *io_thread_,
@@ -90,12 +91,12 @@ class radio_session_t : public session_base_t
zmq::socket_base_t *socket_,
const options_t &options_,
address_t *addr_);
~radio_session_t ();
~radio_session_t () ZMQ_FINAL;
// Overrides of the functions from session_base_t.
int push_msg (msg_t *msg_);
int pull_msg (msg_t *msg_);
void reset ();
int push_msg (msg_t *msg_) ZMQ_FINAL;
int pull_msg (msg_t *msg_) ZMQ_FINAL;
void reset () ZMQ_FINAL;
private:
enum

View File

@@ -34,6 +34,7 @@
#include <stdlib.h>
#include <string.h>
#include <iterator>
#include <vector>
node_t::node_t (unsigned char *data_) : _data (data_)
@@ -154,8 +155,8 @@ bool node_t::operator!= (node_t other_) const
void node_t::resize (size_t prefix_length_, size_t edgecount_)
{
size_t node_size = 3 * sizeof (uint32_t) + prefix_length_
+ edgecount_ * (1 + sizeof (void *));
const size_t node_size = 3 * sizeof (uint32_t) + prefix_length_
+ edgecount_ * (1 + sizeof (void *));
unsigned char *new_data =
static_cast<unsigned char *> (realloc (_data, node_size));
zmq_assert (new_data);
@@ -166,8 +167,8 @@ void node_t::resize (size_t prefix_length_, size_t edgecount_)
node_t make_node (size_t refcount_, size_t prefix_length_, size_t edgecount_)
{
size_t node_size = 3 * sizeof (uint32_t) + prefix_length_
+ edgecount_ * (1 + sizeof (void *));
const size_t node_size = 3 * sizeof (uint32_t) + prefix_length_
+ edgecount_ * (1 + sizeof (void *));
unsigned char *data = static_cast<unsigned char *> (malloc (node_size));
zmq_assert (data);
@@ -187,7 +188,7 @@ zmq::radix_tree_t::radix_tree_t () : _root (make_node (0, 0, 0)), _size (0)
static void free_nodes (node_t node_)
{
for (size_t i = 0; i < node_.edgecount (); ++i)
for (size_t i = 0, count = node_.edgecount (); i < count; ++i)
free_nodes (node_.node_at (i));
free (node_._data);
}
@@ -234,18 +235,19 @@ match_result_t zmq::radix_tree_t::match (const unsigned char *key_,
size_t parent_edge_index = 0;
while (current_node.prefix_length () > 0 || current_node.edgecount () > 0) {
const unsigned char *const prefix = current_node.prefix ();
const size_t prefix_length = current_node.prefix_length ();
for (prefix_byte_index = 0;
prefix_byte_index < current_node.prefix_length ()
&& key_byte_index < key_size_;
prefix_byte_index < prefix_length && key_byte_index < key_size_;
++prefix_byte_index, ++key_byte_index) {
if (current_node.prefix ()[prefix_byte_index]
!= key_[key_byte_index])
if (prefix[prefix_byte_index] != key_[key_byte_index])
break;
}
// Even if a prefix of the key matches and we're doing a
// lookup, this means we've found a matching subscription.
if (is_lookup_ && prefix_byte_index == current_node.prefix_length ()
if (is_lookup_ && prefix_byte_index == prefix_length
&& current_node.refcount () > 0) {
key_byte_index = key_size_;
break;
@@ -253,14 +255,14 @@ match_result_t zmq::radix_tree_t::match (const unsigned char *key_,
// There was a mismatch or we've matched the whole key, so
// there's nothing more to do.
if (prefix_byte_index != current_node.prefix_length ()
|| key_byte_index == key_size_)
if (prefix_byte_index != prefix_length || key_byte_index == key_size_)
break;
// We need to match the rest of the key. Check if there's an
// outgoing edge from this node.
node_t next_node = current_node;
for (size_t i = 0; i < current_node.edgecount (); ++i) {
for (size_t i = 0, edgecount = current_node.edgecount (); i < edgecount;
++i) {
if (current_node.first_byte_at (i) == key_[key_byte_index]) {
parent_edge_index = edge_index;
edge_index = i;
@@ -283,10 +285,10 @@ match_result_t zmq::radix_tree_t::match (const unsigned char *key_,
bool zmq::radix_tree_t::add (const unsigned char *key_, size_t key_size_)
{
match_result_t match_result = match (key_, key_size_);
size_t key_bytes_matched = match_result._key_bytes_matched;
size_t prefix_bytes_matched = match_result._prefix_bytes_matched;
size_t edge_index = match_result._edge_index;
const match_result_t match_result = match (key_, key_size_);
const size_t key_bytes_matched = match_result._key_bytes_matched;
const size_t prefix_bytes_matched = match_result._prefix_bytes_matched;
const size_t edge_index = match_result._edge_index;
node_t current_node = match_result._current_node;
node_t parent_node = match_result._parent_node;
@@ -407,11 +409,11 @@ bool zmq::radix_tree_t::add (const unsigned char *key_, size_t key_size_)
bool zmq::radix_tree_t::rm (const unsigned char *key_, size_t key_size_)
{
match_result_t match_result = match (key_, key_size_);
size_t key_bytes_matched = match_result._key_bytes_matched;
size_t prefix_bytes_matched = match_result._prefix_bytes_matched;
size_t edge_index = match_result._edge_index;
size_t parent_edge_index = match_result._parent_edge_index;
const match_result_t match_result = match (key_, key_size_);
const size_t key_bytes_matched = match_result._key_bytes_matched;
const size_t prefix_bytes_matched = match_result._prefix_bytes_matched;
const size_t edge_index = match_result._edge_index;
const size_t parent_edge_index = match_result._parent_edge_index;
node_t current_node = match_result._current_node;
node_t parent_node = match_result._parent_node;
node_t grandparent_node = match_result._grandparent_node;
@@ -430,7 +432,7 @@ bool zmq::radix_tree_t::rm (const unsigned char *key_, size_t key_size_)
if (current_node == _root)
return true;
size_t outgoing_edges = current_node.edgecount ();
const size_t outgoing_edges = current_node.edgecount ();
if (outgoing_edges > 1)
// This node can't be merged with any other node, so there's
// nothing more to do.
@@ -443,7 +445,7 @@ bool zmq::radix_tree_t::rm (const unsigned char *key_, size_t key_size_)
// Make room for the child node's prefix and edges. We need to
// keep the old prefix length since resize() will overwrite
// it.
uint32_t old_prefix_length = current_node.prefix_length ();
const uint32_t old_prefix_length = current_node.prefix_length ();
current_node.resize (old_prefix_length + child.prefix_length (),
child.edgecount ());
@@ -472,7 +474,7 @@ bool zmq::radix_tree_t::rm (const unsigned char *key_, size_t key_size_)
// Make room for the child node's prefix and edges. We need to
// keep the old prefix length since resize() will overwrite
// it.
uint32_t old_prefix_length = parent_node.prefix_length ();
const uint32_t old_prefix_length = parent_node.prefix_length ();
parent_node.resize (old_prefix_length + other_child.prefix_length (),
other_child.edgecount ());
@@ -499,9 +501,9 @@ bool zmq::radix_tree_t::rm (const unsigned char *key_, size_t key_size_)
// Replace the edge to the current node with the last edge. An
// edge consists of a byte and a pointer to the next node. First
// replace the byte.
size_t last_index = parent_node.edgecount () - 1;
unsigned char last_byte = parent_node.first_byte_at (last_index);
node_t last_node = parent_node.node_at (last_index);
const size_t last_index = parent_node.edgecount () - 1;
const unsigned char last_byte = parent_node.first_byte_at (last_index);
const node_t last_node = parent_node.node_at (last_index);
parent_node.set_edge_at (edge_index, last_byte, last_node);
// Move the chunk of pointers one byte to the left, effectively
@@ -543,18 +545,20 @@ visit_keys (node_t node_,
void (*func_) (unsigned char *data_, size_t size_, void *arg_),
void *arg_)
{
for (size_t i = 0; i < node_.prefix_length (); ++i)
buffer_.push_back (node_.prefix ()[i]);
const size_t prefix_length = node_.prefix_length ();
buffer_.reserve (buffer_.size () + prefix_length);
std::copy (node_.prefix (), node_.prefix () + prefix_length,
std::back_inserter (buffer_));
if (node_.refcount () > 0) {
zmq_assert (!buffer_.empty ());
func_ (&buffer_[0], buffer_.size (), arg_);
}
for (size_t i = 0; i < node_.edgecount (); ++i)
for (size_t i = 0, edgecount = node_.edgecount (); i < edgecount; ++i) {
visit_keys (node_.node_at (i), buffer_, func_, arg_);
for (size_t i = 0; i < node_.prefix_length (); ++i)
buffer_.pop_back ();
}
buffer_.resize (buffer_.size () - prefix_length);
}
void zmq::radix_tree_t::apply (

View File

@@ -67,24 +67,23 @@ struct node_t
bool operator== (node_t other_) const;
bool operator!= (node_t other_) const;
inline uint32_t refcount ();
inline uint32_t prefix_length ();
inline uint32_t edgecount ();
inline unsigned char *prefix ();
inline unsigned char *first_bytes ();
inline unsigned char first_byte_at (size_t index_);
inline unsigned char *node_pointers ();
inline node_t node_at (size_t index_);
inline void set_refcount (uint32_t value_);
inline void set_prefix_length (uint32_t value_);
inline void set_edgecount (uint32_t value_);
inline void set_prefix (const unsigned char *bytes_);
inline void set_first_bytes (const unsigned char *bytes_);
inline void set_first_byte_at (size_t index_, unsigned char byte_);
inline void set_node_pointers (const unsigned char *pointers_);
inline void set_node_at (size_t index_, node_t node_);
inline void
set_edge_at (size_t index_, unsigned char first_byte_, node_t node_);
uint32_t refcount ();
uint32_t prefix_length ();
uint32_t edgecount ();
unsigned char *prefix ();
unsigned char *first_bytes ();
unsigned char first_byte_at (size_t index_);
unsigned char *node_pointers ();
node_t node_at (size_t index_);
void set_refcount (uint32_t value_);
void set_prefix_length (uint32_t value_);
void set_edgecount (uint32_t value_);
void set_prefix (const unsigned char *bytes_);
void set_first_bytes (const unsigned char *bytes_);
void set_first_byte_at (size_t index_, unsigned char byte_);
void set_node_pointers (const unsigned char *pointers_);
void set_node_at (size_t index_, node_t node_);
void set_edge_at (size_t index_, unsigned char first_byte_, node_t node_);
void resize (size_t prefix_length_, size_t edgecount_);
unsigned char *_data;
@@ -137,7 +136,7 @@ class radix_tree_t
size_t size () const;
private:
inline match_result_t
match_result_t
match (const unsigned char *key_, size_t key_size_, bool is_lookup_) const;
node_t _root;

View File

@@ -49,7 +49,7 @@
void zmq::seed_random ()
{
#if defined ZMQ_HAVE_WINDOWS
int pid = static_cast<int> (GetCurrentProcessId ());
const int pid = static_cast<int> (GetCurrentProcessId ());
#else
int pid = static_cast<int> (getpid ());
#endif
@@ -59,7 +59,7 @@ void zmq::seed_random ()
uint32_t zmq::generate_random ()
{
// Compensate for the fact that rand() returns signed integer.
uint32_t low = static_cast<uint32_t> (rand ());
const uint32_t low = static_cast<uint32_t> (rand ());
uint32_t high = static_cast<uint32_t> (rand ());
high <<= (sizeof (int) * 8 - 1);
return high | low;

View File

@@ -39,22 +39,23 @@ namespace zmq
{
// Decoder for 0MQ v1 framing protocol. Converts data stream into messages.
class raw_decoder_t : public i_decoder
class raw_decoder_t ZMQ_FINAL : public i_decoder
{
public:
raw_decoder_t (size_t bufsize_);
virtual ~raw_decoder_t ();
~raw_decoder_t () ZMQ_FINAL;
// i_decoder interface.
virtual void get_buffer (unsigned char **data_, size_t *size_);
void get_buffer (unsigned char **data_, size_t *size_) ZMQ_FINAL;
virtual int
decode (const unsigned char *data_, size_t size_, size_t &bytes_used_);
int decode (const unsigned char *data_,
size_t size_,
size_t &bytes_used_) ZMQ_FINAL;
virtual msg_t *msg () { return &_in_progress; }
msg_t *msg () ZMQ_FINAL { return &_in_progress; }
virtual void resize_buffer (size_t) {}
void resize_buffer (size_t) ZMQ_FINAL {}
private:
msg_t _in_progress;

View File

@@ -40,11 +40,11 @@ namespace zmq
{
// Encoder for 0MQ framing protocol. Converts messages into data batches.
class raw_encoder_t : public encoder_base_t<raw_encoder_t>
class raw_encoder_t ZMQ_FINAL : public encoder_base_t<raw_encoder_t>
{
public:
raw_encoder_t (size_t bufsize_);
~raw_encoder_t ();
~raw_encoder_t () ZMQ_FINAL;
private:
void raw_message_ready ();

View File

@@ -54,18 +54,18 @@ class mechanism_t;
// This engine handles any socket with SOCK_STREAM semantics,
// e.g. TCP socket or an UNIX domain socket.
class raw_engine_t : public stream_engine_base_t
class raw_engine_t ZMQ_FINAL : public stream_engine_base_t
{
public:
raw_engine_t (fd_t fd_,
const options_t &options_,
const endpoint_uri_pair_t &endpoint_uri_pair_);
~raw_engine_t ();
~raw_engine_t () ZMQ_FINAL;
protected:
void error (error_reason_t reason_);
void plug_internal ();
bool handshake ();
void error (error_reason_t reason_) ZMQ_FINAL;
void plug_internal () ZMQ_FINAL;
bool handshake () ZMQ_FINAL;
private:
int push_raw_msg_to_session (msg_t *msg_);

View File

@@ -93,7 +93,7 @@ void zmq::reaper_t::in_event ()
// Get the next command. If there is none, exit.
command_t cmd;
int rc = _mailbox.recv (&cmd, 0);
const int rc = _mailbox.recv (&cmd, 0);
if (rc != 0 && errno == EINTR)
continue;
if (rc != 0 && errno == EAGAIN)

View File

@@ -40,11 +40,11 @@ namespace zmq
class ctx_t;
class socket_base_t;
class reaper_t : public object_t, public i_poll_events
class reaper_t ZMQ_FINAL : public object_t, public i_poll_events
{
public:
reaper_t (zmq::ctx_t *ctx_, uint32_t tid_);
~reaper_t ();
~reaper_t () ZMQ_FINAL;
mailbox_t *get_mailbox ();
@@ -52,15 +52,15 @@ class reaper_t : public object_t, public i_poll_events
void stop ();
// i_poll_events implementation.
void in_event ();
void out_event ();
void timer_event (int id_);
void in_event () ZMQ_FINAL;
void out_event () ZMQ_FINAL;
void timer_event (int id_) ZMQ_FINAL;
private:
// Command handlers.
void process_stop ();
void process_reap (zmq::socket_base_t *socket_);
void process_reaped ();
void process_stop () ZMQ_FINAL;
void process_reap (zmq::socket_base_t *socket_) ZMQ_FINAL;
void process_reaped () ZMQ_FINAL;
// Reaper thread accesses incoming commands via this mailbox.
mailbox_t _mailbox;

View File

@@ -52,10 +52,10 @@ int zmq::rep_t::xsend (msg_t *msg_)
return -1;
}
bool more = (msg_->flags () & msg_t::more) != 0;
const bool more = (msg_->flags () & msg_t::more) != 0;
// Push message to the reply pipe.
int rc = router_t::xsend (msg_);
const int rc = router_t::xsend (msg_);
if (rc != 0)
return rc;
@@ -84,7 +84,7 @@ int zmq::rep_t::xrecv (msg_t *msg_)
if ((msg_->flags () & msg_t::more)) {
// Empty message part delimits the traceback stack.
bool bottom = (msg_->size () == 0);
const bool bottom = (msg_->size () == 0);
// Push it to the reply pipe.
rc = router_t::xsend (msg_);
@@ -103,7 +103,7 @@ int zmq::rep_t::xrecv (msg_t *msg_)
}
// Get next message part to return to the user.
int rc = router_t::xrecv (msg_);
const int rc = router_t::xrecv (msg_);
if (rc != 0)
return rc;

View File

@@ -39,17 +39,17 @@ class msg_t;
class io_thread_t;
class socket_base_t;
class rep_t : public router_t
class rep_t ZMQ_FINAL : public router_t
{
public:
rep_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
~rep_t ();
~rep_t () ZMQ_FINAL;
// Overrides of functions from socket_base_t.
int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_);
bool xhas_in ();
bool xhas_out ();
int xsend (zmq::msg_t *msg_) ZMQ_FINAL;
int xrecv (zmq::msg_t *msg_) ZMQ_FINAL;
bool xhas_in () ZMQ_FINAL;
bool xhas_out () ZMQ_FINAL;
private:
// If true, we are in process of sending the reply. If false we are

View File

@@ -175,7 +175,7 @@ int zmq::req_t::xrecv (msg_t *msg_)
_message_begins = false;
}
int rc = recv_reply_pipe (msg_);
const int rc = recv_reply_pipe (msg_);
if (rc != 0)
return rc;
@@ -210,7 +210,7 @@ int zmq::req_t::xsetsockopt (int option_,
const void *optval_,
size_t optvallen_)
{
bool is_int = (optvallen_ == sizeof (int));
const bool is_int = (optvallen_ == sizeof (int));
int value = 0;
if (is_int)
memcpy (&value, optval_, sizeof (int));
@@ -248,7 +248,7 @@ int zmq::req_t::recv_reply_pipe (msg_t *msg_)
{
while (true) {
pipe_t *pipe = NULL;
int rc = dealer_t::recvpipe (msg_, &pipe);
const int rc = dealer_t::recvpipe (msg_, &pipe);
if (rc != 0)
return rc;
if (!_reply_pipe || pipe == _reply_pipe)

View File

@@ -40,19 +40,20 @@ class msg_t;
class io_thread_t;
class socket_base_t;
class req_t : public dealer_t
class req_t ZMQ_FINAL : public dealer_t
{
public:
req_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
~req_t ();
~req_t () ZMQ_FINAL;
// Overrides of functions from socket_base_t.
int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_);
bool xhas_in ();
bool xhas_out ();
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xpipe_terminated (zmq::pipe_t *pipe_);
int xsend (zmq::msg_t *msg_) ZMQ_FINAL;
int xrecv (zmq::msg_t *msg_) ZMQ_FINAL;
bool xhas_in () ZMQ_FINAL;
bool xhas_out () ZMQ_FINAL;
int
xsetsockopt (int option_, const void *optval_, size_t optvallen_) ZMQ_FINAL;
void xpipe_terminated (zmq::pipe_t *pipe_) ZMQ_FINAL;
protected:
// Receive only from the pipe the request was sent to, discarding
@@ -86,7 +87,7 @@ class req_t : public dealer_t
ZMQ_NON_COPYABLE_NOR_MOVABLE (req_t)
};
class req_session_t : public session_base_t
class req_session_t ZMQ_FINAL : public session_base_t
{
public:
req_session_t (zmq::io_thread_t *io_thread_,
@@ -94,11 +95,11 @@ class req_session_t : public session_base_t
zmq::socket_base_t *socket_,
const options_t &options_,
address_t *addr_);
~req_session_t ();
~req_session_t () ZMQ_FINAL;
// Overrides of the functions from session_base_t.
int push_msg (msg_t *msg_);
void reset ();
int push_msg (msg_t *msg_) ZMQ_FINAL;
void reset () ZMQ_FINAL;
private:
enum

View File

@@ -90,7 +90,7 @@ void zmq::router_t::xattach_pipe (pipe_t *pipe_,
errno_assert (rc == 0);
}
bool routing_id_ok = identify_peer (pipe_, locally_initiated_);
const bool routing_id_ok = identify_peer (pipe_, locally_initiated_);
if (routing_id_ok)
_fq.attach (pipe_);
else
@@ -171,7 +171,7 @@ void zmq::router_t::xpipe_terminated (pipe_t *pipe_)
void zmq::router_t::xread_activated (pipe_t *pipe_)
{
std::set<pipe_t *>::iterator it = _anonymous_pipes.find (pipe_);
const std::set<pipe_t *>::iterator it = _anonymous_pipes.find (pipe_);
if (it == _anonymous_pipes.end ())
_fq.activated (pipe_);
else {
@@ -209,7 +209,7 @@ int zmq::router_t::xsend (msg_t *msg_)
// Check whether pipe is closed or not
if (!_current_out->check_write ()) {
// Check whether pipe is full or not
bool pipe_full = !_current_out->check_hwm ();
const bool pipe_full = !_current_out->check_hwm ();
out_pipe->active = false;
_current_out = NULL;
@@ -258,10 +258,10 @@ int zmq::router_t::xsend (msg_t *msg_)
return 0;
}
bool ok = _current_out->write (msg_);
const bool ok = _current_out->write (msg_);
if (unlikely (!ok)) {
// Message failed to send - we must close it ourselves.
int rc = msg_->close ();
const int rc = msg_->close ();
errno_assert (rc == 0);
// HWM was checked before, so the pipe must be gone. Roll back
// messages that were piped, for example REP labels.
@@ -274,12 +274,12 @@ int zmq::router_t::xsend (msg_t *msg_)
}
}
} else {
int rc = msg_->close ();
const int rc = msg_->close ();
errno_assert (rc == 0);
}
// Detach the message from the data buffer.
int rc = msg_->init ();
const int rc = msg_->init ();
errno_assert (rc == 0);
return 0;
@@ -289,11 +289,11 @@ int zmq::router_t::xrecv (msg_t *msg_)
{
if (_prefetched) {
if (!_routing_id_sent) {
int rc = msg_->move (_prefetched_id);
const int rc = msg_->move (_prefetched_id);
errno_assert (rc == 0);
_routing_id_sent = true;
} else {
int rc = msg_->move (_prefetched_msg);
const int rc = msg_->move (_prefetched_msg);
errno_assert (rc == 0);
_prefetched = false;
}
@@ -469,7 +469,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_, bool locally_initiated_)
} else if (!options.raw_socket) {
// Pick up handshake cases and also case where next integral routing id is set
msg.init ();
bool ok = pipe_->read (&msg);
const bool ok = pipe_->read (&msg);
if (!ok)
return false;

View File

@@ -49,20 +49,22 @@ class router_t : public routing_socket_base_t
{
public:
router_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
~router_t ();
~router_t () ZMQ_OVERRIDE;
// Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_);
bool xhas_in ();
bool xhas_out ();
void xread_activated (zmq::pipe_t *pipe_);
void xpipe_terminated (zmq::pipe_t *pipe_);
int get_peer_state (const void *routing_id_, size_t routing_id_size_) const;
bool locally_initiated_) ZMQ_FINAL;
int
xsetsockopt (int option_, const void *optval_, size_t optvallen_) ZMQ_FINAL;
int xsend (zmq::msg_t *msg_) ZMQ_OVERRIDE;
int xrecv (zmq::msg_t *msg_) ZMQ_OVERRIDE;
bool xhas_in () ZMQ_OVERRIDE;
bool xhas_out () ZMQ_OVERRIDE;
void xread_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
void xpipe_terminated (zmq::pipe_t *pipe_) ZMQ_FINAL;
int get_peer_state (const void *routing_id_,
size_t routing_id_size_) const ZMQ_FINAL;
protected:
// Rollback any message parts that were sent but not yet flushed.

View File

@@ -41,21 +41,21 @@ class pipe_t;
class msg_t;
class io_thread_t;
class scatter_t : public socket_base_t
class scatter_t ZMQ_FINAL : public socket_base_t
{
public:
scatter_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
~scatter_t ();
~scatter_t () ZMQ_FINAL;
protected:
// Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xsend (zmq::msg_t *msg_);
bool xhas_out ();
void xwrite_activated (zmq::pipe_t *pipe_);
void xpipe_terminated (zmq::pipe_t *pipe_);
bool locally_initiated_) ZMQ_FINAL;
int xsend (zmq::msg_t *msg_) ZMQ_FINAL;
bool xhas_out () ZMQ_FINAL;
void xwrite_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
void xpipe_terminated (zmq::pipe_t *pipe_) ZMQ_FINAL;
private:
// Load balancer managing the outbound pipes.

View File

@@ -57,13 +57,13 @@ struct i_poll_events;
// Implements socket polling mechanism using POSIX.1-2001 select()
// function.
class select_t : public worker_poller_base_t
class select_t ZMQ_FINAL : public worker_poller_base_t
{
public:
typedef fd_t handle_t;
select_t (const thread_ctx_t &ctx_);
~select_t ();
~select_t () ZMQ_FINAL;
// "poller" concept.
handle_t add_fd (fd_t fd_, zmq::i_poll_events *events_);
@@ -78,7 +78,7 @@ class select_t : public worker_poller_base_t
private:
// Main event loop.
void loop ();
void loop () ZMQ_FINAL;
// Internal state.
struct fds_set_t

View File

@@ -64,7 +64,8 @@ void zmq::server_t::xattach_pipe (pipe_t *pipe_,
pipe_->set_server_socket_routing_id (routing_id);
// Add the record into output pipes lookup table
outpipe_t outpipe = {pipe_, true};
bool ok = _out_pipes.ZMQ_MAP_INSERT_OR_EMPLACE (routing_id, outpipe).second;
const bool ok =
_out_pipes.ZMQ_MAP_INSERT_OR_EMPLACE (routing_id, outpipe).second;
zmq_assert (ok);
_fq.attach (pipe_);
@@ -72,7 +73,7 @@ void zmq::server_t::xattach_pipe (pipe_t *pipe_,
void zmq::server_t::xpipe_terminated (pipe_t *pipe_)
{
out_pipes_t::iterator it =
const out_pipes_t::iterator it =
_out_pipes.find (pipe_->get_server_socket_routing_id ());
zmq_assert (it != _out_pipes.end ());
_out_pipes.erase (it);
@@ -105,7 +106,7 @@ int zmq::server_t::xsend (msg_t *msg_)
return -1;
}
// Find the pipe associated with the routing stored in the message.
uint32_t routing_id = msg_->get_routing_id ();
const uint32_t routing_id = msg_->get_routing_id ();
out_pipes_t::iterator it = _out_pipes.find (routing_id);
if (it != _out_pipes.end ()) {
@@ -123,7 +124,7 @@ int zmq::server_t::xsend (msg_t *msg_)
int rc = msg_->reset_routing_id ();
errno_assert (rc == 0);
bool ok = it->second.pipe->write (msg_);
const bool ok = it->second.pipe->write (msg_);
if (unlikely (!ok)) {
// Message failed to send - we must close it ourselves.
rc = msg_->close ();
@@ -161,7 +162,7 @@ int zmq::server_t::xrecv (msg_t *msg_)
zmq_assert (pipe != NULL);
uint32_t routing_id = pipe->get_server_socket_routing_id ();
const uint32_t routing_id = pipe->get_server_socket_routing_id ();
msg_->set_routing_id (routing_id);
return 0;

View File

@@ -45,23 +45,23 @@ class msg_t;
class pipe_t;
// TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm.
class server_t : public socket_base_t
class server_t ZMQ_FINAL : public socket_base_t
{
public:
server_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
~server_t ();
~server_t () ZMQ_FINAL;
// Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_);
bool xhas_in ();
bool xhas_out ();
void xread_activated (zmq::pipe_t *pipe_);
void xwrite_activated (zmq::pipe_t *pipe_);
void xpipe_terminated (zmq::pipe_t *pipe_);
bool locally_initiated_) ZMQ_FINAL;
int xsend (zmq::msg_t *msg_) ZMQ_FINAL;
int xrecv (zmq::msg_t *msg_) ZMQ_FINAL;
bool xhas_in () ZMQ_FINAL;
bool xhas_out () ZMQ_FINAL;
void xread_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
void xwrite_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
void xpipe_terminated (zmq::pipe_t *pipe_) ZMQ_FINAL;
private:
// Fair queueing object for inbound pipes.

View File

@@ -173,7 +173,7 @@ int zmq::session_base_t::push_msg (msg_t *msg_)
&& !msg_->is_cancel ())
return 0;
if (_pipe && _pipe->write (msg_)) {
int rc = msg_->init ();
const int rc = msg_->init ();
errno_assert (rc == 0);
return 0;
}
@@ -325,7 +325,7 @@ void zmq::session_base_t::hiccuped (pipe_t *)
zmq_assert (false);
}
zmq::socket_base_t *zmq::session_base_t::get_socket ()
zmq::socket_base_t *zmq::session_base_t::get_socket () const
{
return _socket;
}
@@ -385,7 +385,7 @@ int zmq::session_base_t::zap_connect ()
return 0;
}
bool zmq::session_base_t::zap_enabled ()
bool zmq::session_base_t::zap_enabled () const
{
return (options.mechanism != ZMQ_NULL || !options.zap_domain.empty ());
}
@@ -404,7 +404,7 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
int hwms[2] = {conflate ? -1 : options.rcvhwm,
conflate ? -1 : options.sndhwm};
bool conflates[2] = {conflate, conflate};
int rc = pipepair (parents, pipes, hwms, conflates);
const int rc = pipepair (parents, pipes, hwms, conflates);
errno_assert (rc == 0);
// Plug the local end of the pipe.

Some files were not shown because too many files have changed in this diff Show More