Problem: inconsistent naming style for private data members, conflicts with naming of local variables and member functions

Solution: apply and check _lower_case naming style for private data members
This commit is contained in:
Simon Giesecke
2018-05-27 11:10:39 +02:00
parent 06cfd0d8ad
commit e3c73d9881
143 changed files with 5783 additions and 4051 deletions

View File

@@ -99,12 +99,12 @@
bool zmq::socket_base_t::check_tag ()
{
return tag == 0xbaddecaf;
return _tag == 0xbaddecaf;
}
bool zmq::socket_base_t::is_thread_safe () const
{
return thread_safe;
return _thread_safe;
}
zmq::socket_base_t *zmq::socket_base_t::create (int type_,
@@ -178,8 +178,8 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_,
alloc_assert (s);
if (s->mailbox == NULL) {
s->destroyed = true;
if (s->_mailbox == NULL) {
s->_destroyed = true;
LIBZMQ_DELETE (s);
return NULL;
}
@@ -192,38 +192,38 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_,
int sid_,
bool thread_safe_) :
own_t (parent_, tid_),
tag (0xbaddecaf),
ctx_terminated (false),
destroyed (false),
poller (NULL),
handle (static_cast<poller_t::handle_t> (NULL)),
last_tsc (0),
ticks (0),
rcvmore (false),
monitor_socket (NULL),
monitor_events (0),
thread_safe (thread_safe_),
reaper_signaler (NULL),
sync (),
monitor_sync ()
_tag (0xbaddecaf),
_ctx_terminated (false),
_destroyed (false),
_poller (NULL),
_handle (static_cast<poller_t::handle_t> (NULL)),
_last_tsc (0),
_ticks (0),
_rcvmore (false),
_monitor_socket (NULL),
_monitor_events (0),
_thread_safe (thread_safe_),
_reaper_signaler (NULL),
_sync (),
_monitor_sync ()
{
options.socket_id = sid_;
options.ipv6 = (parent_->get (ZMQ_IPV6) != 0);
options.linger.store (parent_->get (ZMQ_BLOCKY) ? -1 : 0);
options.zero_copy = parent_->get (ZMQ_ZERO_COPY_RECV) != 0;
if (thread_safe) {
mailbox = new (std::nothrow) mailbox_safe_t (&sync);
zmq_assert (mailbox);
if (_thread_safe) {
_mailbox = new (std::nothrow) mailbox_safe_t (&_sync);
zmq_assert (_mailbox);
} else {
mailbox_t *m = new (std::nothrow) mailbox_t ();
zmq_assert (m);
if (m->get_fd () != retired_fd)
mailbox = m;
_mailbox = m;
else {
LIBZMQ_DELETE (m);
mailbox = NULL;
_mailbox = NULL;
}
}
}
@@ -241,21 +241,21 @@ int zmq::socket_base_t::get_peer_state (const void *routing_id_,
zmq::socket_base_t::~socket_base_t ()
{
if (mailbox)
LIBZMQ_DELETE (mailbox);
if (_mailbox)
LIBZMQ_DELETE (_mailbox);
if (reaper_signaler)
LIBZMQ_DELETE (reaper_signaler);
if (_reaper_signaler)
LIBZMQ_DELETE (_reaper_signaler);
scoped_lock_t lock (monitor_sync);
scoped_lock_t lock (_monitor_sync);
stop_monitor ();
zmq_assert (destroyed);
zmq_assert (_destroyed);
}
zmq::i_mailbox *zmq::socket_base_t::get_mailbox ()
{
return mailbox;
return _mailbox;
}
void zmq::socket_base_t::stop ()
@@ -345,7 +345,7 @@ void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{
// First, register the pipe so that we can terminate it later on.
pipe_->set_event_sink (this);
pipes.push_back (pipe_);
_pipes.push_back (pipe_);
// Let the derived socket type know about new pipe.
xattach_pipe (pipe_, subscribe_to_all_);
@@ -362,14 +362,14 @@ int zmq::socket_base_t::setsockopt (int option_,
const void *optval_,
size_t optvallen_)
{
scoped_optional_lock_t sync_lock (thread_safe ? &sync : NULL);
scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
if (!options.is_valid (option_)) {
errno = EINVAL;
return -1;
}
if (unlikely (ctx_terminated)) {
if (unlikely (_ctx_terminated)) {
errno = ETERM;
return -1;
}
@@ -392,26 +392,27 @@ int zmq::socket_base_t::getsockopt (int option_,
void *optval_,
size_t *optvallen_)
{
scoped_optional_lock_t sync_lock (thread_safe ? &sync : NULL);
scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
if (unlikely (ctx_terminated)) {
if (unlikely (_ctx_terminated)) {
errno = ETERM;
return -1;
}
if (option_ == ZMQ_RCVMORE) {
return do_getsockopt<int> (optval_, optvallen_, rcvmore ? 1 : 0);
return do_getsockopt<int> (optval_, optvallen_, _rcvmore ? 1 : 0);
}
if (option_ == ZMQ_FD) {
if (thread_safe) {
if (_thread_safe) {
// thread safe socket doesn't provide file descriptor
errno = EINVAL;
return -1;
}
return do_getsockopt<fd_t> (
optval_, optvallen_, (static_cast<mailbox_t *> (mailbox))->get_fd ());
optval_, optvallen_,
(static_cast<mailbox_t *> (_mailbox))->get_fd ());
}
if (option_ == ZMQ_EVENTS) {
@@ -427,11 +428,11 @@ int zmq::socket_base_t::getsockopt (int option_,
}
if (option_ == ZMQ_LAST_ENDPOINT) {
return do_getsockopt (optval_, optvallen_, last_endpoint);
return do_getsockopt (optval_, optvallen_, _last_endpoint);
}
if (option_ == ZMQ_THREAD_SAFE) {
return do_getsockopt<int> (optval_, optvallen_, thread_safe ? 1 : 0);
return do_getsockopt<int> (optval_, optvallen_, _thread_safe ? 1 : 0);
}
return options.getsockopt (option_, optval_, optvallen_);
@@ -439,7 +440,7 @@ int zmq::socket_base_t::getsockopt (int option_,
int zmq::socket_base_t::join (const char *group_)
{
scoped_optional_lock_t sync_lock (thread_safe ? &sync : NULL);
scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
int rc = xjoin (group_);
@@ -449,7 +450,7 @@ int zmq::socket_base_t::join (const char *group_)
int zmq::socket_base_t::leave (const char *group_)
{
scoped_optional_lock_t sync_lock (thread_safe ? &sync : NULL);
scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
int rc = xleave (group_);
@@ -459,25 +460,25 @@ int zmq::socket_base_t::leave (const char *group_)
void zmq::socket_base_t::add_signaler (signaler_t *s_)
{
zmq_assert (thread_safe);
zmq_assert (_thread_safe);
scoped_lock_t sync_lock (sync);
(static_cast<mailbox_safe_t *> (mailbox))->add_signaler (s_);
scoped_lock_t sync_lock (_sync);
(static_cast<mailbox_safe_t *> (_mailbox))->add_signaler (s_);
}
void zmq::socket_base_t::remove_signaler (signaler_t *s_)
{
zmq_assert (thread_safe);
zmq_assert (_thread_safe);
scoped_lock_t sync_lock (sync);
(static_cast<mailbox_safe_t *> (mailbox))->remove_signaler (s_);
scoped_lock_t sync_lock (_sync);
(static_cast<mailbox_safe_t *> (_mailbox))->remove_signaler (s_);
}
int zmq::socket_base_t::bind (const char *addr_)
{
scoped_optional_lock_t sync_lock (thread_safe ? &sync : NULL);
scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
if (unlikely (ctx_terminated)) {
if (unlikely (_ctx_terminated)) {
errno = ETERM;
return -1;
}
@@ -500,7 +501,7 @@ int zmq::socket_base_t::bind (const char *addr_)
rc = register_endpoint (addr_, endpoint);
if (rc == 0) {
connect_pending (addr_, this);
last_endpoint.assign (addr_);
_last_endpoint.assign (addr_);
options.connected = true;
}
return rc;
@@ -564,7 +565,7 @@ int zmq::socket_base_t::bind (const char *addr_)
session->attach_pipe (new_pipes[1]);
// Save last endpoint URI
paddr->to_string (last_endpoint);
paddr->to_string (_last_endpoint);
add_endpoint (addr_, (own_t *) session, newpipe);
@@ -591,9 +592,9 @@ int zmq::socket_base_t::bind (const char *addr_)
}
// Save last endpoint URI
listener->get_address (last_endpoint);
listener->get_address (_last_endpoint);
add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
add_endpoint (_last_endpoint.c_str (), (own_t *) listener, NULL);
options.connected = true;
return 0;
}
@@ -612,9 +613,9 @@ int zmq::socket_base_t::bind (const char *addr_)
}
// Save last endpoint URI
listener->get_address (last_endpoint);
listener->get_address (_last_endpoint);
add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
add_endpoint (_last_endpoint.c_str (), (own_t *) listener, NULL);
options.connected = true;
return 0;
}
@@ -632,7 +633,7 @@ int zmq::socket_base_t::bind (const char *addr_)
}
// Save last endpoint URI
listener->get_address (last_endpoint);
listener->get_address (_last_endpoint);
add_endpoint (addr_, (own_t *) listener, NULL);
options.connected = true;
@@ -651,9 +652,9 @@ int zmq::socket_base_t::bind (const char *addr_)
return -1;
}
listener->get_address (last_endpoint);
listener->get_address (_last_endpoint);
add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
add_endpoint (_last_endpoint.c_str (), (own_t *) listener, NULL);
options.connected = true;
return 0;
}
@@ -665,9 +666,9 @@ int zmq::socket_base_t::bind (const char *addr_)
int zmq::socket_base_t::connect (const char *addr_)
{
scoped_optional_lock_t sync_lock (thread_safe ? &sync : NULL);
scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
if (unlikely (ctx_terminated)) {
if (unlikely (_ctx_terminated)) {
errno = ETERM;
return -1;
}
@@ -780,10 +781,10 @@ int zmq::socket_base_t::connect (const char *addr_)
attach_pipe (new_pipes[0]);
// Save last endpoint URI
last_endpoint.assign (addr_);
_last_endpoint.assign (addr_);
// remember inproc connections for disconnect
inprocs.ZMQ_MAP_INSERT_OR_EMPLACE (std::string (addr_), new_pipes[0]);
_inprocs.ZMQ_MAP_INSERT_OR_EMPLACE (std::string (addr_), new_pipes[0]);
options.connected = true;
return 0;
@@ -792,8 +793,8 @@ int zmq::socket_base_t::connect (const char *addr_)
(options.type == ZMQ_DEALER || options.type == ZMQ_SUB
|| options.type == ZMQ_PUB || options.type == ZMQ_REQ);
if (unlikely (is_single_connect)) {
const endpoints_t::iterator it = endpoints.find (addr_);
if (it != endpoints.end ()) {
const endpoints_t::iterator it = _endpoints.find (addr_);
if (it != _endpoints.end ()) {
// There is no valid use for multiple connects for SUB-PUB nor
// DEALER-ROUTER nor REQ-REP. Multiple connects produces
// nonsensical results.
@@ -970,7 +971,7 @@ int zmq::socket_base_t::connect (const char *addr_)
}
// Save last endpoint URI
paddr->to_string (last_endpoint);
paddr->to_string (_last_endpoint);
add_endpoint (addr_, (own_t *) session, newpipe);
return 0;
@@ -982,16 +983,16 @@ void zmq::socket_base_t::add_endpoint (const char *addr_,
{
// Activate the session. Make it a child of this socket.
launch_child (endpoint_);
endpoints.ZMQ_MAP_INSERT_OR_EMPLACE (std::string (addr_),
endpoint_pipe_t (endpoint_, pipe_));
_endpoints.ZMQ_MAP_INSERT_OR_EMPLACE (std::string (addr_),
endpoint_pipe_t (endpoint_, pipe_));
}
int zmq::socket_base_t::term_endpoint (const char *addr_)
{
scoped_optional_lock_t sync_lock (thread_safe ? &sync : NULL);
scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
// Check whether the library haven't been shut down yet.
if (unlikely (ctx_terminated)) {
if (unlikely (_ctx_terminated)) {
errno = ETERM;
return -1;
}
@@ -1024,7 +1025,7 @@ int zmq::socket_base_t::term_endpoint (const char *addr_)
return 0;
}
std::pair<inprocs_t::iterator, inprocs_t::iterator> range =
inprocs.equal_range (addr_str);
_inprocs.equal_range (addr_str);
if (range.first == range.second) {
errno = ENOENT;
return -1;
@@ -1032,7 +1033,7 @@ int zmq::socket_base_t::term_endpoint (const char *addr_)
for (inprocs_t::iterator it = range.first; it != range.second; ++it)
it->second->terminate (true);
inprocs.erase (range.first, range.second);
_inprocs.erase (range.first, range.second);
return 0;
}
@@ -1044,14 +1045,14 @@ int zmq::socket_base_t::term_endpoint (const char *addr_)
// resolve before giving up. Given at this stage we don't know whether a
// socket is connected or bound, try with both.
if (protocol == "tcp") {
if (endpoints.find (resolved_addr) == endpoints.end ()) {
if (_endpoints.find (resolved_addr) == _endpoints.end ()) {
tcp_address_t *tcp_addr = new (std::nothrow) tcp_address_t ();
alloc_assert (tcp_addr);
rc = tcp_addr->resolve (address.c_str (), false, options.ipv6);
if (rc == 0) {
tcp_addr->to_string (resolved_addr);
if (endpoints.find (resolved_addr) == endpoints.end ()) {
if (_endpoints.find (resolved_addr) == _endpoints.end ()) {
rc =
tcp_addr->resolve (address.c_str (), true, options.ipv6);
if (rc == 0) {
@@ -1065,7 +1066,7 @@ int zmq::socket_base_t::term_endpoint (const char *addr_)
// Find the endpoints range (if any) corresponding to the addr_ string.
const std::pair<endpoints_t::iterator, endpoints_t::iterator> range =
endpoints.equal_range (resolved_addr);
_endpoints.equal_range (resolved_addr);
if (range.first == range.second) {
errno = ENOENT;
return -1;
@@ -1077,16 +1078,16 @@ int zmq::socket_base_t::term_endpoint (const char *addr_)
it->second.second->terminate (false);
term_child (it->second.first);
}
endpoints.erase (range.first, range.second);
_endpoints.erase (range.first, range.second);
return 0;
}
int zmq::socket_base_t::send (msg_t *msg_, int flags_)
{
scoped_optional_lock_t sync_lock (thread_safe ? &sync : NULL);
scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
// Check whether the library haven't been shut down yet.
if (unlikely (ctx_terminated)) {
if (unlikely (_ctx_terminated)) {
errno = ETERM;
return -1;
}
@@ -1130,7 +1131,7 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
// Compute the time when the timeout should occur.
// If the timeout is infinite, don't care.
int timeout = options.sndtimeo;
uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout);
uint64_t end = timeout < 0 ? 0 : (_clock.now_ms () + timeout);
// Oops, we couldn't send the message. Wait for the next
// command, process it and try to send the message again.
@@ -1146,7 +1147,7 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
return -1;
}
if (timeout > 0) {
timeout = static_cast<int> (end - clock.now_ms ());
timeout = static_cast<int> (end - _clock.now_ms ());
if (timeout <= 0) {
errno = EAGAIN;
return -1;
@@ -1159,10 +1160,10 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
{
scoped_optional_lock_t sync_lock (thread_safe ? &sync : NULL);
scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
// Check whether the library haven't been shut down yet.
if (unlikely (ctx_terminated)) {
if (unlikely (_ctx_terminated)) {
errno = ETERM;
return -1;
}
@@ -1181,11 +1182,11 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
// Note that 'recv' uses different command throttling algorithm (the one
// described above) from the one used by 'send'. This is because counting
// ticks is more efficient than doing RDTSC all the time.
if (++ticks == inbound_poll_rate) {
if (++_ticks == inbound_poll_rate) {
if (unlikely (process_commands (0, false) != 0)) {
return -1;
}
ticks = 0;
_ticks = 0;
}
// Get the message.
@@ -1208,7 +1209,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
if (unlikely (process_commands (0, false) != 0)) {
return -1;
}
ticks = 0;
_ticks = 0;
rc = xrecv (msg_);
if (rc < 0) {
@@ -1222,18 +1223,18 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
// Compute the time when the timeout should occur.
// If the timeout is infinite, don't care.
int timeout = options.rcvtimeo;
uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout);
uint64_t end = timeout < 0 ? 0 : (_clock.now_ms () + timeout);
// In blocking scenario, commands are processed over and over again until
// we are able to fetch a message.
bool block = (ticks != 0);
bool block = (_ticks != 0);
while (true) {
if (unlikely (process_commands (block ? timeout : 0, false) != 0)) {
return -1;
}
rc = xrecv (msg_);
if (rc == 0) {
ticks = 0;
_ticks = 0;
break;
}
if (unlikely (errno != EAGAIN)) {
@@ -1241,7 +1242,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
}
block = true;
if (timeout > 0) {
timeout = static_cast<int> (end - clock.now_ms ());
timeout = static_cast<int> (end - _clock.now_ms ());
if (timeout <= 0) {
errno = EAGAIN;
return -1;
@@ -1255,14 +1256,14 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
int zmq::socket_base_t::close ()
{
scoped_optional_lock_t sync_lock (thread_safe ? &sync : NULL);
scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
// Remove all existing signalers for thread safe sockets
if (thread_safe)
(static_cast<mailbox_safe_t *> (mailbox))->clear_signalers ();
if (_thread_safe)
(static_cast<mailbox_safe_t *> (_mailbox))->clear_signalers ();
// Mark the socket as dead
tag = 0xdeadbeef;
_tag = 0xdeadbeef;
// Transfer the ownership of the socket from this application thread
@@ -1286,29 +1287,29 @@ bool zmq::socket_base_t::has_out ()
void zmq::socket_base_t::start_reaping (poller_t *poller_)
{
// Plug the socket to the reaper thread.
poller = poller_;
_poller = poller_;
fd_t fd;
if (!thread_safe)
fd = (static_cast<mailbox_t *> (mailbox))->get_fd ();
if (!_thread_safe)
fd = (static_cast<mailbox_t *> (_mailbox))->get_fd ();
else {
scoped_optional_lock_t sync_lock (thread_safe ? &sync : NULL);
scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
reaper_signaler = new (std::nothrow) signaler_t ();
zmq_assert (reaper_signaler);
_reaper_signaler = new (std::nothrow) signaler_t ();
zmq_assert (_reaper_signaler);
// Add signaler to the safe mailbox
fd = reaper_signaler->get_fd ();
(static_cast<mailbox_safe_t *> (mailbox))
->add_signaler (reaper_signaler);
fd = _reaper_signaler->get_fd ();
(static_cast<mailbox_safe_t *> (_mailbox))
->add_signaler (_reaper_signaler);
// Send a signal to make sure reaper handle existing commands
reaper_signaler->send ();
_reaper_signaler->send ();
}
handle = poller->add_fd (fd, this);
poller->set_pollin (handle);
_handle = _poller->add_fd (fd, this);
_poller->set_pollin (_handle);
// Initialise the termination and check whether it can be deallocated
// immediately.
@@ -1322,7 +1323,7 @@ int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
command_t cmd;
if (timeout_ != 0) {
// If we are asked to wait, simply ask mailbox to wait.
rc = mailbox->recv (&cmd, timeout_);
rc = _mailbox->recv (&cmd, timeout_);
} else {
// If we are asked not to wait, check whether we haven't processed
// commands recently, so that we can throttle the new commands.
@@ -1340,19 +1341,19 @@ int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
// Check whether TSC haven't jumped backwards (in case of migration
// between CPU cores) and whether certain time have elapsed since
// last command processing. If it didn't do nothing.
if (tsc >= last_tsc && tsc - last_tsc <= max_command_delay)
if (tsc >= _last_tsc && tsc - _last_tsc <= max_command_delay)
return 0;
last_tsc = tsc;
_last_tsc = tsc;
}
// Check whether there are any commands pending for this thread.
rc = mailbox->recv (&cmd, 0);
rc = _mailbox->recv (&cmd, 0);
}
// Process all available commands.
while (rc == 0) {
cmd.destination->process_command (cmd);
rc = mailbox->recv (&cmd, 0);
rc = _mailbox->recv (&cmd, 0);
}
if (errno == EINTR)
@@ -1360,7 +1361,7 @@ int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
zmq_assert (errno == EAGAIN);
if (ctx_terminated) {
if (_ctx_terminated) {
errno = ETERM;
return -1;
}
@@ -1374,10 +1375,10 @@ void zmq::socket_base_t::process_stop ()
// We'll remember the fact so that any blocking call is interrupted and any
// further attempt to use the socket will return ETERM. The user is still
// responsible for calling zmq_close on the socket though!
scoped_lock_t lock (monitor_sync);
scoped_lock_t lock (_monitor_sync);
stop_monitor ();
ctx_terminated = true;
_ctx_terminated = true;
}
void zmq::socket_base_t::process_bind (pipe_t *pipe_)
@@ -1393,9 +1394,9 @@ void zmq::socket_base_t::process_term (int linger_)
unregister_endpoints (this);
// Ask all attached pipes to terminate.
for (pipes_t::size_type i = 0; i != pipes.size (); ++i)
pipes[i]->terminate (false);
register_term_acks (static_cast<int> (pipes.size ()));
for (pipes_t::size_type i = 0; i != _pipes.size (); ++i)
_pipes[i]->terminate (false);
register_term_acks (static_cast<int> (_pipes.size ()));
// Continue the termination process immediately.
own_t::process_term (linger_);
@@ -1410,16 +1411,16 @@ void zmq::socket_base_t::process_term_endpoint (std::string *endpoint_)
void zmq::socket_base_t::update_pipe_options (int option_)
{
if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM) {
for (pipes_t::size_type i = 0; i != pipes.size (); ++i) {
pipes[i]->set_hwms (options.rcvhwm, options.sndhwm);
pipes[i]->send_hwms_to_peer (options.sndhwm, options.rcvhwm);
for (pipes_t::size_type i = 0; i != _pipes.size (); ++i) {
_pipes[i]->set_hwms (options.rcvhwm, options.sndhwm);
_pipes[i]->send_hwms_to_peer (options.sndhwm, options.rcvhwm);
}
}
}
void zmq::socket_base_t::process_destroy ()
{
destroyed = true;
_destroyed = true;
}
int zmq::socket_base_t::xsetsockopt (int, const void *, size_t)
@@ -1492,11 +1493,11 @@ void zmq::socket_base_t::in_event ()
// that may be available at the moment. Ultimately, the socket will
// be destroyed.
{
scoped_optional_lock_t sync_lock (thread_safe ? &sync : NULL);
scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
// If the socket is thread safe we need to unsignal the reaper signaler
if (thread_safe)
reaper_signaler->recv ();
if (_thread_safe)
_reaper_signaler->recv ();
process_commands (0, false);
}
@@ -1516,9 +1517,9 @@ void zmq::socket_base_t::timer_event (int)
void zmq::socket_base_t::check_destroy ()
{
// If the object was already marked as destroyed, finish the deallocation.
if (destroyed) {
if (_destroyed) {
// Remove the socket from the reaper's poller.
poller->rm_fd (handle);
_poller->rm_fd (_handle);
// Remove the socket from the context.
destroy_socket (this);
@@ -1556,15 +1557,16 @@ void zmq::socket_base_t::pipe_terminated (pipe_t *pipe_)
xpipe_terminated (pipe_);
// Remove pipe from inproc pipes
for (inprocs_t::iterator it = inprocs.begin (); it != inprocs.end (); ++it)
for (inprocs_t::iterator it = _inprocs.begin (); it != _inprocs.end ();
++it)
if (it->second == pipe_) {
inprocs.erase (it);
_inprocs.erase (it);
break;
}
// Remove the pipe from the list of attached pipes and confirm its
// termination if we are already shutting down.
pipes.erase (pipe_);
_pipes.erase (pipe_);
if (is_terminating ())
unregister_term_ack ();
}
@@ -1576,14 +1578,14 @@ void zmq::socket_base_t::extract_flags (msg_t *msg_)
zmq_assert (options.recv_routing_id);
// Remove MORE flag.
rcvmore = (msg_->flags () & msg_t::more) != 0;
_rcvmore = (msg_->flags () & msg_t::more) != 0;
}
int zmq::socket_base_t::monitor (const char *addr_, int events_)
{
scoped_lock_t lock (monitor_sync);
scoped_lock_t lock (_monitor_sync);
if (unlikely (ctx_terminated)) {
if (unlikely (_ctx_terminated)) {
errno = ETERM;
return -1;
}
@@ -1605,24 +1607,24 @@ int zmq::socket_base_t::monitor (const char *addr_, int events_)
return -1;
}
// already monitoring. Stop previous monitor before starting new one.
if (monitor_socket != NULL) {
if (_monitor_socket != NULL) {
stop_monitor (true);
}
// Register events to monitor
monitor_events = events_;
monitor_socket = zmq_socket (get_ctx (), ZMQ_PAIR);
if (monitor_socket == NULL)
_monitor_events = events_;
_monitor_socket = zmq_socket (get_ctx (), ZMQ_PAIR);
if (_monitor_socket == NULL)
return -1;
// Never block context termination on pending event messages
int linger = 0;
int rc =
zmq_setsockopt (monitor_socket, ZMQ_LINGER, &linger, sizeof (linger));
zmq_setsockopt (_monitor_socket, ZMQ_LINGER, &linger, sizeof (linger));
if (rc == -1)
stop_monitor (false);
// Spawn the monitor socket endpoint
rc = zmq_bind (monitor_socket, addr_);
rc = zmq_bind (_monitor_socket, addr_);
if (rc == -1)
stop_monitor (false);
return rc;
@@ -1713,8 +1715,8 @@ void zmq::socket_base_t::event (const std::string &addr_,
intptr_t value_,
int type_)
{
scoped_lock_t lock (monitor_sync);
if (monitor_events & type_) {
scoped_lock_t lock (_monitor_sync);
if (_monitor_events & type_) {
monitor_event (type_, value_, addr_);
}
}
@@ -1727,7 +1729,7 @@ void zmq::socket_base_t::monitor_event (int event_,
// this is a private method which is only called from
// contexts where the mutex has been locked before
if (monitor_socket) {
if (_monitor_socket) {
// Send event in first frame
zmq_msg_t msg;
zmq_msg_init_size (&msg, 6);
@@ -1737,12 +1739,12 @@ void zmq::socket_base_t::monitor_event (int event_,
uint32_t value = static_cast<uint32_t> (value_);
memcpy (data + 0, &event, sizeof (event));
memcpy (data + 2, &value, sizeof (value));
zmq_sendmsg (monitor_socket, &msg, ZMQ_SNDMORE);
zmq_sendmsg (_monitor_socket, &msg, ZMQ_SNDMORE);
// Send address in second frame
zmq_msg_init_size (&msg, addr_.size ());
memcpy (zmq_msg_data (&msg), addr_.c_str (), addr_.size ());
zmq_sendmsg (monitor_socket, &msg, 0);
zmq_sendmsg (_monitor_socket, &msg, 0);
}
}
@@ -1751,12 +1753,12 @@ void zmq::socket_base_t::stop_monitor (bool send_monitor_stopped_event_)
// this is a private method which is only called from
// contexts where the mutex has been locked before
if (monitor_socket) {
if ((monitor_events & ZMQ_EVENT_MONITOR_STOPPED)
if (_monitor_socket) {
if ((_monitor_events & ZMQ_EVENT_MONITOR_STOPPED)
&& send_monitor_stopped_event_)
monitor_event (ZMQ_EVENT_MONITOR_STOPPED, 0, "");
zmq_close (monitor_socket);
monitor_socket = NULL;
monitor_events = 0;
zmq_close (_monitor_socket);
_monitor_socket = NULL;
_monitor_events = 0;
}
}