mirror of
https://github.com/zeromq/libzmq.git
synced 2025-03-01 10:57:59 +01:00
Problem: ptr/ref parameters and local variables are non-const but never modified
Solution: add const
This commit is contained in:
parent
759fed8e7e
commit
41e3f14d6a
@ -156,7 +156,7 @@ class atomic_counter_t
|
||||
__atomic_sub_fetch (&_value, decrement_, __ATOMIC_ACQ_REL);
|
||||
return nv != 0;
|
||||
#elif defined ZMQ_ATOMIC_COUNTER_CXX11
|
||||
integer_t old =
|
||||
const integer_t old =
|
||||
_value.fetch_sub (decrement_, std::memory_order_acq_rel);
|
||||
return old - decrement_ != 0;
|
||||
#elif defined ZMQ_ATOMIC_COUNTER_ATOMIC_H
|
||||
|
@ -103,7 +103,7 @@ f_compatible_get_tick_count64 init_compatible_get_tick_count64 ()
|
||||
f_compatible_get_tick_count64 func = NULL;
|
||||
#if !defined ZMQ_HAVE_WINDOWS_UWP
|
||||
|
||||
HMODULE module = ::LoadLibraryA ("Kernel32.dll");
|
||||
const HMODULE module = ::LoadLibraryA ("Kernel32.dll");
|
||||
if (module != NULL)
|
||||
func = reinterpret_cast<f_compatible_get_tick_count64> (
|
||||
::GetProcAddress (module, "GetTickCount64"));
|
||||
@ -200,7 +200,7 @@ uint64_t zmq::clock_t::now_us ()
|
||||
|
||||
uint64_t zmq::clock_t::now_ms ()
|
||||
{
|
||||
uint64_t tsc = rdtsc ();
|
||||
const uint64_t tsc = rdtsc ();
|
||||
|
||||
// If TSC is not supported, get precise time and chop off the microseconds.
|
||||
if (!tsc) {
|
||||
|
31
src/ctx.cpp
31
src/ctx.cpp
@ -157,7 +157,7 @@ int zmq::ctx_t::terminate ()
|
||||
{
|
||||
_slot_sync.lock ();
|
||||
|
||||
bool save_terminating = _terminating;
|
||||
const bool save_terminating = _terminating;
|
||||
_terminating = false;
|
||||
|
||||
// Connect up any pending inproc connections, otherwise we will hang
|
||||
@ -187,7 +187,7 @@ int zmq::ctx_t::terminate ()
|
||||
|
||||
// Check whether termination was already underway, but interrupted and now
|
||||
// restarted.
|
||||
bool restarted = _terminating;
|
||||
const bool restarted = _terminating;
|
||||
_terminating = true;
|
||||
|
||||
// First attempt to terminate the context.
|
||||
@ -206,7 +206,7 @@ int zmq::ctx_t::terminate ()
|
||||
|
||||
// Wait till reaper thread closes all the sockets.
|
||||
command_t cmd;
|
||||
int rc = _term_mailbox.recv (&cmd, -1);
|
||||
const int rc = _term_mailbox.recv (&cmd, -1);
|
||||
if (rc == -1 && errno == EINTR)
|
||||
return -1;
|
||||
errno_assert (rc == 0);
|
||||
@ -257,7 +257,7 @@ int zmq::ctx_t::shutdown ()
|
||||
|
||||
int zmq::ctx_t::set (int option_, const void *optval_, size_t optvallen_)
|
||||
{
|
||||
bool is_int = (optvallen_ == sizeof (int));
|
||||
const bool is_int = (optvallen_ == sizeof (int));
|
||||
int value = 0;
|
||||
if (is_int)
|
||||
memcpy (&value, optval_, sizeof (int));
|
||||
@ -320,7 +320,7 @@ int zmq::ctx_t::set (int option_, const void *optval_, size_t optvallen_)
|
||||
return -1;
|
||||
}
|
||||
|
||||
int zmq::ctx_t::get (int option_, void *optval_, size_t *optvallen_)
|
||||
int zmq::ctx_t::get (int option_, void *optval_, const size_t *optvallen_)
|
||||
{
|
||||
const bool is_int = (*optvallen_ == sizeof (int));
|
||||
int *value = static_cast<int *> (optval_);
|
||||
@ -412,7 +412,7 @@ bool zmq::ctx_t::start ()
|
||||
const int mazmq = _max_sockets;
|
||||
const int ios = _io_thread_count;
|
||||
_opt_sync.unlock ();
|
||||
int slot_count = mazmq + ios + term_and_reaper_threads_count;
|
||||
const int slot_count = mazmq + ios + term_and_reaper_threads_count;
|
||||
try {
|
||||
_slots.reserve (slot_count);
|
||||
_empty_slots.reserve (slot_count - term_and_reaper_threads_count);
|
||||
@ -498,11 +498,11 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
|
||||
}
|
||||
|
||||
// Choose a slot for the socket.
|
||||
uint32_t slot = _empty_slots.back ();
|
||||
const uint32_t slot = _empty_slots.back ();
|
||||
_empty_slots.pop_back ();
|
||||
|
||||
// Generate new unique socket ID.
|
||||
int sid = (static_cast<int> (max_socket_id.add (1))) + 1;
|
||||
const int sid = (static_cast<int> (max_socket_id.add (1))) + 1;
|
||||
|
||||
// Create the socket and register its mailbox.
|
||||
socket_base_t *s = socket_base_t::create (type_, this, slot, sid);
|
||||
@ -521,7 +521,7 @@ void zmq::ctx_t::destroy_socket (class socket_base_t *socket_)
|
||||
scoped_lock_t locker (_slot_sync);
|
||||
|
||||
// Free the associated thread slot.
|
||||
uint32_t tid = socket_->get_tid ();
|
||||
const uint32_t tid = socket_->get_tid ();
|
||||
_empty_slots.push_back (tid);
|
||||
_slots[tid] = NULL;
|
||||
|
||||
@ -563,7 +563,7 @@ void zmq::thread_ctx_t::start_thread (thread_t &thread_,
|
||||
|
||||
int zmq::thread_ctx_t::set (int option_, const void *optval_, size_t optvallen_)
|
||||
{
|
||||
bool is_int = (optvallen_ == sizeof (int));
|
||||
const bool is_int = (optvallen_ == sizeof (int));
|
||||
int value = 0;
|
||||
if (is_int)
|
||||
memcpy (&value, optval_, sizeof (int));
|
||||
@ -701,7 +701,7 @@ int zmq::ctx_t::register_endpoint (const char *addr_,
|
||||
}
|
||||
|
||||
int zmq::ctx_t::unregister_endpoint (const std::string &addr_,
|
||||
socket_base_t *socket_)
|
||||
const socket_base_t *const socket_)
|
||||
{
|
||||
scoped_lock_t locker (_endpoints_sync);
|
||||
|
||||
@ -717,7 +717,7 @@ int zmq::ctx_t::unregister_endpoint (const std::string &addr_,
|
||||
return 0;
|
||||
}
|
||||
|
||||
void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_)
|
||||
void zmq::ctx_t::unregister_endpoints (const socket_base_t *const socket_)
|
||||
{
|
||||
scoped_lock_t locker (_endpoints_sync);
|
||||
|
||||
@ -765,7 +765,7 @@ void zmq::ctx_t::pend_connection (const std::string &addr_,
|
||||
const pending_connection_t pending_connection = {endpoint_, pipes_[0],
|
||||
pipes_[1]};
|
||||
|
||||
endpoints_t::iterator it = _endpoints.find (addr_);
|
||||
const endpoints_t::iterator it = _endpoints.find (addr_);
|
||||
if (it == _endpoints.end ()) {
|
||||
// Still no bind.
|
||||
endpoint_.socket->inc_seqnum ();
|
||||
@ -783,7 +783,8 @@ void zmq::ctx_t::connect_pending (const char *addr_,
|
||||
{
|
||||
scoped_lock_t locker (_endpoints_sync);
|
||||
|
||||
std::pair<pending_connections_t::iterator, pending_connections_t::iterator>
|
||||
const std::pair<pending_connections_t::iterator,
|
||||
pending_connections_t::iterator>
|
||||
pending = _pending_connections.equal_range (addr_);
|
||||
for (pending_connections_t::iterator p = pending.first; p != pending.second;
|
||||
++p)
|
||||
@ -795,7 +796,7 @@ void zmq::ctx_t::connect_pending (const char *addr_,
|
||||
|
||||
void zmq::ctx_t::connect_inproc_sockets (
|
||||
zmq::socket_base_t *bind_socket_,
|
||||
options_t &bind_options_,
|
||||
const options_t &bind_options_,
|
||||
const pending_connection_t &pending_connection_,
|
||||
side side_)
|
||||
{
|
||||
|
@ -116,7 +116,7 @@ class ctx_t ZMQ_FINAL : public thread_ctx_t
|
||||
|
||||
// Set and get context properties.
|
||||
int set (int option_, const void *optval_, size_t optvallen_);
|
||||
int get (int option_, void *optval_, size_t *optvallen_);
|
||||
int get (int option_, void *optval_, const size_t *optvallen_);
|
||||
int get (int option_);
|
||||
|
||||
// Create and destroy a socket.
|
||||
@ -136,8 +136,9 @@ class ctx_t ZMQ_FINAL : public thread_ctx_t
|
||||
|
||||
// Management of inproc endpoints.
|
||||
int register_endpoint (const char *addr_, const endpoint_t &endpoint_);
|
||||
int unregister_endpoint (const std::string &addr_, socket_base_t *socket_);
|
||||
void unregister_endpoints (zmq::socket_base_t *socket_);
|
||||
int unregister_endpoint (const std::string &addr_,
|
||||
const socket_base_t *socket_);
|
||||
void unregister_endpoints (const zmq::socket_base_t *socket_);
|
||||
endpoint_t find_endpoint (const char *addr_);
|
||||
void pend_connection (const std::string &addr_,
|
||||
const endpoint_t &endpoint_,
|
||||
@ -254,7 +255,7 @@ class ctx_t ZMQ_FINAL : public thread_ctx_t
|
||||
};
|
||||
void
|
||||
connect_inproc_sockets (zmq::socket_base_t *bind_socket_,
|
||||
options_t &bind_options_,
|
||||
const options_t &bind_options_,
|
||||
const pending_connection_t &pending_connection_,
|
||||
side side_);
|
||||
|
||||
|
@ -158,7 +158,7 @@ int zmq::curve_client_t::produce_hello (msg_t *msg_)
|
||||
int zmq::curve_client_t::process_welcome (const uint8_t *msg_data_,
|
||||
size_t msg_size_)
|
||||
{
|
||||
int rc = _tools.process_welcome (msg_data_, msg_size_, cn_precom);
|
||||
const int rc = _tools.process_welcome (msg_data_, msg_size_, cn_precom);
|
||||
|
||||
if (rc == -1) {
|
||||
session->get_socket ()->event_handshake_failed_protocol (
|
||||
|
@ -70,7 +70,7 @@ struct curve_client_tools_t
|
||||
put_uint64 (hello_nonce + 16, cn_nonce_);
|
||||
|
||||
// Create Box [64 * %x0](C'->S)
|
||||
int rc =
|
||||
const int rc =
|
||||
crypto_box (hello_box, &hello_plaintext[0], hello_plaintext.size (),
|
||||
hello_nonce, server_key_, cn_secret_);
|
||||
if (rc == -1)
|
||||
|
@ -125,7 +125,7 @@ int zmq::curve_mechanism_base_t::decode (msg_t *msg_)
|
||||
uint8_t message_nonce[crypto_box_NONCEBYTES];
|
||||
memcpy (message_nonce, decode_nonce_prefix, 16);
|
||||
memcpy (message_nonce + 16, message + 8, 8);
|
||||
uint64_t nonce = get_uint64 (message + 8);
|
||||
const uint64_t nonce = get_uint64 (message + 8);
|
||||
if (nonce <= cn_peer_nonce) {
|
||||
session->get_socket ()->event_handshake_failed_protocol (
|
||||
session->get_endpoint (), ZMQ_PROTOCOL_ERROR_ZMTP_INVALID_SEQUENCE);
|
||||
|
@ -76,7 +76,7 @@ int zmq::dealer_t::xsetsockopt (int option_,
|
||||
const void *optval_,
|
||||
size_t optvallen_)
|
||||
{
|
||||
bool is_int = (optvallen_ == sizeof (int));
|
||||
const bool is_int = (optvallen_ == sizeof (int));
|
||||
int value = 0;
|
||||
if (is_int)
|
||||
memcpy (&value, optval_, sizeof (int));
|
||||
|
@ -94,7 +94,7 @@ int zmq::dgram_t::xsend (msg_t *msg_)
|
||||
{
|
||||
// If there's no out pipe, just drop it.
|
||||
if (!_pipe) {
|
||||
int rc = msg_->close ();
|
||||
const int rc = msg_->close ();
|
||||
errno_assert (rc == 0);
|
||||
return -1;
|
||||
}
|
||||
@ -127,7 +127,7 @@ int zmq::dgram_t::xsend (msg_t *msg_)
|
||||
_more_out = !_more_out;
|
||||
|
||||
// Detach the message from the data buffer.
|
||||
int rc = msg_->init ();
|
||||
const int rc = msg_->init ();
|
||||
errno_assert (rc == 0);
|
||||
|
||||
return 0;
|
||||
|
16
src/dish.cpp
16
src/dish.cpp
@ -44,13 +44,13 @@ zmq::dish_t::dish_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
||||
// subscription commands are sent to the wire.
|
||||
options.linger.store (0);
|
||||
|
||||
int rc = _message.init ();
|
||||
const int rc = _message.init ();
|
||||
errno_assert (rc == 0);
|
||||
}
|
||||
|
||||
zmq::dish_t::~dish_t ()
|
||||
{
|
||||
int rc = _message.close ();
|
||||
const int rc = _message.close ();
|
||||
errno_assert (rc == 0);
|
||||
}
|
||||
|
||||
@ -93,7 +93,7 @@ void zmq::dish_t::xhiccuped (pipe_t *pipe_)
|
||||
|
||||
int zmq::dish_t::xjoin (const char *group_)
|
||||
{
|
||||
std::string group = std::string (group_);
|
||||
const std::string group = std::string (group_);
|
||||
|
||||
if (group.length () > ZMQ_GROUP_MAX_LENGTH) {
|
||||
errno = EINVAL;
|
||||
@ -117,7 +117,7 @@ int zmq::dish_t::xjoin (const char *group_)
|
||||
rc = _dist.send_to_all (&msg);
|
||||
if (rc != 0)
|
||||
err = errno;
|
||||
int rc2 = msg.close ();
|
||||
const int rc2 = msg.close ();
|
||||
errno_assert (rc2 == 0);
|
||||
if (rc != 0)
|
||||
errno = err;
|
||||
@ -126,7 +126,7 @@ int zmq::dish_t::xjoin (const char *group_)
|
||||
|
||||
int zmq::dish_t::xleave (const char *group_)
|
||||
{
|
||||
std::string group = std::string (group_);
|
||||
const std::string group = std::string (group_);
|
||||
|
||||
if (group.length () > ZMQ_GROUP_MAX_LENGTH) {
|
||||
errno = EINVAL;
|
||||
@ -149,7 +149,7 @@ int zmq::dish_t::xleave (const char *group_)
|
||||
rc = _dist.send_to_all (&msg);
|
||||
if (rc != 0)
|
||||
err = errno;
|
||||
int rc2 = msg.close ();
|
||||
const int rc2 = msg.close ();
|
||||
errno_assert (rc2 == 0);
|
||||
if (rc != 0)
|
||||
errno = err;
|
||||
@ -269,7 +269,7 @@ int zmq::dish_session_t::push_msg (msg_t *msg_)
|
||||
_group_msg = *msg_;
|
||||
_state = body;
|
||||
|
||||
int rc = msg_->init ();
|
||||
const int rc = msg_->init ();
|
||||
errno_assert (rc == 0);
|
||||
return 0;
|
||||
}
|
||||
@ -312,7 +312,7 @@ int zmq::dish_session_t::pull_msg (msg_t *msg_)
|
||||
if (!msg_->is_join () && !msg_->is_leave ())
|
||||
return rc;
|
||||
|
||||
int group_length = static_cast<int> (strlen (msg_->group ()));
|
||||
const int group_length = static_cast<int> (strlen (msg_->group ()));
|
||||
|
||||
msg_t command;
|
||||
int offset;
|
||||
|
@ -81,7 +81,7 @@ void zmq::dist_t::match (pipe_t *pipe_)
|
||||
|
||||
void zmq::dist_t::reverse_match ()
|
||||
{
|
||||
pipes_t::size_type prev_matching = _matching;
|
||||
const pipes_t::size_type prev_matching = _matching;
|
||||
|
||||
// Reset matching to 0
|
||||
unmatch ();
|
||||
@ -145,7 +145,7 @@ int zmq::dist_t::send_to_all (msg_t *msg_)
|
||||
int zmq::dist_t::send_to_matching (msg_t *msg_)
|
||||
{
|
||||
// Is this end of a multipart message?
|
||||
bool msg_more = (msg_->flags () & msg_t::more) != 0;
|
||||
const bool msg_more = (msg_->flags () & msg_t::more) != 0;
|
||||
|
||||
// Push the message to matching pipes.
|
||||
distribute (msg_);
|
||||
@ -204,7 +204,7 @@ void zmq::dist_t::distribute (msg_t *msg_)
|
||||
|
||||
// Detach the original message from the data buffer. Note that we don't
|
||||
// close the message. That's because we've already used all the references.
|
||||
int rc = msg_->init ();
|
||||
const int rc = msg_->init ();
|
||||
errno_assert (rc == 0);
|
||||
}
|
||||
|
||||
|
@ -74,7 +74,7 @@ template <typename T> class encoder_base_t : public i_encoder
|
||||
inline size_t encode (unsigned char **data_, size_t size_) ZMQ_FINAL
|
||||
{
|
||||
unsigned char *buffer = !*data_ ? _buf : *data_;
|
||||
size_t buffersize = !*data_ ? _buf_size : size_;
|
||||
const size_t buffersize = !*data_ ? _buf_size : size_;
|
||||
|
||||
if (in_progress () == NULL)
|
||||
return 0;
|
||||
@ -115,7 +115,7 @@ template <typename T> class encoder_base_t : public i_encoder
|
||||
}
|
||||
|
||||
// Copy data to the buffer. If the buffer is full, return.
|
||||
size_t to_copy = std::min (_to_write, buffersize - pos);
|
||||
const size_t to_copy = std::min (_to_write, buffersize - pos);
|
||||
memcpy (buffer + pos, _write_pos, to_copy);
|
||||
pos += to_copy;
|
||||
_write_pos += to_copy;
|
||||
|
@ -96,7 +96,7 @@ zmq::epoll_t::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_)
|
||||
pe->ev.data.ptr = pe;
|
||||
pe->events = events_;
|
||||
|
||||
int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_ADD, fd_, &pe->ev);
|
||||
const int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_ADD, fd_, &pe->ev);
|
||||
errno_assert (rc != -1);
|
||||
|
||||
// Increase the load metric of the thread.
|
||||
@ -109,7 +109,7 @@ void zmq::epoll_t::rm_fd (handle_t handle_)
|
||||
{
|
||||
check_thread ();
|
||||
poll_entry_t *pe = static_cast<poll_entry_t *> (handle_);
|
||||
int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_DEL, pe->fd, &pe->ev);
|
||||
const int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_DEL, pe->fd, &pe->ev);
|
||||
errno_assert (rc != -1);
|
||||
pe->fd = retired_fd;
|
||||
_retired.push_back (pe);
|
||||
@ -123,7 +123,7 @@ void zmq::epoll_t::set_pollin (handle_t handle_)
|
||||
check_thread ();
|
||||
poll_entry_t *pe = static_cast<poll_entry_t *> (handle_);
|
||||
pe->ev.events |= EPOLLIN;
|
||||
int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
|
||||
const int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
|
||||
errno_assert (rc != -1);
|
||||
}
|
||||
|
||||
@ -132,7 +132,7 @@ void zmq::epoll_t::reset_pollin (handle_t handle_)
|
||||
check_thread ();
|
||||
poll_entry_t *pe = static_cast<poll_entry_t *> (handle_);
|
||||
pe->ev.events &= ~(static_cast<short> (EPOLLIN));
|
||||
int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
|
||||
const int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
|
||||
errno_assert (rc != -1);
|
||||
}
|
||||
|
||||
@ -141,7 +141,7 @@ void zmq::epoll_t::set_pollout (handle_t handle_)
|
||||
check_thread ();
|
||||
poll_entry_t *pe = static_cast<poll_entry_t *> (handle_);
|
||||
pe->ev.events |= EPOLLOUT;
|
||||
int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
|
||||
const int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
|
||||
errno_assert (rc != -1);
|
||||
}
|
||||
|
||||
@ -150,7 +150,7 @@ void zmq::epoll_t::reset_pollout (handle_t handle_)
|
||||
check_thread ();
|
||||
poll_entry_t *pe = static_cast<poll_entry_t *> (handle_);
|
||||
pe->ev.events &= ~(static_cast<short> (EPOLLOUT));
|
||||
int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
|
||||
const int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
|
||||
errno_assert (rc != -1);
|
||||
}
|
||||
|
||||
@ -170,7 +170,7 @@ void zmq::epoll_t::loop ()
|
||||
|
||||
while (true) {
|
||||
// Execute any due timers.
|
||||
int timeout = static_cast<int> (execute_timers ());
|
||||
const int timeout = static_cast<int> (execute_timers ());
|
||||
|
||||
if (get_load () == 0) {
|
||||
if (timeout == 0)
|
||||
@ -181,7 +181,7 @@ void zmq::epoll_t::loop ()
|
||||
}
|
||||
|
||||
// Wait for events.
|
||||
int n = epoll_wait (_epoll_fd, &ev_buf[0], max_io_events,
|
||||
const int n = epoll_wait (_epoll_fd, &ev_buf[0], max_io_events,
|
||||
timeout ? timeout : -1);
|
||||
if (n == -1) {
|
||||
errno_assert (errno == EINTR);
|
||||
|
@ -211,14 +211,14 @@ const char *zmq::wsa_error_no (int no_, const char *wsae_wouldblock_string_)
|
||||
|
||||
void zmq::win_error (char *buffer_, size_t buffer_size_)
|
||||
{
|
||||
DWORD errcode = GetLastError ();
|
||||
const DWORD errcode = GetLastError ();
|
||||
#if defined _WIN32_WCE
|
||||
DWORD rc = FormatMessageW (
|
||||
FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, errcode,
|
||||
MAKELANGID (LANG_NEUTRAL, SUBLANG_DEFAULT), (LPWSTR) buffer_,
|
||||
buffer_size_ / sizeof (wchar_t), NULL);
|
||||
#else
|
||||
DWORD rc = FormatMessageA (
|
||||
const DWORD rc = FormatMessageA (
|
||||
FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, errcode,
|
||||
MAKELANGID (LANG_NEUTRAL, SUBLANG_DEFAULT), buffer_,
|
||||
static_cast<DWORD> (buffer_size_), NULL);
|
||||
|
@ -90,7 +90,7 @@ int zmq::fq_t::recvpipe (msg_t *msg_, pipe_t **pipe_)
|
||||
while (_active > 0) {
|
||||
// Try to fetch new message. If we've already read part of the message
|
||||
// subsequent part should be immediately available.
|
||||
bool fetched = _pipes[_current]->read (msg_);
|
||||
const bool fetched = _pipes[_current]->read (msg_);
|
||||
|
||||
// Note that when message is not fetched, current pipe is deactivated
|
||||
// and replaced by another active pipe. Thus we don't have to increase
|
||||
|
29
src/ip.cpp
29
src/ip.cpp
@ -128,7 +128,7 @@ void zmq::unblock_socket (fd_t s_)
|
||||
{
|
||||
#if defined ZMQ_HAVE_WINDOWS
|
||||
u_long nonblock = 1;
|
||||
int rc = ioctlsocket (s_, FIONBIO, &nonblock);
|
||||
const int rc = ioctlsocket (s_, FIONBIO, &nonblock);
|
||||
wsa_assert (rc != SOCKET_ERROR);
|
||||
#elif defined ZMQ_HAVE_OPENVMS || defined ZMQ_HAVE_VXWORKS
|
||||
int nonblock = 1;
|
||||
@ -154,7 +154,7 @@ void zmq::enable_ipv4_mapping (fd_t s_)
|
||||
#else
|
||||
int flag = 0;
|
||||
#endif
|
||||
int rc = setsockopt (s_, IPPROTO_IPV6, IPV6_V6ONLY,
|
||||
const int rc = setsockopt (s_, IPPROTO_IPV6, IPV6_V6ONLY,
|
||||
reinterpret_cast<char *> (&flag), sizeof (flag));
|
||||
#ifdef ZMQ_HAVE_WINDOWS
|
||||
wsa_assert (rc != SOCKET_ERROR);
|
||||
@ -168,7 +168,7 @@ int zmq::get_peer_ip_address (fd_t sockfd_, std::string &ip_addr_)
|
||||
{
|
||||
struct sockaddr_storage ss;
|
||||
|
||||
zmq_socklen_t addrlen =
|
||||
const zmq_socklen_t addrlen =
|
||||
get_socket_address (sockfd_, socket_end_remote, &ss);
|
||||
|
||||
if (addrlen == 0) {
|
||||
@ -186,8 +186,9 @@ int zmq::get_peer_ip_address (fd_t sockfd_, std::string &ip_addr_)
|
||||
}
|
||||
|
||||
char host[NI_MAXHOST];
|
||||
int rc = getnameinfo (reinterpret_cast<struct sockaddr *> (&ss), addrlen,
|
||||
host, sizeof host, NULL, 0, NI_NUMERICHOST);
|
||||
const int rc =
|
||||
getnameinfo (reinterpret_cast<struct sockaddr *> (&ss), addrlen, host,
|
||||
sizeof host, NULL, 0, NI_NUMERICHOST);
|
||||
if (rc != 0)
|
||||
return 0;
|
||||
|
||||
@ -297,9 +298,9 @@ bool zmq::initialize_network ()
|
||||
// Intialise Windows sockets. Note that WSAStartup can be called multiple
|
||||
// times given that WSACleanup will be called for each WSAStartup.
|
||||
|
||||
WORD version_requested = MAKEWORD (2, 2);
|
||||
const WORD version_requested = MAKEWORD (2, 2);
|
||||
WSADATA wsa_data;
|
||||
int rc = WSAStartup (version_requested, &wsa_data);
|
||||
const int rc = WSAStartup (version_requested, &wsa_data);
|
||||
zmq_assert (rc == 0);
|
||||
zmq_assert (LOBYTE (wsa_data.wVersion) == 2
|
||||
&& HIBYTE (wsa_data.wVersion) == 2);
|
||||
@ -312,7 +313,7 @@ void zmq::shutdown_network ()
|
||||
{
|
||||
#ifdef ZMQ_HAVE_WINDOWS
|
||||
// On Windows, uninitialise socket layer.
|
||||
int rc = WSACleanup ();
|
||||
const int rc = WSACleanup ();
|
||||
wsa_assert (rc != SOCKET_ERROR);
|
||||
#endif
|
||||
|
||||
@ -327,7 +328,7 @@ void zmq::shutdown_network ()
|
||||
static void tune_socket (const SOCKET socket_)
|
||||
{
|
||||
BOOL tcp_nodelay = 1;
|
||||
int rc =
|
||||
const int rc =
|
||||
setsockopt (socket_, IPPROTO_TCP, TCP_NODELAY,
|
||||
reinterpret_cast<char *> (&tcp_nodelay), sizeof tcp_nodelay);
|
||||
wsa_assert (rc != SOCKET_ERROR);
|
||||
@ -363,7 +364,7 @@ static int make_fdpair_tcpip (zmq::fd_t *r_, zmq::fd_t *w_)
|
||||
// Create critical section only if using fixed signaler port
|
||||
// Use problematic Event implementation for compatibility if using old port 5905.
|
||||
// Otherwise use Mutex implementation.
|
||||
int event_signaler_port = 5905;
|
||||
const int event_signaler_port = 5905;
|
||||
|
||||
if (zmq::signaler_port == event_signaler_port) {
|
||||
#if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP
|
||||
@ -431,7 +432,7 @@ static int make_fdpair_tcpip (zmq::fd_t *r_, zmq::fd_t *w_)
|
||||
|
||||
if (sync != NULL) {
|
||||
// Enter the critical section.
|
||||
DWORD dwrc = WaitForSingleObject (sync, INFINITE);
|
||||
const DWORD dwrc = WaitForSingleObject (sync, INFINITE);
|
||||
zmq_assert (dwrc == WAIT_OBJECT_0 || dwrc == WAIT_ABANDONED);
|
||||
}
|
||||
|
||||
@ -468,7 +469,7 @@ static int make_fdpair_tcpip (zmq::fd_t *r_, zmq::fd_t *w_)
|
||||
// Send/receive large chunk to work around TCP slow start
|
||||
// This code is a workaround for #1608
|
||||
if (*r_ != INVALID_SOCKET) {
|
||||
size_t dummy_size =
|
||||
const size_t dummy_size =
|
||||
1024 * 1024; // 1M to overload default receive buffer
|
||||
unsigned char *dummy =
|
||||
static_cast<unsigned char *> (malloc (dummy_size));
|
||||
@ -561,7 +562,7 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_)
|
||||
std::string dirname, filename;
|
||||
|
||||
// Create a listening socket.
|
||||
SOCKET listener = open_socket (AF_UNIX, SOCK_STREAM, 0);
|
||||
const SOCKET listener = open_socket (AF_UNIX, SOCK_STREAM, 0);
|
||||
if (listener == retired_fd) {
|
||||
// This may happen if the library was built on a system supporting AF_UNIX, but the system running doesn't support it.
|
||||
goto try_tcpip;
|
||||
@ -801,7 +802,7 @@ void zmq::assert_success_or_recoverable (zmq::fd_t s_, int rc_)
|
||||
socklen_t len = sizeof err;
|
||||
#endif
|
||||
|
||||
int rc = getsockopt (s_, SOL_SOCKET, SO_ERROR,
|
||||
const int rc = getsockopt (s_, SOL_SOCKET, SO_ERROR,
|
||||
reinterpret_cast<char *> (&err), &len);
|
||||
|
||||
// Assert if the error was caused by 0MQ bug.
|
||||
|
@ -200,7 +200,7 @@ int zmq::ip_resolver_t::resolve (ip_addr_t *ip_addr_, const char *name_)
|
||||
}
|
||||
|
||||
addr = std::string (name_, delim - name_);
|
||||
std::string port_str = std::string (delim + 1);
|
||||
const std::string port_str = std::string (delim + 1);
|
||||
|
||||
if (port_str == "*") {
|
||||
if (_options.bindable ()) {
|
||||
@ -229,7 +229,7 @@ int zmq::ip_resolver_t::resolve (ip_addr_t *ip_addr_, const char *name_)
|
||||
|
||||
// Check if path is allowed in ip address, if allowed it must be truncated
|
||||
if (_options.allow_path ()) {
|
||||
size_t pos = addr.find ('/');
|
||||
const size_t pos = addr.find ('/');
|
||||
if (pos != std::string::npos)
|
||||
addr = addr.substr (0, pos);
|
||||
}
|
||||
@ -247,7 +247,7 @@ int zmq::ip_resolver_t::resolve (ip_addr_t *ip_addr_, const char *name_)
|
||||
|
||||
// Look for an interface name / zone_id in the address
|
||||
// Reference: https://tools.ietf.org/html/rfc4007
|
||||
std::size_t pos = addr.rfind ('%');
|
||||
const std::size_t pos = addr.rfind ('%');
|
||||
uint32_t zone_id = 0;
|
||||
|
||||
if (pos != std::string::npos) {
|
||||
@ -277,7 +277,7 @@ int zmq::ip_resolver_t::resolve (ip_addr_t *ip_addr_, const char *name_)
|
||||
|
||||
if (!resolved && _options.allow_nic_name ()) {
|
||||
// Try to resolve the string as a NIC name.
|
||||
int rc = resolve_nic_name (ip_addr_, addr_str);
|
||||
const int rc = resolve_nic_name (ip_addr_, addr_str);
|
||||
|
||||
if (rc == 0) {
|
||||
resolved = true;
|
||||
@ -287,7 +287,7 @@ int zmq::ip_resolver_t::resolve (ip_addr_t *ip_addr_, const char *name_)
|
||||
}
|
||||
|
||||
if (!resolved) {
|
||||
int rc = resolve_getaddrinfo (ip_addr_, addr_str);
|
||||
const int rc = resolve_getaddrinfo (ip_addr_, addr_str);
|
||||
|
||||
if (rc != 0) {
|
||||
return rc;
|
||||
@ -597,7 +597,7 @@ int zmq::ip_resolver_t::get_interface_name (unsigned long index_,
|
||||
int zmq::ip_resolver_t::wchar_to_utf8 (const WCHAR *src_, char **dest_) const
|
||||
{
|
||||
int rc;
|
||||
int buffer_len =
|
||||
const int buffer_len =
|
||||
WideCharToMultiByte (CP_UTF8, 0, src_, -1, NULL, 0, NULL, 0);
|
||||
|
||||
char *buffer = static_cast<char *> (malloc (buffer_len));
|
||||
|
@ -65,7 +65,7 @@ zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_,
|
||||
|
||||
void zmq::ipc_connecter_t::out_event ()
|
||||
{
|
||||
fd_t fd = connect ();
|
||||
const fd_t fd = connect ();
|
||||
rm_handle ();
|
||||
|
||||
// Handle the error condition by attempt to reconnect.
|
||||
@ -81,7 +81,7 @@ void zmq::ipc_connecter_t::out_event ()
|
||||
void zmq::ipc_connecter_t::start_connecting ()
|
||||
{
|
||||
// Open the connecting socket.
|
||||
int rc = open ();
|
||||
const int rc = open ();
|
||||
|
||||
// Connect may succeed in synchronous manner.
|
||||
if (rc == 0) {
|
||||
@ -122,7 +122,7 @@ int zmq::ipc_connecter_t::open ()
|
||||
unblock_socket (_s);
|
||||
|
||||
// Connect to the remote peer.
|
||||
int rc = ::connect (_s, _addr->resolved.ipc_addr->addr (),
|
||||
const int rc = ::connect (_s, _addr->resolved.ipc_addr->addr (),
|
||||
_addr->resolved.ipc_addr->addrlen ());
|
||||
|
||||
// Connect was successful immediately.
|
||||
@ -153,7 +153,7 @@ zmq::fd_t zmq::ipc_connecter_t::connect ()
|
||||
// implementations and Solaris.
|
||||
int err = 0;
|
||||
zmq_socklen_t len = static_cast<zmq_socklen_t> (sizeof (err));
|
||||
int rc = getsockopt (_s, SOL_SOCKET, SO_ERROR,
|
||||
const int rc = getsockopt (_s, SOL_SOCKET, SO_ERROR,
|
||||
reinterpret_cast<char *> (&err), &len);
|
||||
if (rc == -1) {
|
||||
if (errno == ENOPROTOOPT)
|
||||
@ -171,7 +171,7 @@ zmq::fd_t zmq::ipc_connecter_t::connect ()
|
||||
return retired_fd;
|
||||
}
|
||||
|
||||
fd_t result = _s;
|
||||
const fd_t result = _s;
|
||||
_s = retired_fd;
|
||||
return result;
|
||||
}
|
||||
|
@ -85,7 +85,7 @@ zmq::ipc_listener_t::ipc_listener_t (io_thread_t *io_thread_,
|
||||
|
||||
void zmq::ipc_listener_t::in_event ()
|
||||
{
|
||||
fd_t fd = accept ();
|
||||
const fd_t fd = accept ();
|
||||
|
||||
// If connection was reset by the peer in the meantime, just ignore it.
|
||||
// TODO: Handle specific errors like ENFILE/EMFILE etc.
|
||||
@ -134,7 +134,7 @@ int zmq::ipc_listener_t::set_local_address (const char *addr_)
|
||||
if (rc != 0) {
|
||||
if (!_tmp_socket_dirname.empty ()) {
|
||||
// We need to preserve errno to return to the user
|
||||
int tmp_errno = errno;
|
||||
const int tmp_errno = errno;
|
||||
::rmdir (_tmp_socket_dirname.c_str ());
|
||||
_tmp_socket_dirname.clear ();
|
||||
errno = tmp_errno;
|
||||
@ -152,7 +152,7 @@ int zmq::ipc_listener_t::set_local_address (const char *addr_)
|
||||
if (_s == retired_fd) {
|
||||
if (!_tmp_socket_dirname.empty ()) {
|
||||
// We need to preserve errno to return to the user
|
||||
int tmp_errno = errno;
|
||||
const int tmp_errno = errno;
|
||||
::rmdir (_tmp_socket_dirname.c_str ());
|
||||
_tmp_socket_dirname.clear ();
|
||||
errno = tmp_errno;
|
||||
@ -180,7 +180,7 @@ int zmq::ipc_listener_t::set_local_address (const char *addr_)
|
||||
return 0;
|
||||
|
||||
error:
|
||||
int err = errno;
|
||||
const int err = errno;
|
||||
close ();
|
||||
errno = err;
|
||||
return -1;
|
||||
@ -189,7 +189,7 @@ error:
|
||||
int zmq::ipc_listener_t::close ()
|
||||
{
|
||||
zmq_assert (_s != retired_fd);
|
||||
fd_t fd_for_event = _s;
|
||||
const fd_t fd_for_event = _s;
|
||||
#ifdef ZMQ_HAVE_WINDOWS
|
||||
int rc = closesocket (_s);
|
||||
wsa_assert (rc != SOCKET_ERROR);
|
||||
@ -313,7 +313,7 @@ zmq::fd_t zmq::ipc_listener_t::accept ()
|
||||
socklen_t ss_len = sizeof (ss);
|
||||
#endif
|
||||
|
||||
fd_t sock =
|
||||
const fd_t sock =
|
||||
::accept (_s, reinterpret_cast<struct sockaddr *> (&ss), &ss_len);
|
||||
#endif
|
||||
if (sock == retired_fd) {
|
||||
@ -342,7 +342,7 @@ zmq::fd_t zmq::ipc_listener_t::accept ()
|
||||
|
||||
if (zmq::set_nosigpipe (sock)) {
|
||||
#ifdef ZMQ_HAVE_WINDOWS
|
||||
int rc = closesocket (sock);
|
||||
const int rc = closesocket (sock);
|
||||
wsa_assert (rc != SOCKET_ERROR);
|
||||
#else
|
||||
int rc = ::close (sock);
|
||||
|
@ -50,7 +50,7 @@ void zmq::lb_t::attach (pipe_t *pipe_)
|
||||
|
||||
void zmq::lb_t::pipe_terminated (pipe_t *pipe_)
|
||||
{
|
||||
pipes_t::size_type index = _pipes.index (pipe_);
|
||||
const pipes_t::size_type index = _pipes.index (pipe_);
|
||||
|
||||
// If we are in the middle of multipart message and current pipe
|
||||
// have disconnected, we have to drop the remainder of the message.
|
||||
@ -151,7 +151,7 @@ int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_)
|
||||
}
|
||||
|
||||
// Detach the message from the data buffer.
|
||||
int rc = msg_->init ();
|
||||
const int rc = msg_->init ();
|
||||
errno_assert (rc == 0);
|
||||
|
||||
return 0;
|
||||
|
@ -62,7 +62,7 @@ void zmq::mailbox_safe_t::remove_signaler (signaler_t *signaler_)
|
||||
{
|
||||
// TODO: make a copy of array and signal outside the lock
|
||||
const std::vector<zmq::signaler_t *>::iterator end = _signalers.end ();
|
||||
std::vector<signaler_t *>::iterator it =
|
||||
const std::vector<signaler_t *>::iterator it =
|
||||
std::find (_signalers.begin (), end, signaler_);
|
||||
|
||||
if (it != end)
|
||||
@ -106,7 +106,7 @@ int zmq::mailbox_safe_t::recv (command_t *cmd_, int timeout_)
|
||||
_sync->lock ();
|
||||
} else {
|
||||
// Wait for signal from the command sender.
|
||||
int rc = _cond_var.wait (_sync, timeout_);
|
||||
const int rc = _cond_var.wait (_sync, timeout_);
|
||||
if (rc == -1) {
|
||||
errno_assert (errno == EAGAIN || errno == EINTR);
|
||||
return -1;
|
||||
|
@ -36,7 +36,7 @@ zmq::metadata_t::metadata_t (const dict_t &dict_) : _ref_cnt (1), _dict (dict_)
|
||||
|
||||
const char *zmq::metadata_t::get (const std::string &property_) const
|
||||
{
|
||||
dict_t::const_iterator it = _dict.find (property_);
|
||||
const dict_t::const_iterator it = _dict.find (property_);
|
||||
if (it == _dict.end ()) {
|
||||
/** \todo remove this when support for the deprecated name "Identity" is dropped */
|
||||
if (property_ == "Identity")
|
||||
|
@ -294,7 +294,7 @@ int zmq::msg_t::copy (msg_t &src_)
|
||||
return -1;
|
||||
}
|
||||
|
||||
int rc = close ();
|
||||
const int rc = close ();
|
||||
if (unlikely (rc < 0))
|
||||
return rc;
|
||||
|
||||
|
@ -289,7 +289,7 @@ class msg_t
|
||||
inline int close_and_return (zmq::msg_t *msg_, int echo_)
|
||||
{
|
||||
// Since we abort on close failure we preserve errno for success case.
|
||||
int err = errno;
|
||||
const int err = errno;
|
||||
const int rc = msg_->close ();
|
||||
errno_assert (rc == 0);
|
||||
errno = err;
|
||||
|
@ -68,7 +68,7 @@ zmq::ctx_t *zmq::object_t::get_ctx ()
|
||||
return _ctx;
|
||||
}
|
||||
|
||||
void zmq::object_t::process_command (command_t &cmd_)
|
||||
void zmq::object_t::process_command (const command_t &cmd_)
|
||||
{
|
||||
switch (cmd_.type) {
|
||||
case command_t::activate_read:
|
||||
@ -528,7 +528,7 @@ void zmq::object_t::process_seqnum ()
|
||||
zmq_assert (false);
|
||||
}
|
||||
|
||||
void zmq::object_t::send_command (command_t &cmd_)
|
||||
void zmq::object_t::send_command (const command_t &cmd_)
|
||||
{
|
||||
_ctx->send_command (cmd_.destination->get_tid (), cmd_);
|
||||
}
|
||||
|
@ -62,7 +62,7 @@ class object_t
|
||||
uint32_t get_tid ();
|
||||
void set_tid (uint32_t id_);
|
||||
ctx_t *get_ctx ();
|
||||
void process_command (zmq::command_t &cmd_);
|
||||
void process_command (const zmq::command_t &cmd_);
|
||||
void send_inproc_connected (zmq::socket_base_t *socket_);
|
||||
void send_bind (zmq::own_t *destination_,
|
||||
zmq::pipe_t *pipe_,
|
||||
@ -157,7 +157,7 @@ class object_t
|
||||
// Thread ID of the thread the object belongs to.
|
||||
uint32_t _tid;
|
||||
|
||||
void send_command (command_t &cmd_);
|
||||
void send_command (const command_t &cmd_);
|
||||
|
||||
ZMQ_NON_COPYABLE_NOR_MOVABLE (object_t)
|
||||
};
|
||||
|
@ -302,7 +302,7 @@ int zmq::options_t::setsockopt (int option_,
|
||||
const void *optval_,
|
||||
size_t optvallen_)
|
||||
{
|
||||
bool is_int = (optvallen_ == sizeof (int));
|
||||
const bool is_int = (optvallen_ == sizeof (int));
|
||||
int value = 0;
|
||||
if (is_int)
|
||||
memcpy (&value, optval_, sizeof (int));
|
||||
@ -448,7 +448,7 @@ int zmq::options_t::setsockopt (int option_,
|
||||
/* Deprecated in favor of ZMQ_IPV6 */
|
||||
case ZMQ_IPV4ONLY: {
|
||||
bool value;
|
||||
int rc =
|
||||
const int rc =
|
||||
do_setsockopt_int_as_bool_strict (optval_, optvallen_, &value);
|
||||
if (rc == 0)
|
||||
ipv6 = !value;
|
||||
|
@ -97,7 +97,7 @@ int zmq::pair_t::xsend (msg_t *msg_)
|
||||
_pipe->flush ();
|
||||
|
||||
// Detach the original message from the data buffer.
|
||||
int rc = msg_->init ();
|
||||
const int rc = msg_->init ();
|
||||
errno_assert (rc == 0);
|
||||
|
||||
return 0;
|
||||
|
10
src/pipe.cpp
10
src/pipe.cpp
@ -38,10 +38,10 @@
|
||||
#include "ypipe.hpp"
|
||||
#include "ypipe_conflate.hpp"
|
||||
|
||||
int zmq::pipepair (class object_t *parents_[2],
|
||||
class pipe_t *pipes_[2],
|
||||
int hwms_[2],
|
||||
bool conflate_[2])
|
||||
int zmq::pipepair (object_t *parents_[2],
|
||||
pipe_t *pipes_[2],
|
||||
const int hwms_[2],
|
||||
const bool conflate_[2])
|
||||
{
|
||||
// Creates two pipe objects. These objects are connected by two ypipes,
|
||||
// each to pass messages in one direction.
|
||||
@ -233,7 +233,7 @@ bool zmq::pipe_t::check_write ()
|
||||
return true;
|
||||
}
|
||||
|
||||
bool zmq::pipe_t::write (msg_t *msg_)
|
||||
bool zmq::pipe_t::write (const msg_t *msg_)
|
||||
{
|
||||
if (unlikely (!check_write ()))
|
||||
return false;
|
||||
|
10
src/pipe.hpp
10
src/pipe.hpp
@ -54,8 +54,8 @@ class pipe_t;
|
||||
// read (older messages are discarded)
|
||||
int pipepair (zmq::object_t *parents_[2],
|
||||
zmq::pipe_t *pipes_[2],
|
||||
int hwms_[2],
|
||||
bool conflate_[2]);
|
||||
const int hwms_[2],
|
||||
const bool conflate_[2]);
|
||||
|
||||
struct i_pipe_events
|
||||
{
|
||||
@ -79,8 +79,8 @@ class pipe_t ZMQ_FINAL : public object_t,
|
||||
// This allows pipepair to create pipe objects.
|
||||
friend int pipepair (zmq::object_t *parents_[2],
|
||||
zmq::pipe_t *pipes_[2],
|
||||
int hwms_[2],
|
||||
bool conflate_[2]);
|
||||
const int hwms_[2],
|
||||
const bool conflate_[2]);
|
||||
|
||||
public:
|
||||
// Specifies the object to send events to.
|
||||
@ -108,7 +108,7 @@ class pipe_t ZMQ_FINAL : public object_t,
|
||||
// Writes a message to the underlying pipe. Returns false if the
|
||||
// message does not pass check_write. If false, the message object
|
||||
// retains ownership of its message buffer.
|
||||
bool write (msg_t *msg_);
|
||||
bool write (const msg_t *msg_);
|
||||
|
||||
// Remove unfinished parts of the outbound message from the pipe.
|
||||
void rollback () const;
|
||||
|
@ -183,9 +183,9 @@ static int loop_and_send_multipart_stat (zmq::socket_base_t *control_,
|
||||
return rc;
|
||||
}
|
||||
|
||||
static int reply_stats (class zmq::socket_base_t *control_,
|
||||
zmq_socket_stats_t *frontend_stats_,
|
||||
zmq_socket_stats_t *backend_stats_)
|
||||
static int reply_stats (zmq::socket_base_t *control_,
|
||||
const zmq_socket_stats_t *frontend_stats_,
|
||||
const zmq_socket_stats_t *backend_stats_)
|
||||
{
|
||||
// first part: frontend stats - the first send might fail due to HWM
|
||||
if (loop_and_send_multipart_stat (control_, frontend_stats_->msg_in, true,
|
||||
|
@ -157,8 +157,8 @@ int zmq::radio_t::xsend (msg_t *msg_)
|
||||
|
||||
_dist.unmatch ();
|
||||
|
||||
std::pair<subscriptions_t::iterator, subscriptions_t::iterator> range =
|
||||
_subscriptions.equal_range (std::string (msg_->group ()));
|
||||
const std::pair<subscriptions_t::iterator, subscriptions_t::iterator>
|
||||
range = _subscriptions.equal_range (std::string (msg_->group ()));
|
||||
|
||||
for (subscriptions_t::iterator it = range.first; it != range.second; ++it)
|
||||
_dist.match (it->second);
|
||||
@ -262,7 +262,7 @@ int zmq::radio_session_t::pull_msg (msg_t *msg_)
|
||||
return rc;
|
||||
|
||||
const char *group = _pending_msg.group ();
|
||||
int length = static_cast<int> (strlen (group));
|
||||
const int length = static_cast<int> (strlen (group));
|
||||
|
||||
// First frame is the group
|
||||
rc = msg_->init_size (length);
|
||||
|
@ -155,7 +155,7 @@ bool node_t::operator!= (node_t other_) const
|
||||
|
||||
void node_t::resize (size_t prefix_length_, size_t edgecount_)
|
||||
{
|
||||
size_t node_size = 3 * sizeof (uint32_t) + prefix_length_
|
||||
const size_t node_size = 3 * sizeof (uint32_t) + prefix_length_
|
||||
+ edgecount_ * (1 + sizeof (void *));
|
||||
unsigned char *new_data =
|
||||
static_cast<unsigned char *> (realloc (_data, node_size));
|
||||
@ -167,7 +167,7 @@ void node_t::resize (size_t prefix_length_, size_t edgecount_)
|
||||
|
||||
node_t make_node (size_t refcount_, size_t prefix_length_, size_t edgecount_)
|
||||
{
|
||||
size_t node_size = 3 * sizeof (uint32_t) + prefix_length_
|
||||
const size_t node_size = 3 * sizeof (uint32_t) + prefix_length_
|
||||
+ edgecount_ * (1 + sizeof (void *));
|
||||
|
||||
unsigned char *data = static_cast<unsigned char *> (malloc (node_size));
|
||||
@ -285,10 +285,10 @@ match_result_t zmq::radix_tree_t::match (const unsigned char *key_,
|
||||
|
||||
bool zmq::radix_tree_t::add (const unsigned char *key_, size_t key_size_)
|
||||
{
|
||||
match_result_t match_result = match (key_, key_size_);
|
||||
size_t key_bytes_matched = match_result._key_bytes_matched;
|
||||
size_t prefix_bytes_matched = match_result._prefix_bytes_matched;
|
||||
size_t edge_index = match_result._edge_index;
|
||||
const match_result_t match_result = match (key_, key_size_);
|
||||
const size_t key_bytes_matched = match_result._key_bytes_matched;
|
||||
const size_t prefix_bytes_matched = match_result._prefix_bytes_matched;
|
||||
const size_t edge_index = match_result._edge_index;
|
||||
node_t current_node = match_result._current_node;
|
||||
node_t parent_node = match_result._parent_node;
|
||||
|
||||
@ -409,11 +409,11 @@ bool zmq::radix_tree_t::add (const unsigned char *key_, size_t key_size_)
|
||||
|
||||
bool zmq::radix_tree_t::rm (const unsigned char *key_, size_t key_size_)
|
||||
{
|
||||
match_result_t match_result = match (key_, key_size_);
|
||||
size_t key_bytes_matched = match_result._key_bytes_matched;
|
||||
size_t prefix_bytes_matched = match_result._prefix_bytes_matched;
|
||||
size_t edge_index = match_result._edge_index;
|
||||
size_t parent_edge_index = match_result._parent_edge_index;
|
||||
const match_result_t match_result = match (key_, key_size_);
|
||||
const size_t key_bytes_matched = match_result._key_bytes_matched;
|
||||
const size_t prefix_bytes_matched = match_result._prefix_bytes_matched;
|
||||
const size_t edge_index = match_result._edge_index;
|
||||
const size_t parent_edge_index = match_result._parent_edge_index;
|
||||
node_t current_node = match_result._current_node;
|
||||
node_t parent_node = match_result._parent_node;
|
||||
node_t grandparent_node = match_result._grandparent_node;
|
||||
@ -432,7 +432,7 @@ bool zmq::radix_tree_t::rm (const unsigned char *key_, size_t key_size_)
|
||||
if (current_node == _root)
|
||||
return true;
|
||||
|
||||
size_t outgoing_edges = current_node.edgecount ();
|
||||
const size_t outgoing_edges = current_node.edgecount ();
|
||||
if (outgoing_edges > 1)
|
||||
// This node can't be merged with any other node, so there's
|
||||
// nothing more to do.
|
||||
@ -445,7 +445,7 @@ bool zmq::radix_tree_t::rm (const unsigned char *key_, size_t key_size_)
|
||||
// Make room for the child node's prefix and edges. We need to
|
||||
// keep the old prefix length since resize() will overwrite
|
||||
// it.
|
||||
uint32_t old_prefix_length = current_node.prefix_length ();
|
||||
const uint32_t old_prefix_length = current_node.prefix_length ();
|
||||
current_node.resize (old_prefix_length + child.prefix_length (),
|
||||
child.edgecount ());
|
||||
|
||||
@ -474,7 +474,7 @@ bool zmq::radix_tree_t::rm (const unsigned char *key_, size_t key_size_)
|
||||
// Make room for the child node's prefix and edges. We need to
|
||||
// keep the old prefix length since resize() will overwrite
|
||||
// it.
|
||||
uint32_t old_prefix_length = parent_node.prefix_length ();
|
||||
const uint32_t old_prefix_length = parent_node.prefix_length ();
|
||||
parent_node.resize (old_prefix_length + other_child.prefix_length (),
|
||||
other_child.edgecount ());
|
||||
|
||||
@ -501,9 +501,9 @@ bool zmq::radix_tree_t::rm (const unsigned char *key_, size_t key_size_)
|
||||
// Replace the edge to the current node with the last edge. An
|
||||
// edge consists of a byte and a pointer to the next node. First
|
||||
// replace the byte.
|
||||
size_t last_index = parent_node.edgecount () - 1;
|
||||
unsigned char last_byte = parent_node.first_byte_at (last_index);
|
||||
node_t last_node = parent_node.node_at (last_index);
|
||||
const size_t last_index = parent_node.edgecount () - 1;
|
||||
const unsigned char last_byte = parent_node.first_byte_at (last_index);
|
||||
const node_t last_node = parent_node.node_at (last_index);
|
||||
parent_node.set_edge_at (edge_index, last_byte, last_node);
|
||||
|
||||
// Move the chunk of pointers one byte to the left, effectively
|
||||
|
@ -49,7 +49,7 @@
|
||||
void zmq::seed_random ()
|
||||
{
|
||||
#if defined ZMQ_HAVE_WINDOWS
|
||||
int pid = static_cast<int> (GetCurrentProcessId ());
|
||||
const int pid = static_cast<int> (GetCurrentProcessId ());
|
||||
#else
|
||||
int pid = static_cast<int> (getpid ());
|
||||
#endif
|
||||
@ -59,7 +59,7 @@ void zmq::seed_random ()
|
||||
uint32_t zmq::generate_random ()
|
||||
{
|
||||
// Compensate for the fact that rand() returns signed integer.
|
||||
uint32_t low = static_cast<uint32_t> (rand ());
|
||||
const uint32_t low = static_cast<uint32_t> (rand ());
|
||||
uint32_t high = static_cast<uint32_t> (rand ());
|
||||
high <<= (sizeof (int) * 8 - 1);
|
||||
return high | low;
|
||||
|
@ -93,7 +93,7 @@ void zmq::reaper_t::in_event ()
|
||||
|
||||
// Get the next command. If there is none, exit.
|
||||
command_t cmd;
|
||||
int rc = _mailbox.recv (&cmd, 0);
|
||||
const int rc = _mailbox.recv (&cmd, 0);
|
||||
if (rc != 0 && errno == EINTR)
|
||||
continue;
|
||||
if (rc != 0 && errno == EAGAIN)
|
||||
|
@ -52,10 +52,10 @@ int zmq::rep_t::xsend (msg_t *msg_)
|
||||
return -1;
|
||||
}
|
||||
|
||||
bool more = (msg_->flags () & msg_t::more) != 0;
|
||||
const bool more = (msg_->flags () & msg_t::more) != 0;
|
||||
|
||||
// Push message to the reply pipe.
|
||||
int rc = router_t::xsend (msg_);
|
||||
const int rc = router_t::xsend (msg_);
|
||||
if (rc != 0)
|
||||
return rc;
|
||||
|
||||
@ -84,7 +84,7 @@ int zmq::rep_t::xrecv (msg_t *msg_)
|
||||
|
||||
if ((msg_->flags () & msg_t::more)) {
|
||||
// Empty message part delimits the traceback stack.
|
||||
bool bottom = (msg_->size () == 0);
|
||||
const bool bottom = (msg_->size () == 0);
|
||||
|
||||
// Push it to the reply pipe.
|
||||
rc = router_t::xsend (msg_);
|
||||
@ -103,7 +103,7 @@ int zmq::rep_t::xrecv (msg_t *msg_)
|
||||
}
|
||||
|
||||
// Get next message part to return to the user.
|
||||
int rc = router_t::xrecv (msg_);
|
||||
const int rc = router_t::xrecv (msg_);
|
||||
if (rc != 0)
|
||||
return rc;
|
||||
|
||||
|
@ -175,7 +175,7 @@ int zmq::req_t::xrecv (msg_t *msg_)
|
||||
_message_begins = false;
|
||||
}
|
||||
|
||||
int rc = recv_reply_pipe (msg_);
|
||||
const int rc = recv_reply_pipe (msg_);
|
||||
if (rc != 0)
|
||||
return rc;
|
||||
|
||||
@ -210,7 +210,7 @@ int zmq::req_t::xsetsockopt (int option_,
|
||||
const void *optval_,
|
||||
size_t optvallen_)
|
||||
{
|
||||
bool is_int = (optvallen_ == sizeof (int));
|
||||
const bool is_int = (optvallen_ == sizeof (int));
|
||||
int value = 0;
|
||||
if (is_int)
|
||||
memcpy (&value, optval_, sizeof (int));
|
||||
@ -248,7 +248,7 @@ int zmq::req_t::recv_reply_pipe (msg_t *msg_)
|
||||
{
|
||||
while (true) {
|
||||
pipe_t *pipe = NULL;
|
||||
int rc = dealer_t::recvpipe (msg_, &pipe);
|
||||
const int rc = dealer_t::recvpipe (msg_, &pipe);
|
||||
if (rc != 0)
|
||||
return rc;
|
||||
if (!_reply_pipe || pipe == _reply_pipe)
|
||||
|
@ -90,7 +90,7 @@ void zmq::router_t::xattach_pipe (pipe_t *pipe_,
|
||||
errno_assert (rc == 0);
|
||||
}
|
||||
|
||||
bool routing_id_ok = identify_peer (pipe_, locally_initiated_);
|
||||
const bool routing_id_ok = identify_peer (pipe_, locally_initiated_);
|
||||
if (routing_id_ok)
|
||||
_fq.attach (pipe_);
|
||||
else
|
||||
@ -171,7 +171,7 @@ void zmq::router_t::xpipe_terminated (pipe_t *pipe_)
|
||||
|
||||
void zmq::router_t::xread_activated (pipe_t *pipe_)
|
||||
{
|
||||
std::set<pipe_t *>::iterator it = _anonymous_pipes.find (pipe_);
|
||||
const std::set<pipe_t *>::iterator it = _anonymous_pipes.find (pipe_);
|
||||
if (it == _anonymous_pipes.end ())
|
||||
_fq.activated (pipe_);
|
||||
else {
|
||||
@ -209,7 +209,7 @@ int zmq::router_t::xsend (msg_t *msg_)
|
||||
// Check whether pipe is closed or not
|
||||
if (!_current_out->check_write ()) {
|
||||
// Check whether pipe is full or not
|
||||
bool pipe_full = !_current_out->check_hwm ();
|
||||
const bool pipe_full = !_current_out->check_hwm ();
|
||||
out_pipe->active = false;
|
||||
_current_out = NULL;
|
||||
|
||||
@ -258,10 +258,10 @@ int zmq::router_t::xsend (msg_t *msg_)
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool ok = _current_out->write (msg_);
|
||||
const bool ok = _current_out->write (msg_);
|
||||
if (unlikely (!ok)) {
|
||||
// Message failed to send - we must close it ourselves.
|
||||
int rc = msg_->close ();
|
||||
const int rc = msg_->close ();
|
||||
errno_assert (rc == 0);
|
||||
// HWM was checked before, so the pipe must be gone. Roll back
|
||||
// messages that were piped, for example REP labels.
|
||||
@ -274,12 +274,12 @@ int zmq::router_t::xsend (msg_t *msg_)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
int rc = msg_->close ();
|
||||
const int rc = msg_->close ();
|
||||
errno_assert (rc == 0);
|
||||
}
|
||||
|
||||
// Detach the message from the data buffer.
|
||||
int rc = msg_->init ();
|
||||
const int rc = msg_->init ();
|
||||
errno_assert (rc == 0);
|
||||
|
||||
return 0;
|
||||
@ -289,11 +289,11 @@ int zmq::router_t::xrecv (msg_t *msg_)
|
||||
{
|
||||
if (_prefetched) {
|
||||
if (!_routing_id_sent) {
|
||||
int rc = msg_->move (_prefetched_id);
|
||||
const int rc = msg_->move (_prefetched_id);
|
||||
errno_assert (rc == 0);
|
||||
_routing_id_sent = true;
|
||||
} else {
|
||||
int rc = msg_->move (_prefetched_msg);
|
||||
const int rc = msg_->move (_prefetched_msg);
|
||||
errno_assert (rc == 0);
|
||||
_prefetched = false;
|
||||
}
|
||||
@ -469,7 +469,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_, bool locally_initiated_)
|
||||
} else if (!options.raw_socket) {
|
||||
// Pick up handshake cases and also case where next integral routing id is set
|
||||
msg.init ();
|
||||
bool ok = pipe_->read (&msg);
|
||||
const bool ok = pipe_->read (&msg);
|
||||
if (!ok)
|
||||
return false;
|
||||
|
||||
|
@ -64,7 +64,8 @@ void zmq::server_t::xattach_pipe (pipe_t *pipe_,
|
||||
pipe_->set_server_socket_routing_id (routing_id);
|
||||
// Add the record into output pipes lookup table
|
||||
outpipe_t outpipe = {pipe_, true};
|
||||
bool ok = _out_pipes.ZMQ_MAP_INSERT_OR_EMPLACE (routing_id, outpipe).second;
|
||||
const bool ok =
|
||||
_out_pipes.ZMQ_MAP_INSERT_OR_EMPLACE (routing_id, outpipe).second;
|
||||
zmq_assert (ok);
|
||||
|
||||
_fq.attach (pipe_);
|
||||
@ -72,7 +73,7 @@ void zmq::server_t::xattach_pipe (pipe_t *pipe_,
|
||||
|
||||
void zmq::server_t::xpipe_terminated (pipe_t *pipe_)
|
||||
{
|
||||
out_pipes_t::iterator it =
|
||||
const out_pipes_t::iterator it =
|
||||
_out_pipes.find (pipe_->get_server_socket_routing_id ());
|
||||
zmq_assert (it != _out_pipes.end ());
|
||||
_out_pipes.erase (it);
|
||||
@ -105,7 +106,7 @@ int zmq::server_t::xsend (msg_t *msg_)
|
||||
return -1;
|
||||
}
|
||||
// Find the pipe associated with the routing stored in the message.
|
||||
uint32_t routing_id = msg_->get_routing_id ();
|
||||
const uint32_t routing_id = msg_->get_routing_id ();
|
||||
out_pipes_t::iterator it = _out_pipes.find (routing_id);
|
||||
|
||||
if (it != _out_pipes.end ()) {
|
||||
@ -123,7 +124,7 @@ int zmq::server_t::xsend (msg_t *msg_)
|
||||
int rc = msg_->reset_routing_id ();
|
||||
errno_assert (rc == 0);
|
||||
|
||||
bool ok = it->second.pipe->write (msg_);
|
||||
const bool ok = it->second.pipe->write (msg_);
|
||||
if (unlikely (!ok)) {
|
||||
// Message failed to send - we must close it ourselves.
|
||||
rc = msg_->close ();
|
||||
@ -161,7 +162,7 @@ int zmq::server_t::xrecv (msg_t *msg_)
|
||||
|
||||
zmq_assert (pipe != NULL);
|
||||
|
||||
uint32_t routing_id = pipe->get_server_socket_routing_id ();
|
||||
const uint32_t routing_id = pipe->get_server_socket_routing_id ();
|
||||
msg_->set_routing_id (routing_id);
|
||||
|
||||
return 0;
|
||||
|
@ -173,7 +173,7 @@ int zmq::session_base_t::push_msg (msg_t *msg_)
|
||||
&& !msg_->is_cancel ())
|
||||
return 0;
|
||||
if (_pipe && _pipe->write (msg_)) {
|
||||
int rc = msg_->init ();
|
||||
const int rc = msg_->init ();
|
||||
errno_assert (rc == 0);
|
||||
return 0;
|
||||
}
|
||||
@ -404,7 +404,7 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
|
||||
int hwms[2] = {conflate ? -1 : options.rcvhwm,
|
||||
conflate ? -1 : options.sndhwm};
|
||||
bool conflates[2] = {conflate, conflate};
|
||||
int rc = pipepair (parents, pipes, hwms, conflates);
|
||||
const int rc = pipepair (parents, pipes, hwms, conflates);
|
||||
errno_assert (rc == 0);
|
||||
|
||||
// Plug the local end of the pipe.
|
||||
|
@ -123,7 +123,7 @@ int zmq::socket_base_t::inprocs_t::erase_pipes (
|
||||
return 0;
|
||||
}
|
||||
|
||||
void zmq::socket_base_t::inprocs_t::erase_pipe (pipe_t *pipe_)
|
||||
void zmq::socket_base_t::inprocs_t::erase_pipe (const pipe_t *pipe_)
|
||||
{
|
||||
for (map_t::iterator it = _inprocs.begin (), end = _inprocs.end ();
|
||||
it != end; ++it)
|
||||
@ -311,7 +311,7 @@ int zmq::socket_base_t::parse_uri (const char *uri_,
|
||||
{
|
||||
zmq_assert (uri_ != NULL);
|
||||
|
||||
std::string uri (uri_);
|
||||
const std::string uri (uri_);
|
||||
const std::string::size_type pos = uri.find ("://");
|
||||
if (pos == std::string::npos) {
|
||||
errno = EINVAL;
|
||||
@ -1689,7 +1689,7 @@ void zmq::socket_base_t::pipe_terminated (pipe_t *pipe_)
|
||||
unregister_term_ack ();
|
||||
}
|
||||
|
||||
void zmq::socket_base_t::extract_flags (msg_t *msg_)
|
||||
void zmq::socket_base_t::extract_flags (const msg_t *msg_)
|
||||
{
|
||||
// Test whether routing_id flag is valid for this socket type.
|
||||
if (unlikely (msg_->flags () & msg_t::routing_id))
|
||||
@ -1887,7 +1887,7 @@ void zmq::socket_base_t::event (const endpoint_uri_pair_t &endpoint_uri_pair_,
|
||||
// Send a monitor event
|
||||
void zmq::socket_base_t::monitor_event (
|
||||
uint64_t event_,
|
||||
uint64_t values_[],
|
||||
const uint64_t values_[],
|
||||
uint64_t values_count_,
|
||||
const endpoint_uri_pair_t &endpoint_uri_pair_) const
|
||||
{
|
||||
@ -2067,7 +2067,7 @@ zmq::routing_socket_base_t::lookup_out_pipe (const blob_t &routing_id_) const
|
||||
return it == _out_pipes.end () ? NULL : &it->second;
|
||||
}
|
||||
|
||||
void zmq::routing_socket_base_t::erase_out_pipe (pipe_t *pipe_)
|
||||
void zmq::routing_socket_base_t::erase_out_pipe (const pipe_t *pipe_)
|
||||
{
|
||||
const size_t erased = _out_pipes.erase (pipe_->get_routing_id ());
|
||||
zmq_assert (erased);
|
||||
|
@ -214,7 +214,7 @@ class socket_base_t : public own_t,
|
||||
|
||||
// Socket event data dispatch
|
||||
void monitor_event (uint64_t event_,
|
||||
uint64_t values_[],
|
||||
const uint64_t values_[],
|
||||
uint64_t values_count_,
|
||||
const endpoint_uri_pair_t &endpoint_uri_pair_) const;
|
||||
|
||||
@ -237,7 +237,7 @@ class socket_base_t : public own_t,
|
||||
public:
|
||||
void emplace (const char *endpoint_uri_, pipe_t *pipe_);
|
||||
int erase_pipes (const std::string &endpoint_uri_str_);
|
||||
void erase_pipe (pipe_t *pipe_);
|
||||
void erase_pipe (const pipe_t *pipe_);
|
||||
|
||||
private:
|
||||
typedef std::multimap<std::string, pipe_t *> map_t;
|
||||
@ -251,7 +251,7 @@ class socket_base_t : public own_t,
|
||||
|
||||
// Moves the flags from the message to local variables,
|
||||
// to be later retrieved by getsockopt.
|
||||
void extract_flags (msg_t *msg_);
|
||||
void extract_flags (const msg_t *msg_);
|
||||
|
||||
// Used to check whether the object is a socket.
|
||||
uint32_t _tag;
|
||||
@ -371,7 +371,7 @@ class routing_socket_base_t : public socket_base_t
|
||||
bool has_out_pipe (const blob_t &routing_id_) const;
|
||||
out_pipe_t *lookup_out_pipe (const blob_t &routing_id_);
|
||||
const out_pipe_t *lookup_out_pipe (const blob_t &routing_id_) const;
|
||||
void erase_out_pipe (pipe_t *pipe_);
|
||||
void erase_out_pipe (const pipe_t *pipe_);
|
||||
out_pipe_t try_erase_out_pipe (const blob_t &routing_id_);
|
||||
template <typename Func> bool any_of_out_pipes (Func func_)
|
||||
{
|
||||
|
@ -35,7 +35,7 @@
|
||||
|
||||
#include <limits.h>
|
||||
|
||||
static bool is_thread_safe (zmq::socket_base_t &socket_)
|
||||
static bool is_thread_safe (const zmq::socket_base_t &socket_)
|
||||
{
|
||||
// do not use getsockopt here, since that would fail during context termination
|
||||
return socket_.is_thread_safe ();
|
||||
@ -127,7 +127,7 @@ int zmq::socket_poller_t::add (socket_base_t *socket_,
|
||||
socket_->add_signaler (_signaler);
|
||||
}
|
||||
|
||||
item_t item = {
|
||||
const item_t item = {
|
||||
socket_,
|
||||
0,
|
||||
user_data_,
|
||||
@ -159,7 +159,7 @@ int zmq::socket_poller_t::add_fd (fd_t fd_, void *user_data_, short events_)
|
||||
}
|
||||
}
|
||||
|
||||
item_t item = {
|
||||
const item_t item = {
|
||||
NULL,
|
||||
fd_,
|
||||
user_data_,
|
||||
@ -181,7 +181,7 @@ int zmq::socket_poller_t::add_fd (fd_t fd_, void *user_data_, short events_)
|
||||
return 0;
|
||||
}
|
||||
|
||||
int zmq::socket_poller_t::modify (socket_base_t *socket_, short events_)
|
||||
int zmq::socket_poller_t::modify (const socket_base_t *socket_, short events_)
|
||||
{
|
||||
const items_t::iterator end = _items.end ();
|
||||
items_t::iterator it;
|
||||
@ -322,7 +322,7 @@ int zmq::socket_poller_t::rebuild ()
|
||||
if (it->socket) {
|
||||
if (!is_thread_safe (*it->socket)) {
|
||||
size_t fd_size = sizeof (zmq::fd_t);
|
||||
int rc = it->socket->getsockopt (
|
||||
const int rc = it->socket->getsockopt (
|
||||
ZMQ_FD, &_pollfds[item_nbr].fd, &fd_size);
|
||||
zmq_assert (rc == 0);
|
||||
|
||||
@ -457,7 +457,7 @@ int zmq::socket_poller_t::check_events (zmq::socket_poller_t::event_t *events_,
|
||||
else {
|
||||
#if defined ZMQ_POLL_BASED_ON_POLL
|
||||
|
||||
short revents = _pollfds[it->pollfd_index].revents;
|
||||
const short revents = _pollfds[it->pollfd_index].revents;
|
||||
short events = 0;
|
||||
|
||||
if (revents & POLLIN)
|
||||
@ -542,7 +542,7 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_,
|
||||
}
|
||||
|
||||
if (_need_rebuild) {
|
||||
int rc = rebuild ();
|
||||
const int rc = rebuild ();
|
||||
if (rc == -1)
|
||||
return -1;
|
||||
}
|
||||
@ -596,7 +596,7 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_,
|
||||
static_cast<int> (std::min<uint64_t> (end - now, INT_MAX));
|
||||
|
||||
// Wait for events.
|
||||
int rc = poll (_pollfds, _pollset_size, timeout);
|
||||
const int rc = poll (_pollfds, _pollset_size, timeout);
|
||||
if (rc == -1 && errno == EINTR) {
|
||||
return -1;
|
||||
}
|
||||
@ -607,7 +607,7 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_,
|
||||
_signaler->recv ();
|
||||
|
||||
// Check for the events.
|
||||
int found = check_events (events_, n_events_);
|
||||
const int found = check_events (events_, n_events_);
|
||||
if (found) {
|
||||
if (found > 0)
|
||||
zero_trail_events (events_, n_events_, found);
|
||||
|
@ -69,7 +69,7 @@ class socket_poller_t
|
||||
} event_t;
|
||||
|
||||
int add (socket_base_t *socket_, void *user_data_, short events_);
|
||||
int modify (socket_base_t *socket_, short events_);
|
||||
int modify (const socket_base_t *socket_, short events_);
|
||||
int remove (socket_base_t *socket_);
|
||||
|
||||
int add_fd (fd_t fd_, void *user_data_, short events_);
|
||||
|
@ -146,17 +146,17 @@ int zmq::stream_t::xsend (msg_t *msg_)
|
||||
_current_out = NULL;
|
||||
return 0;
|
||||
}
|
||||
bool ok = _current_out->write (msg_);
|
||||
const bool ok = _current_out->write (msg_);
|
||||
if (likely (ok))
|
||||
_current_out->flush ();
|
||||
_current_out = NULL;
|
||||
} else {
|
||||
int rc = msg_->close ();
|
||||
const int rc = msg_->close ();
|
||||
errno_assert (rc == 0);
|
||||
}
|
||||
|
||||
// Detach the message from the data buffer.
|
||||
int rc = msg_->init ();
|
||||
const int rc = msg_->init ();
|
||||
errno_assert (rc == 0);
|
||||
|
||||
return 0;
|
||||
@ -181,11 +181,11 @@ int zmq::stream_t::xrecv (msg_t *msg_)
|
||||
{
|
||||
if (_prefetched) {
|
||||
if (!_routing_id_sent) {
|
||||
int rc = msg_->move (_prefetched_routing_id);
|
||||
const int rc = msg_->move (_prefetched_routing_id);
|
||||
errno_assert (rc == 0);
|
||||
_routing_id_sent = true;
|
||||
} else {
|
||||
int rc = msg_->move (_prefetched_msg);
|
||||
const int rc = msg_->move (_prefetched_msg);
|
||||
errno_assert (rc == 0);
|
||||
_prefetched = false;
|
||||
}
|
||||
|
@ -130,7 +130,7 @@ zmq::stream_engine_base_t::stream_engine_base_t (
|
||||
_session (NULL),
|
||||
_socket (NULL)
|
||||
{
|
||||
int rc = _tx_msg.init ();
|
||||
const int rc = _tx_msg.init ();
|
||||
errno_assert (rc == 0);
|
||||
|
||||
// Put the socket into non-blocking mode.
|
||||
@ -143,7 +143,7 @@ zmq::stream_engine_base_t::~stream_engine_base_t ()
|
||||
|
||||
if (_s != retired_fd) {
|
||||
#ifdef ZMQ_HAVE_WINDOWS
|
||||
int rc = closesocket (_s);
|
||||
const int rc = closesocket (_s);
|
||||
wsa_assert (rc != SOCKET_ERROR);
|
||||
#else
|
||||
int rc = close (_s);
|
||||
@ -158,7 +158,7 @@ zmq::stream_engine_base_t::~stream_engine_base_t ()
|
||||
_s = retired_fd;
|
||||
}
|
||||
|
||||
int rc = _tx_msg.close ();
|
||||
const int rc = _tx_msg.close ();
|
||||
errno_assert (rc == 0);
|
||||
|
||||
// Drop reference to metadata and destroy it if we are
|
||||
@ -275,7 +275,7 @@ bool zmq::stream_engine_base_t::in_event_internal ()
|
||||
size_t bufsize = 0;
|
||||
_decoder->get_buffer (&_inpos, &bufsize);
|
||||
|
||||
int rc = read (_inpos, bufsize);
|
||||
const int rc = read (_inpos, bufsize);
|
||||
|
||||
if (rc == -1) {
|
||||
if (errno != EAGAIN) {
|
||||
@ -343,7 +343,7 @@ void zmq::stream_engine_base_t::out_event ()
|
||||
break;
|
||||
_encoder->load_msg (&_tx_msg);
|
||||
unsigned char *bufptr = _outpos + _outsize;
|
||||
size_t n =
|
||||
const size_t n =
|
||||
_encoder->encode (&bufptr, _options.out_batch_size - _outsize);
|
||||
zmq_assert (n > 0);
|
||||
if (_outpos == NULL)
|
||||
@ -679,7 +679,7 @@ void zmq::stream_engine_base_t::error (error_reason_t reason_)
|
||||
if (reason_ != protocol_error
|
||||
&& (_mechanism == NULL
|
||||
|| _mechanism->status () == mechanism_t::handshaking)) {
|
||||
int err = errno;
|
||||
const int err = errno;
|
||||
_socket->event_handshake_failed_no_detail (_endpoint_uri_pair, err);
|
||||
}
|
||||
|
||||
@ -739,7 +739,7 @@ void zmq::stream_engine_base_t::timer_event (int id_)
|
||||
|
||||
int zmq::stream_engine_base_t::read (void *data_, size_t size_)
|
||||
{
|
||||
int rc = zmq::tcp_read (_s, data_, size_);
|
||||
const int rc = zmq::tcp_read (_s, data_, size_);
|
||||
|
||||
if (rc == 0) {
|
||||
// connection closed by peer
|
||||
|
17
src/tcp.cpp
17
src/tcp.cpp
@ -60,7 +60,8 @@ int zmq::tune_tcp_socket (fd_t s_)
|
||||
// so using Nagle wouldn't improve throughput in anyway, but it would
|
||||
// hurt latency.
|
||||
int nodelay = 1;
|
||||
int rc = setsockopt (s_, IPPROTO_TCP, TCP_NODELAY,
|
||||
const int rc =
|
||||
setsockopt (s_, IPPROTO_TCP, TCP_NODELAY,
|
||||
reinterpret_cast<char *> (&nodelay), sizeof (int));
|
||||
assert_success_or_recoverable (s_, rc);
|
||||
if (rc != 0)
|
||||
@ -120,7 +121,7 @@ int zmq::tune_tcp_keepalives (fd_t s_,
|
||||
keepalive_opts.keepaliveinterval =
|
||||
keepalive_intvl_ != -1 ? keepalive_intvl_ * 1000 : 1000;
|
||||
DWORD num_bytes_returned;
|
||||
int rc = WSAIoctl (s_, SIO_KEEPALIVE_VALS, &keepalive_opts,
|
||||
const int rc = WSAIoctl (s_, SIO_KEEPALIVE_VALS, &keepalive_opts,
|
||||
sizeof (keepalive_opts), NULL, 0,
|
||||
&num_bytes_returned, NULL, NULL);
|
||||
assert_success_or_recoverable (s_, rc);
|
||||
@ -193,7 +194,7 @@ int zmq::tune_tcp_maxrt (fd_t sockfd_, int timeout_)
|
||||
#if defined(ZMQ_HAVE_WINDOWS) && defined(TCP_MAXRT)
|
||||
// msdn says it's supported in >= Vista, >= Windows Server 2003
|
||||
timeout_ /= 1000; // in seconds
|
||||
int rc =
|
||||
const int rc =
|
||||
setsockopt (sockfd_, IPPROTO_TCP, TCP_MAXRT,
|
||||
reinterpret_cast<char *> (&timeout_), sizeof (timeout_));
|
||||
assert_success_or_recoverable (sockfd_, rc);
|
||||
@ -213,7 +214,7 @@ int zmq::tcp_write (fd_t s_, const void *data_, size_t size_)
|
||||
{
|
||||
#ifdef ZMQ_HAVE_WINDOWS
|
||||
|
||||
int nbytes = send (s_, (char *) data_, static_cast<int> (size_), 0);
|
||||
const int nbytes = send (s_, (char *) data_, static_cast<int> (size_), 0);
|
||||
|
||||
// If not a single byte can be written to the socket in non-blocking mode
|
||||
// we'll get an error (this may happen during the speculative write).
|
||||
@ -322,12 +323,12 @@ void zmq::tcp_tune_loopback_fast_path (const fd_t socket_)
|
||||
int sio_loopback_fastpath = 1;
|
||||
DWORD number_of_bytes_returned = 0;
|
||||
|
||||
int rc = WSAIoctl (socket_, SIO_LOOPBACK_FAST_PATH, &sio_loopback_fastpath,
|
||||
sizeof sio_loopback_fastpath, NULL, 0,
|
||||
&number_of_bytes_returned, 0, 0);
|
||||
const int rc = WSAIoctl (
|
||||
socket_, SIO_LOOPBACK_FAST_PATH, &sio_loopback_fastpath,
|
||||
sizeof sio_loopback_fastpath, NULL, 0, &number_of_bytes_returned, 0, 0);
|
||||
|
||||
if (SOCKET_ERROR == rc) {
|
||||
DWORD last_error = ::WSAGetLastError ();
|
||||
const DWORD last_error = ::WSAGetLastError ();
|
||||
|
||||
if (WSAEOPNOTSUPP == last_error) {
|
||||
// This system is not Windows 8 or Server 2012, and the call is not supported.
|
||||
|
@ -75,7 +75,7 @@ int zmq::tcp_address_t::resolve (const char *name_, bool local_, bool ipv6_)
|
||||
// Test the ';' to know if we have a source address in name_
|
||||
const char *src_delimiter = strrchr (name_, ';');
|
||||
if (src_delimiter) {
|
||||
std::string src_name (name_, src_delimiter - name_);
|
||||
const std::string src_name (name_, src_delimiter - name_);
|
||||
|
||||
ip_resolver_options_t src_resolver_opts;
|
||||
|
||||
|
@ -68,7 +68,7 @@ zmq::tcp_listener_t::tcp_listener_t (io_thread_t *io_thread_,
|
||||
|
||||
void zmq::tcp_listener_t::in_event ()
|
||||
{
|
||||
fd_t fd = accept ();
|
||||
const fd_t fd = accept ();
|
||||
|
||||
// If connection was reset by the peer in the meantime, just ignore it.
|
||||
// TODO: Handle specific errors like ENFILE/EMFILE etc.
|
||||
@ -163,7 +163,7 @@ int zmq::tcp_listener_t::create_socket (const char *addr_)
|
||||
return 0;
|
||||
|
||||
error:
|
||||
int err = errno;
|
||||
const int err = errno;
|
||||
close ();
|
||||
errno = err;
|
||||
return -1;
|
||||
@ -205,7 +205,7 @@ zmq::fd_t zmq::tcp_listener_t::accept ()
|
||||
fd_t sock = ::accept4 (_s, reinterpret_cast<struct sockaddr *> (&ss),
|
||||
&ss_len, SOCK_CLOEXEC);
|
||||
#else
|
||||
fd_t sock =
|
||||
const fd_t sock =
|
||||
::accept (_s, reinterpret_cast<struct sockaddr *> (&ss), &ss_len);
|
||||
#endif
|
||||
|
||||
@ -244,7 +244,7 @@ zmq::fd_t zmq::tcp_listener_t::accept ()
|
||||
}
|
||||
if (!matched) {
|
||||
#ifdef ZMQ_HAVE_WINDOWS
|
||||
int rc = closesocket (sock);
|
||||
const int rc = closesocket (sock);
|
||||
wsa_assert (rc != SOCKET_ERROR);
|
||||
#else
|
||||
int rc = ::close (sock);
|
||||
@ -256,7 +256,7 @@ zmq::fd_t zmq::tcp_listener_t::accept ()
|
||||
|
||||
if (zmq::set_nosigpipe (sock)) {
|
||||
#ifdef ZMQ_HAVE_WINDOWS
|
||||
int rc = closesocket (sock);
|
||||
const int rc = closesocket (sock);
|
||||
wsa_assert (rc != SOCKET_ERROR);
|
||||
#else
|
||||
int rc = ::close (sock);
|
||||
|
@ -82,9 +82,9 @@ bool zmq::thread_t::is_current_thread () const
|
||||
void zmq::thread_t::stop ()
|
||||
{
|
||||
if (_started) {
|
||||
DWORD rc = WaitForSingleObject (_descriptor, INFINITE);
|
||||
const DWORD rc = WaitForSingleObject (_descriptor, INFINITE);
|
||||
win_assert (rc != WAIT_FAILED);
|
||||
BOOL rc2 = CloseHandle (_descriptor);
|
||||
const BOOL rc2 = CloseHandle (_descriptor);
|
||||
win_assert (rc2 != 0);
|
||||
}
|
||||
}
|
||||
@ -154,7 +154,7 @@ void zmq::thread_t::
|
||||
|
||||
// push our handler, raise, and finally pop our handler
|
||||
tib->ExceptionList = (_EXCEPTION_REGISTRATION_RECORD *) &rec;
|
||||
DWORD MS_VC_EXCEPTION = 0x406D1388;
|
||||
const DWORD MS_VC_EXCEPTION = 0x406D1388;
|
||||
RaiseException (MS_VC_EXCEPTION, 0,
|
||||
sizeof (thread_info) / sizeof (ULONG_PTR),
|
||||
(ULONG_PTR *) &thread_info);
|
||||
|
@ -236,7 +236,7 @@ bool zmq::trie_t::rm (unsigned char *prefix_, size_t size_)
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool zmq::trie_t::check (unsigned char *data_, size_t size_)
|
||||
bool zmq::trie_t::check (const unsigned char *data_, size_t size_)
|
||||
{
|
||||
// This function is on critical path. It deliberately doesn't use
|
||||
// recursion to get a bit better performance.
|
||||
|
@ -52,7 +52,7 @@ class trie_t
|
||||
bool rm (unsigned char *prefix_, size_t size_);
|
||||
|
||||
// Check whether particular key is in the trie.
|
||||
bool check (unsigned char *data_, size_t size_);
|
||||
bool check (const unsigned char *data_, size_t size_);
|
||||
|
||||
// Apply the function supplied to each subscription in the trie.
|
||||
void apply (void (*func_) (unsigned char *data_, size_t size_, void *arg_),
|
||||
|
@ -679,7 +679,7 @@ sv scalarmult(gf p[4],gf q[4],const u8 *s)
|
||||
set25519(p[2],gf1);
|
||||
set25519(p[3],gf0);
|
||||
for (i = 255;i >= 0;--i) {
|
||||
u8 b = (s[i/8]>>(i&7))&1;
|
||||
const u8 b = (s[i/8]>>(i&7))&1;
|
||||
cswap(p,q,b);
|
||||
add(q,p);
|
||||
add(p,p);
|
||||
|
@ -68,7 +68,7 @@ int zmq::udp_address_t::resolve (const char *name_, bool bind_, bool ipv6_)
|
||||
// URL
|
||||
const char *src_delimiter = strrchr (name_, ';');
|
||||
if (src_delimiter) {
|
||||
std::string src_name (name_, src_delimiter - name_);
|
||||
const std::string src_name (name_, src_delimiter - name_);
|
||||
|
||||
ip_resolver_options_t src_resolver_opts;
|
||||
|
||||
@ -128,13 +128,13 @@ int zmq::udp_address_t::resolve (const char *name_, bool bind_, bool ipv6_)
|
||||
|
||||
ip_resolver_t resolver (resolver_opts);
|
||||
|
||||
int rc = resolver.resolve (&_target_address, name_);
|
||||
const int rc = resolver.resolve (&_target_address, name_);
|
||||
if (rc != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
_is_multicast = _target_address.is_multicast ();
|
||||
uint16_t port = _target_address.port ();
|
||||
const uint16_t port = _target_address.port ();
|
||||
|
||||
if (has_interface) {
|
||||
// If we have an interface specifier then the target address must be a
|
||||
|
@ -73,7 +73,7 @@ zmq::udp_engine_t::~udp_engine_t ()
|
||||
|
||||
if (_fd != retired_fd) {
|
||||
#ifdef ZMQ_HAVE_WINDOWS
|
||||
int rc = closesocket (_fd);
|
||||
const int rc = closesocket (_fd);
|
||||
wsa_assert (rc != SOCKET_ERROR);
|
||||
#else
|
||||
int rc = close (_fd);
|
||||
@ -135,7 +135,7 @@ void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_)
|
||||
_out_address_len = out->sockaddr_len ();
|
||||
|
||||
if (out->is_multicast ()) {
|
||||
bool is_ipv6 = (out->family () == AF_INET6);
|
||||
const bool is_ipv6 = (out->family () == AF_INET6);
|
||||
rc = rc
|
||||
| set_udp_multicast_loop (_fd, is_ipv6,
|
||||
_options.multicast_loop);
|
||||
@ -163,7 +163,7 @@ void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_)
|
||||
ip_addr_t any = ip_addr_t::any (bind_addr->family ());
|
||||
const ip_addr_t *real_bind_addr;
|
||||
|
||||
bool multicast = udp_addr->is_mcast ();
|
||||
const bool multicast = udp_addr->is_mcast ();
|
||||
|
||||
if (multicast) {
|
||||
// Multicast addresses should be allowed to bind to more than
|
||||
@ -236,8 +236,8 @@ int zmq::udp_engine_t::set_udp_multicast_loop (fd_t s_,
|
||||
}
|
||||
|
||||
int loop = loop_ ? 1 : 0;
|
||||
int rc = setsockopt (s_, level, optname, reinterpret_cast<char *> (&loop),
|
||||
sizeof (loop));
|
||||
const int rc = setsockopt (s_, level, optname,
|
||||
reinterpret_cast<char *> (&loop), sizeof (loop));
|
||||
assert_success_or_recoverable (s_, rc);
|
||||
return rc;
|
||||
}
|
||||
@ -252,7 +252,8 @@ int zmq::udp_engine_t::set_udp_multicast_ttl (fd_t s_, bool is_ipv6_, int hops_)
|
||||
level = IPPROTO_IP;
|
||||
}
|
||||
|
||||
int rc = setsockopt (s_, level, IP_MULTICAST_TTL,
|
||||
const int rc =
|
||||
setsockopt (s_, level, IP_MULTICAST_TTL,
|
||||
reinterpret_cast<char *> (&hops_), sizeof (hops_));
|
||||
assert_success_or_recoverable (s_, rc);
|
||||
return rc;
|
||||
@ -291,7 +292,7 @@ int zmq::udp_engine_t::set_udp_multicast_iface (fd_t s_,
|
||||
int zmq::udp_engine_t::set_udp_reuse_address (fd_t s_, bool on_)
|
||||
{
|
||||
int on = on_ ? 1 : 0;
|
||||
int rc = setsockopt (s_, SOL_SOCKET, SO_REUSEADDR,
|
||||
const int rc = setsockopt (s_, SOL_SOCKET, SO_REUSEADDR,
|
||||
reinterpret_cast<char *> (&on), sizeof (on));
|
||||
assert_success_or_recoverable (s_, rc);
|
||||
return rc;
|
||||
@ -325,7 +326,7 @@ int zmq::udp_engine_t::add_membership (fd_t s_, const udp_address_t *addr_)
|
||||
|
||||
} else if (mcast_addr->family () == AF_INET6) {
|
||||
struct ipv6_mreq mreq;
|
||||
int iface = addr_->bind_if ();
|
||||
const int iface = addr_->bind_if ();
|
||||
|
||||
zmq_assert (iface >= -1);
|
||||
|
||||
@ -360,7 +361,8 @@ void zmq::udp_engine_t::terminate ()
|
||||
delete this;
|
||||
}
|
||||
|
||||
void zmq::udp_engine_t::sockaddr_to_msg (zmq::msg_t *msg_, sockaddr_in *addr_)
|
||||
void zmq::udp_engine_t::sockaddr_to_msg (zmq::msg_t *msg_,
|
||||
const sockaddr_in *addr_)
|
||||
{
|
||||
const char *const name = inet_ntoa (addr_->sin_addr);
|
||||
|
||||
@ -387,7 +389,7 @@ void zmq::udp_engine_t::sockaddr_to_msg (zmq::msg_t *msg_, sockaddr_in *addr_)
|
||||
*address = 0;
|
||||
}
|
||||
|
||||
int zmq::udp_engine_t::resolve_raw_address (char *name_, size_t length_)
|
||||
int zmq::udp_engine_t::resolve_raw_address (const char *name_, size_t length_)
|
||||
{
|
||||
memset (&_raw_address, 0, sizeof _raw_address);
|
||||
|
||||
@ -396,7 +398,7 @@ int zmq::udp_engine_t::resolve_raw_address (char *name_, size_t length_)
|
||||
// Find delimiter, cannot use memrchr as it is not supported on windows
|
||||
if (length_ != 0) {
|
||||
int chars_left = static_cast<int> (length_);
|
||||
char *current_char = name_ + length_;
|
||||
const char *current_char = name_ + length_;
|
||||
do {
|
||||
if (*(--current_char) == ':') {
|
||||
delimiter = current_char;
|
||||
@ -410,11 +412,11 @@ int zmq::udp_engine_t::resolve_raw_address (char *name_, size_t length_)
|
||||
return -1;
|
||||
}
|
||||
|
||||
std::string addr_str (name_, delimiter - name_);
|
||||
std::string port_str (delimiter + 1, name_ + length_ - delimiter - 1);
|
||||
const std::string addr_str (name_, delimiter - name_);
|
||||
const std::string port_str (delimiter + 1, name_ + length_ - delimiter - 1);
|
||||
|
||||
// Parse the port number (0 is not a valid port).
|
||||
uint16_t port = static_cast<uint16_t> (atoi (port_str.c_str ()));
|
||||
const uint16_t port = static_cast<uint16_t> (atoi (port_str.c_str ()));
|
||||
if (port == 0) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
|
@ -47,8 +47,8 @@ class udp_engine_t ZMQ_FINAL : public io_object_t, public i_engine
|
||||
const endpoint_uri_pair_t &get_endpoint () const ZMQ_FINAL;
|
||||
|
||||
private:
|
||||
int resolve_raw_address (char *name_, size_t length_);
|
||||
void sockaddr_to_msg (zmq::msg_t *msg_, sockaddr_in *addr_);
|
||||
int resolve_raw_address (const char *name_, size_t length_);
|
||||
void sockaddr_to_msg (zmq::msg_t *msg_, const sockaddr_in *addr_);
|
||||
|
||||
int set_udp_reuse_address (fd_t s_, bool on_);
|
||||
int set_udp_reuse_port (fd_t s_, bool on_);
|
||||
|
@ -52,7 +52,7 @@ zmq::v1_decoder_t::v1_decoder_t (size_t bufsize_, int64_t maxmsgsize_) :
|
||||
|
||||
zmq::v1_decoder_t::~v1_decoder_t ()
|
||||
{
|
||||
int rc = _in_progress.close ();
|
||||
const int rc = _in_progress.close ();
|
||||
errno_assert (rc == 0);
|
||||
}
|
||||
|
||||
|
@ -55,7 +55,7 @@ zmq::v2_decoder_t::v2_decoder_t (size_t bufsize_,
|
||||
|
||||
zmq::v2_decoder_t::~v2_decoder_t ()
|
||||
{
|
||||
int rc = _in_progress.close ();
|
||||
const int rc = _in_progress.close ();
|
||||
errno_assert (rc == 0);
|
||||
}
|
||||
|
||||
|
@ -193,7 +193,7 @@ int zmq::ws_connecter_t::open ()
|
||||
#if defined ZMQ_HAVE_VXWORKS
|
||||
int rc = ::connect (_s, (sockaddr *) tcp_addr.addr (), tcp_addr.addrlen ());
|
||||
#else
|
||||
int rc = ::connect (_s, tcp_addr.addr (), tcp_addr.addrlen ());
|
||||
const int rc = ::connect (_s, tcp_addr.addr (), tcp_addr.addrlen ());
|
||||
#endif
|
||||
// Connect was successful immediately.
|
||||
if (rc == 0) {
|
||||
|
@ -59,13 +59,13 @@ zmq::ws_decoder_t::ws_decoder_t (size_t bufsize_,
|
||||
|
||||
zmq::ws_decoder_t::~ws_decoder_t ()
|
||||
{
|
||||
int rc = _in_progress.close ();
|
||||
const int rc = _in_progress.close ();
|
||||
errno_assert (rc == 0);
|
||||
}
|
||||
|
||||
int zmq::ws_decoder_t::opcode_ready (unsigned char const *)
|
||||
{
|
||||
bool final = (_tmpbuf[0] & 0x80) != 0; // final bit
|
||||
const bool final = (_tmpbuf[0] & 0x80) != 0; // final bit
|
||||
if (!final)
|
||||
return -1; // non final messages are not supported
|
||||
|
||||
@ -96,7 +96,7 @@ int zmq::ws_decoder_t::opcode_ready (unsigned char const *)
|
||||
|
||||
int zmq::ws_decoder_t::size_first_byte_ready (unsigned char const *read_from_)
|
||||
{
|
||||
bool is_masked = (_tmpbuf[0] & 0x80) != 0;
|
||||
const bool is_masked = (_tmpbuf[0] & 0x80) != 0;
|
||||
|
||||
if (is_masked != _must_mask) // wrong mask value
|
||||
return -1;
|
||||
|
@ -82,7 +82,7 @@ void zmq::ws_encoder_t::message_ready ()
|
||||
}
|
||||
|
||||
if (_must_mask) {
|
||||
uint32_t random = generate_random ();
|
||||
const uint32_t random = generate_random ();
|
||||
put_uint32 (_tmp_buf + offset, random);
|
||||
put_uint32 (_mask, random);
|
||||
offset += 4;
|
||||
@ -107,7 +107,7 @@ void zmq::ws_encoder_t::size_ready ()
|
||||
{
|
||||
if (_must_mask) {
|
||||
assert (in_progress () != &_masked_msg);
|
||||
size_t size = in_progress ()->size ();
|
||||
const size_t size = in_progress ()->size ();
|
||||
|
||||
_masked_msg.close ();
|
||||
_masked_msg.init_size (size);
|
||||
|
@ -113,7 +113,7 @@ static void compute_accept_key (char *key_,
|
||||
zmq::ws_engine_t::ws_engine_t (fd_t fd_,
|
||||
const options_t &options_,
|
||||
const endpoint_uri_pair_t &endpoint_uri_pair_,
|
||||
ws_address_t &address_,
|
||||
const ws_address_t &address_,
|
||||
bool client_) :
|
||||
stream_engine_base_t (fd_, options_, endpoint_uri_pair_),
|
||||
_client (client_),
|
||||
@ -203,7 +203,7 @@ void zmq::ws_engine_t::plug_internal ()
|
||||
|
||||
int zmq::ws_engine_t::routing_id_msg (msg_t *msg_)
|
||||
{
|
||||
int rc = msg_->init_size (_options.routing_id_size);
|
||||
const int rc = msg_->init_size (_options.routing_id_size);
|
||||
errno_assert (rc == 0);
|
||||
if (_options.routing_id_size > 0)
|
||||
memcpy (msg_->data (), _options.routing_id, _options.routing_id_size);
|
||||
@ -216,7 +216,7 @@ int zmq::ws_engine_t::process_routing_id_msg (msg_t *msg_)
|
||||
{
|
||||
if (_options.recv_routing_id) {
|
||||
msg_->set_flags (msg_t::routing_id);
|
||||
int rc = session ()->push_msg (msg_);
|
||||
const int rc = session ()->push_msg (msg_);
|
||||
errno_assert (rc == 0);
|
||||
} else {
|
||||
int rc = msg_->close ();
|
||||
@ -230,7 +230,7 @@ int zmq::ws_engine_t::process_routing_id_msg (msg_t *msg_)
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool zmq::ws_engine_t::select_protocol (char *protocol_)
|
||||
bool zmq::ws_engine_t::select_protocol (const char *protocol_)
|
||||
{
|
||||
if (_options.mechanism == ZMQ_NULL && (strcmp ("ZWS2.0", protocol_) == 0)) {
|
||||
_next_msg = static_cast<int (stream_engine_base_t::*) (msg_t *)> (
|
||||
@ -309,7 +309,7 @@ bool zmq::ws_engine_t::handshake ()
|
||||
|
||||
bool zmq::ws_engine_t::server_handshake ()
|
||||
{
|
||||
int nbytes = read (_read_buffer, WS_BUFFER_SIZE);
|
||||
const int nbytes = read (_read_buffer, WS_BUFFER_SIZE);
|
||||
if (nbytes == -1) {
|
||||
if (errno != EAGAIN)
|
||||
error (zmq::i_engine::connection_error);
|
||||
@ -320,7 +320,7 @@ bool zmq::ws_engine_t::server_handshake ()
|
||||
_insize = nbytes;
|
||||
|
||||
while (_insize > 0) {
|
||||
char c = static_cast<char> (*_inpos);
|
||||
const char c = static_cast<char> (*_inpos);
|
||||
|
||||
switch (_server_handshake_state) {
|
||||
case handshake_initial:
|
||||
@ -529,13 +529,13 @@ bool zmq::ws_engine_t::server_handshake ()
|
||||
unsigned char hash[SHA_DIGEST_LENGTH];
|
||||
compute_accept_key (_websocket_key, hash);
|
||||
|
||||
int accept_key_len = encode_base64 (
|
||||
const int accept_key_len = encode_base64 (
|
||||
hash, SHA_DIGEST_LENGTH, _websocket_accept,
|
||||
MAX_HEADER_VALUE_LENGTH);
|
||||
assert (accept_key_len > 0);
|
||||
_websocket_accept[accept_key_len] = '\0';
|
||||
|
||||
int written =
|
||||
const int written =
|
||||
snprintf (reinterpret_cast<char *> (_write_buffer),
|
||||
WS_BUFFER_SIZE,
|
||||
"HTTP/1.1 101 Switching Protocols\r\n"
|
||||
@ -580,7 +580,7 @@ bool zmq::ws_engine_t::server_handshake ()
|
||||
|
||||
bool zmq::ws_engine_t::client_handshake ()
|
||||
{
|
||||
int nbytes = read (_read_buffer, WS_BUFFER_SIZE);
|
||||
const int nbytes = read (_read_buffer, WS_BUFFER_SIZE);
|
||||
if (nbytes == -1) {
|
||||
if (errno != EAGAIN)
|
||||
error (zmq::i_engine::connection_error);
|
||||
@ -591,7 +591,7 @@ bool zmq::ws_engine_t::client_handshake ()
|
||||
_insize = nbytes;
|
||||
|
||||
while (_insize > 0) {
|
||||
char c = static_cast<char> (*_inpos);
|
||||
const char c = static_cast<char> (*_inpos);
|
||||
|
||||
switch (_client_handshake_state) {
|
||||
case client_handshake_initial:
|
||||
|
@ -130,7 +130,7 @@ class ws_engine_t ZMQ_FINAL : public stream_engine_base_t
|
||||
ws_engine_t (fd_t fd_,
|
||||
const options_t &options_,
|
||||
const endpoint_uri_pair_t &endpoint_uri_pair_,
|
||||
ws_address_t &address_,
|
||||
const ws_address_t &address_,
|
||||
bool client_);
|
||||
~ws_engine_t () ZMQ_FINAL;
|
||||
|
||||
@ -150,7 +150,7 @@ class ws_engine_t ZMQ_FINAL : public stream_engine_base_t
|
||||
int produce_no_msg_after_close (msg_t *msg_);
|
||||
int close_connection_after_close (msg_t *msg_);
|
||||
|
||||
bool select_protocol (char *protocol);
|
||||
bool select_protocol (const char *protocol);
|
||||
|
||||
bool client_handshake ();
|
||||
bool server_handshake ();
|
||||
|
@ -99,7 +99,7 @@ zmq::ws_listener_t::~ws_listener_t ()
|
||||
|
||||
void zmq::ws_listener_t::in_event ()
|
||||
{
|
||||
fd_t fd = accept ();
|
||||
const fd_t fd = accept ();
|
||||
|
||||
// If connection was reset by the peer in the meantime, just ignore it.
|
||||
// TODO: Handle specific errors like ENFILE/EMFILE etc.
|
||||
@ -193,7 +193,7 @@ int zmq::ws_listener_t::create_socket (const char *addr_)
|
||||
return 0;
|
||||
|
||||
error:
|
||||
int err = errno;
|
||||
const int err = errno;
|
||||
close ();
|
||||
errno = err;
|
||||
return -1;
|
||||
@ -206,7 +206,7 @@ int zmq::ws_listener_t::set_local_address (const char *addr_)
|
||||
// socket was already created by the application
|
||||
_s = options.use_fd;
|
||||
} else {
|
||||
int rc = _address.resolve (addr_, true, options.ipv6);
|
||||
const int rc = _address.resolve (addr_, true, options.ipv6);
|
||||
if (rc != 0)
|
||||
return -1;
|
||||
|
||||
@ -248,7 +248,7 @@ zmq::fd_t zmq::ws_listener_t::accept ()
|
||||
fd_t sock = ::accept4 (_s, reinterpret_cast<struct sockaddr *> (&ss),
|
||||
&ss_len, SOCK_CLOEXEC);
|
||||
#else
|
||||
fd_t sock =
|
||||
const fd_t sock =
|
||||
::accept (_s, reinterpret_cast<struct sockaddr *> (&ss), &ss_len);
|
||||
#endif
|
||||
|
||||
@ -275,7 +275,7 @@ zmq::fd_t zmq::ws_listener_t::accept ()
|
||||
|
||||
if (zmq::set_nosigpipe (sock)) {
|
||||
#ifdef ZMQ_HAVE_WINDOWS
|
||||
int rc = closesocket (sock);
|
||||
const int rc = closesocket (sock);
|
||||
wsa_assert (rc != SOCKET_ERROR);
|
||||
#else
|
||||
int rc = ::close (sock);
|
||||
|
14
src/xpub.cpp
14
src/xpub.cpp
@ -79,9 +79,9 @@ void zmq::xpub_t::xattach_pipe (pipe_t *pipe_,
|
||||
if (_welcome_msg.size () > 0) {
|
||||
msg_t copy;
|
||||
copy.init ();
|
||||
int rc = copy.copy (_welcome_msg);
|
||||
const int rc = copy.copy (_welcome_msg);
|
||||
errno_assert (rc == 0);
|
||||
bool ok = pipe_->write (©);
|
||||
const bool ok = pipe_->write (©);
|
||||
zmq_assert (ok);
|
||||
pipe_->flush ();
|
||||
}
|
||||
@ -103,7 +103,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
|
||||
bool subscribe = false;
|
||||
bool is_subscribe_or_cancel = false;
|
||||
|
||||
bool first_part = !_more_recv;
|
||||
const bool first_part = !_more_recv;
|
||||
_more_recv = (msg.flags () & msg_t::more) != 0;
|
||||
|
||||
if (first_part || _process_subscribe) {
|
||||
@ -164,12 +164,12 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
|
||||
} else {
|
||||
bool notify;
|
||||
if (!subscribe) {
|
||||
mtrie_t::rm_result rm_result =
|
||||
const mtrie_t::rm_result rm_result =
|
||||
_subscriptions.rm (data, size, pipe_);
|
||||
// TODO reconsider what to do if rm_result == mtrie_t::not_found
|
||||
notify = rm_result != mtrie_t::values_remain || _verbose_unsubs;
|
||||
} else {
|
||||
bool first_added = _subscriptions.add (data, size, pipe_);
|
||||
const bool first_added = _subscriptions.add (data, size, pipe_);
|
||||
notify = first_added || _verbose_subs;
|
||||
}
|
||||
|
||||
@ -239,7 +239,7 @@ int zmq::xpub_t::xsetsockopt (int option_,
|
||||
_welcome_msg.close ();
|
||||
|
||||
if (optvallen_ > 0) {
|
||||
int rc = _welcome_msg.init_size (optvallen_);
|
||||
const int rc = _welcome_msg.init_size (optvallen_);
|
||||
errno_assert (rc == 0);
|
||||
|
||||
unsigned char *data =
|
||||
@ -294,7 +294,7 @@ void zmq::xpub_t::mark_last_pipe_as_matching (pipe_t *pipe_, xpub_t *self_)
|
||||
|
||||
int zmq::xpub_t::xsend (msg_t *msg_)
|
||||
{
|
||||
bool msg_more = (msg_->flags () & msg_t::more) != 0;
|
||||
const bool msg_more = (msg_->flags () & msg_t::more) != 0;
|
||||
|
||||
// For the first part of multi-part message, find the matching pipes.
|
||||
if (!_more_send) {
|
||||
|
14
src/xsub.cpp
14
src/xsub.cpp
@ -48,13 +48,13 @@ zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
||||
// subscription commands are sent to the wire.
|
||||
options.linger.store (0);
|
||||
|
||||
int rc = _message.init ();
|
||||
const int rc = _message.init ();
|
||||
errno_assert (rc == 0);
|
||||
}
|
||||
|
||||
zmq::xsub_t::~xsub_t ()
|
||||
{
|
||||
int rc = _message.close ();
|
||||
const int rc = _message.close ();
|
||||
errno_assert (rc == 0);
|
||||
}
|
||||
|
||||
@ -119,7 +119,7 @@ int zmq::xsub_t::xsend (msg_t *msg_)
|
||||
size_t size = msg_->size ();
|
||||
unsigned char *data = static_cast<unsigned char *> (msg_->data ());
|
||||
|
||||
bool first_part = !_more_send;
|
||||
const bool first_part = !_more_send;
|
||||
_more_send = (msg_->flags () & msg_t::more) != 0;
|
||||
|
||||
if (first_part) {
|
||||
@ -181,7 +181,7 @@ int zmq::xsub_t::xrecv (msg_t *msg_)
|
||||
// If there's already a message prepared by a previous call to zmq_poll,
|
||||
// return it straight ahead.
|
||||
if (_has_message) {
|
||||
int rc = msg_->move (_message);
|
||||
const int rc = msg_->move (_message);
|
||||
errno_assert (rc == 0);
|
||||
_has_message = false;
|
||||
_more_recv = (msg_->flags () & msg_t::more) != 0;
|
||||
@ -257,7 +257,7 @@ bool zmq::xsub_t::xhas_in ()
|
||||
|
||||
bool zmq::xsub_t::match (msg_t *msg_)
|
||||
{
|
||||
bool matching = _subscriptions.check (
|
||||
const bool matching = _subscriptions.check (
|
||||
static_cast<unsigned char *> (msg_->data ()), msg_->size ());
|
||||
|
||||
return matching ^ options.invert_matching;
|
||||
@ -271,7 +271,7 @@ void zmq::xsub_t::send_subscription (unsigned char *data_,
|
||||
|
||||
// Create the subscription message.
|
||||
msg_t msg;
|
||||
int rc = msg.init_size (size_ + 1);
|
||||
const int rc = msg.init_size (size_ + 1);
|
||||
errno_assert (rc == 0);
|
||||
unsigned char *data = static_cast<unsigned char *> (msg.data ());
|
||||
data[0] = 1;
|
||||
@ -283,7 +283,7 @@ void zmq::xsub_t::send_subscription (unsigned char *data_,
|
||||
}
|
||||
|
||||
// Send it to the pipe.
|
||||
bool sent = pipe->write (&msg);
|
||||
const bool sent = pipe->write (&msg);
|
||||
// If we reached the SNDHWM, and thus cannot send the subscription, drop
|
||||
// the subscription message instead. This matches the behaviour of
|
||||
// zmq_setsockopt(ZMQ_SUBSCRIBE, ...), which also drops subscriptions
|
||||
|
@ -168,7 +168,7 @@ template <typename T, int N> class ypipe_t ZMQ_FINAL : public ypipe_base_t<T>
|
||||
// The pipe mustn't be empty or the function crashes.
|
||||
inline bool probe (bool (*fn_) (const T &)) ZMQ_FINAL
|
||||
{
|
||||
bool rc = check_read ();
|
||||
const bool rc = check_read ();
|
||||
zmq_assert (rc);
|
||||
|
||||
return (*fn_) (_queue.front ());
|
||||
|
@ -80,7 +80,7 @@ template <typename T> class ypipe_conflate_t ZMQ_FINAL : public ypipe_base_t<T>
|
||||
// Check whether item is available for reading.
|
||||
inline bool check_read () ZMQ_FINAL
|
||||
{
|
||||
bool res = dbuffer.check_read ();
|
||||
const bool res = dbuffer.check_read ();
|
||||
if (!res)
|
||||
reader_awake = false;
|
||||
|
||||
|
44
src/zmq.cpp
44
src/zmq.cpp
@ -153,8 +153,8 @@ int zmq_ctx_term (void *ctx_)
|
||||
return -1;
|
||||
}
|
||||
|
||||
int rc = (static_cast<zmq::ctx_t *> (ctx_))->terminate ();
|
||||
int en = errno;
|
||||
const int rc = (static_cast<zmq::ctx_t *> (ctx_))->terminate ();
|
||||
const int en = errno;
|
||||
|
||||
// Shut down only if termination was not interrupted by a signal.
|
||||
if (!rc || en != EINTR) {
|
||||
@ -358,7 +358,7 @@ static inline int
|
||||
s_sendmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
|
||||
{
|
||||
size_t sz = zmq_msg_size (msg_);
|
||||
int rc = s_->send (reinterpret_cast<zmq::msg_t *> (msg_), flags_);
|
||||
const int rc = s_->send (reinterpret_cast<zmq::msg_t *> (msg_), flags_);
|
||||
if (unlikely (rc < 0))
|
||||
return -1;
|
||||
|
||||
@ -390,10 +390,10 @@ int zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
|
||||
assert (buf_);
|
||||
memcpy (zmq_msg_data (&msg), buf_, len_);
|
||||
}
|
||||
int rc = s_sendmsg (s, &msg, flags_);
|
||||
const int rc = s_sendmsg (s, &msg, flags_);
|
||||
if (unlikely (rc < 0)) {
|
||||
int err = errno;
|
||||
int rc2 = zmq_msg_close (&msg);
|
||||
const int err = errno;
|
||||
const int rc2 = zmq_msg_close (&msg);
|
||||
errno_assert (rc2 == 0);
|
||||
errno = err;
|
||||
return -1;
|
||||
@ -416,8 +416,8 @@ int zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_)
|
||||
|
||||
rc = s_sendmsg (s, &msg, flags_);
|
||||
if (unlikely (rc < 0)) {
|
||||
int err = errno;
|
||||
int rc2 = zmq_msg_close (&msg);
|
||||
const int err = errno;
|
||||
const int rc2 = zmq_msg_close (&msg);
|
||||
errno_assert (rc2 == 0);
|
||||
errno = err;
|
||||
return -1;
|
||||
@ -459,8 +459,8 @@ int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_)
|
||||
flags_ = flags_ & ~ZMQ_SNDMORE;
|
||||
rc = s_sendmsg (s, &msg, flags_);
|
||||
if (unlikely (rc < 0)) {
|
||||
int err = errno;
|
||||
int rc2 = zmq_msg_close (&msg);
|
||||
const int err = errno;
|
||||
const int rc2 = zmq_msg_close (&msg);
|
||||
errno_assert (rc2 == 0);
|
||||
errno = err;
|
||||
rc = -1;
|
||||
@ -474,12 +474,12 @@ int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_)
|
||||
|
||||
static int s_recvmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
|
||||
{
|
||||
int rc = s_->recv (reinterpret_cast<zmq::msg_t *> (msg_), flags_);
|
||||
const int rc = s_->recv (reinterpret_cast<zmq::msg_t *> (msg_), flags_);
|
||||
if (unlikely (rc < 0))
|
||||
return -1;
|
||||
|
||||
// Truncate returned size to INT_MAX to avoid overflow to negative values
|
||||
size_t sz = zmq_msg_size (msg_);
|
||||
const size_t sz = zmq_msg_size (msg_);
|
||||
return static_cast<int> (sz < INT_MAX ? sz : INT_MAX);
|
||||
}
|
||||
|
||||
@ -499,9 +499,9 @@ int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
|
||||
int rc = zmq_msg_init (&msg);
|
||||
errno_assert (rc == 0);
|
||||
|
||||
int nbytes = s_recvmsg (s, &msg, flags_);
|
||||
const int nbytes = s_recvmsg (s, &msg, flags_);
|
||||
if (unlikely (nbytes < 0)) {
|
||||
int err = errno;
|
||||
const int err = errno;
|
||||
rc = zmq_msg_close (&msg);
|
||||
errno_assert (rc == 0);
|
||||
errno = err;
|
||||
@ -509,7 +509,7 @@ int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
|
||||
}
|
||||
|
||||
// An oversized message is silently truncated.
|
||||
size_t to_copy = size_t (nbytes) < len_ ? size_t (nbytes) : len_;
|
||||
const size_t to_copy = size_t (nbytes) < len_ ? size_t (nbytes) : len_;
|
||||
|
||||
// We explicitly allow a null buffer argument if len is zero
|
||||
if (to_copy) {
|
||||
@ -548,7 +548,7 @@ int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_)
|
||||
return -1;
|
||||
}
|
||||
|
||||
size_t count = *count_;
|
||||
const size_t count = *count_;
|
||||
int nread = 0;
|
||||
bool recvmore = true;
|
||||
|
||||
@ -559,9 +559,9 @@ int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_)
|
||||
int rc = zmq_msg_init (&msg);
|
||||
errno_assert (rc == 0);
|
||||
|
||||
int nbytes = s_recvmsg (s, &msg, flags_);
|
||||
const int nbytes = s_recvmsg (s, &msg, flags_);
|
||||
if (unlikely (nbytes < 0)) {
|
||||
int err = errno;
|
||||
const int err = errno;
|
||||
rc = zmq_msg_close (&msg);
|
||||
errno_assert (rc == 0);
|
||||
errno = err;
|
||||
@ -949,12 +949,12 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
|
||||
#if defined ZMQ_POLL_BASED_ON_POLL
|
||||
|
||||
// Compute the timeout for the subsequent poll.
|
||||
zmq::timeout_t timeout =
|
||||
const zmq::timeout_t timeout =
|
||||
zmq::compute_timeout (first_pass, timeout_, now, end);
|
||||
|
||||
// Wait for events.
|
||||
{
|
||||
int rc = poll (&pollfds[0], nitems_, timeout);
|
||||
const int rc = poll (&pollfds[0], nitems_, timeout);
|
||||
if (rc == -1 && errno == EINTR) {
|
||||
return -1;
|
||||
}
|
||||
@ -1263,7 +1263,7 @@ int zmq_poller_remove_fd (void *poller_, zmq::fd_t fd_)
|
||||
|
||||
int zmq_poller_wait (void *poller_, zmq_poller_event_t *event_, long timeout_)
|
||||
{
|
||||
int rc = zmq_poller_wait_all (poller_, event_, 1, timeout_);
|
||||
const int rc = zmq_poller_wait_all (poller_, event_, 1, timeout_);
|
||||
|
||||
if (rc < 0 && event_) {
|
||||
// TODO this is not portable... zmq_poller_event_t contains pointers,
|
||||
@ -1291,7 +1291,7 @@ int zmq_poller_wait_all (void *poller_,
|
||||
return -1;
|
||||
}
|
||||
|
||||
int rc =
|
||||
const int rc =
|
||||
(static_cast<zmq::socket_poller_t *> (poller_))
|
||||
->wait (reinterpret_cast<zmq::socket_poller_t::event_t *> (events_),
|
||||
n_events_, timeout_);
|
||||
|
@ -68,14 +68,14 @@ void *zmq_stopwatch_start ()
|
||||
|
||||
unsigned long zmq_stopwatch_intermediate (void *watch_)
|
||||
{
|
||||
uint64_t end = zmq::clock_t::now_us ();
|
||||
uint64_t start = *static_cast<uint64_t *> (watch_);
|
||||
const uint64_t end = zmq::clock_t::now_us ();
|
||||
const uint64_t start = *static_cast<uint64_t *> (watch_);
|
||||
return static_cast<unsigned long> (end - start);
|
||||
}
|
||||
|
||||
unsigned long zmq_stopwatch_stop (void *watch_)
|
||||
{
|
||||
unsigned long res = zmq_stopwatch_intermediate (watch_);
|
||||
const unsigned long res = zmq_stopwatch_intermediate (watch_);
|
||||
free (watch_);
|
||||
return res;
|
||||
}
|
||||
@ -173,12 +173,12 @@ uint8_t *zmq_z85_decode (uint8_t *dest_, const char *string_)
|
||||
goto error_inval;
|
||||
}
|
||||
value *= 85;
|
||||
uint8_t index = string_[char_nbr++] - 32;
|
||||
const uint8_t index = string_[char_nbr++] - 32;
|
||||
if (index >= sizeof (decoder)) {
|
||||
// Invalid z85 encoding, character outside range
|
||||
goto error_inval;
|
||||
}
|
||||
uint32_t summand = decoder[index];
|
||||
const uint32_t summand = decoder[index];
|
||||
if (summand == 0xFF || summand > (UINT32_MAX - value)) {
|
||||
// Invalid z85 encoding, invalid character or represented value exceeds 0xffffffff
|
||||
goto error_inval;
|
||||
@ -223,7 +223,7 @@ int zmq_curve_keypair (char *z85_public_key_, char *z85_secret_key_)
|
||||
|
||||
zmq::random_open ();
|
||||
|
||||
int res = crypto_box_keypair (public_key, secret_key);
|
||||
const int res = crypto_box_keypair (public_key, secret_key);
|
||||
zmq_z85_encode (z85_public_key_, public_key, 32);
|
||||
zmq_z85_encode (z85_secret_key_, secret_key, 32);
|
||||
|
||||
|
@ -92,7 +92,7 @@ zmq::zmtp_engine_t::zmtp_engine_t (
|
||||
|
||||
zmq::zmtp_engine_t::~zmtp_engine_t ()
|
||||
{
|
||||
int rc = _routing_id_msg.close ();
|
||||
const int rc = _routing_id_msg.close ();
|
||||
errno_assert (rc == 0);
|
||||
}
|
||||
|
||||
@ -410,7 +410,7 @@ bool zmq::zmtp_engine_t::handshake_v3_0 ()
|
||||
|
||||
int zmq::zmtp_engine_t::routing_id_msg (msg_t *msg_)
|
||||
{
|
||||
int rc = msg_->init_size (_options.routing_id_size);
|
||||
const int rc = msg_->init_size (_options.routing_id_size);
|
||||
errno_assert (rc == 0);
|
||||
if (_options.routing_id_size > 0)
|
||||
memcpy (msg_->data (), _options.routing_id, _options.routing_id_size);
|
||||
@ -422,7 +422,7 @@ int zmq::zmtp_engine_t::process_routing_id_msg (msg_t *msg_)
|
||||
{
|
||||
if (_options.recv_routing_id) {
|
||||
msg_->set_flags (msg_t::routing_id);
|
||||
int rc = session ()->push_msg (msg_);
|
||||
const int rc = session ()->push_msg (msg_);
|
||||
errno_assert (rc == 0);
|
||||
} else {
|
||||
int rc = msg_->close ();
|
||||
|
Loading…
x
Reference in New Issue
Block a user