mirror of
https://github.com/zeromq/libzmq.git
synced 2025-07-02 00:43:31 +02:00
session_base: code cleanup
- add unlikely hints - drop unnecessary assertion - style fixes There is no need to require the 'more' flag in the provided message structure be 0 when pulling message from the session.
This commit is contained in:
parent
5da971275d
commit
e51a1f04c9
@ -157,8 +157,7 @@ void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
|
|||||||
int zmq::session_base_t::pull_msg (msg_t *msg_)
|
int zmq::session_base_t::pull_msg (msg_t *msg_)
|
||||||
{
|
{
|
||||||
// First message to send is identity
|
// First message to send is identity
|
||||||
if (!identity_sent) {
|
if (unlikely (!identity_sent)) {
|
||||||
zmq_assert (!(msg_->flags () & msg_t::more));
|
|
||||||
int rc = msg_->init_size (options.identity_size);
|
int rc = msg_->init_size (options.identity_size);
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
memcpy (msg_->data (), options.identity, options.identity_size);
|
memcpy (msg_->data (), options.identity, options.identity_size);
|
||||||
@ -179,7 +178,7 @@ int zmq::session_base_t::pull_msg (msg_t *msg_)
|
|||||||
int zmq::session_base_t::push_msg (msg_t *msg_)
|
int zmq::session_base_t::push_msg (msg_t *msg_)
|
||||||
{
|
{
|
||||||
// First message to receive is identity
|
// First message to receive is identity
|
||||||
if (!identity_received) {
|
if (unlikely (!identity_received)) {
|
||||||
msg_->set_flags (msg_t::identity);
|
msg_->set_flags (msg_t::identity);
|
||||||
identity_received = true;
|
identity_received = true;
|
||||||
if (!options.recv_identity) {
|
if (!options.recv_identity) {
|
||||||
@ -228,10 +227,8 @@ void zmq::session_base_t::clean_pipes ()
|
|||||||
msg_t msg;
|
msg_t msg;
|
||||||
int rc = msg.init ();
|
int rc = msg.init ();
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
if (pull_msg (&msg) != 0) {
|
rc = pull_msg (&msg);
|
||||||
zmq_assert (!incomplete_in);
|
errno_assert (rc == 0);
|
||||||
break;
|
|
||||||
}
|
|
||||||
rc = msg.close ();
|
rc = msg.close ();
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
}
|
}
|
||||||
@ -258,11 +255,10 @@ void zmq::session_base_t::terminated (pipe_t *pipe_)
|
|||||||
terminate ();
|
terminate ();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// If we are waiting for pending messages to be sent, at this point
|
// If we are waiting for pending messages to be sent, at this point
|
||||||
// we are sure that there will be no more messages and we can proceed
|
// we are sure that there will be no more messages and we can proceed
|
||||||
// with termination safely.
|
// with termination safely.
|
||||||
if (pending && !pipe && terminating_pipes.size () == 0)
|
if (pending && !pipe && terminating_pipes.empty ())
|
||||||
proceed_with_term ();
|
proceed_with_term ();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -470,13 +466,16 @@ void zmq::session_base_t::start_connecting (bool wait_)
|
|||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#if defined ZMQ_HAVE_OPENPGM
|
#ifdef ZMQ_HAVE_OPENPGM
|
||||||
|
|
||||||
// Both PGM and EPGM transports are using the same infrastructure.
|
// Both PGM and EPGM transports are using the same infrastructure.
|
||||||
if (addr->protocol == "pgm" || addr->protocol == "epgm") {
|
if (addr->protocol == "pgm" || addr->protocol == "epgm") {
|
||||||
|
|
||||||
|
zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB
|
||||||
|
|| options.type == ZMQ_SUB || options.type == ZMQ_XSUB);
|
||||||
|
|
||||||
// For EPGM transport with UDP encapsulation of PGM is used.
|
// For EPGM transport with UDP encapsulation of PGM is used.
|
||||||
bool udp_encapsulation = (addr->protocol == "epgm");
|
bool const udp_encapsulation = addr->protocol == "epgm";
|
||||||
|
|
||||||
// At this point we'll create message pipes to the session straight
|
// At this point we'll create message pipes to the session straight
|
||||||
// away. There's no point in delaying it as no concept of 'connect'
|
// away. There's no point in delaying it as no concept of 'connect'
|
||||||
@ -484,7 +483,7 @@ void zmq::session_base_t::start_connecting (bool wait_)
|
|||||||
if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
|
if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
|
||||||
|
|
||||||
// PGM sender.
|
// PGM sender.
|
||||||
pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t (
|
pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t (
|
||||||
io_thread, options);
|
io_thread, options);
|
||||||
alloc_assert (pgm_sender);
|
alloc_assert (pgm_sender);
|
||||||
|
|
||||||
@ -493,11 +492,10 @@ void zmq::session_base_t::start_connecting (bool wait_)
|
|||||||
|
|
||||||
send_attach (this, pgm_sender);
|
send_attach (this, pgm_sender);
|
||||||
}
|
}
|
||||||
else
|
else {
|
||||||
if (options.type == ZMQ_SUB || options.type == ZMQ_XSUB) {
|
|
||||||
|
|
||||||
// PGM receiver.
|
// PGM receiver.
|
||||||
pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t (
|
pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t (
|
||||||
io_thread, options);
|
io_thread, options);
|
||||||
alloc_assert (pgm_receiver);
|
alloc_assert (pgm_receiver);
|
||||||
|
|
||||||
@ -506,8 +504,6 @@ void zmq::session_base_t::start_connecting (bool wait_)
|
|||||||
|
|
||||||
send_attach (this, pgm_receiver);
|
send_attach (this, pgm_receiver);
|
||||||
}
|
}
|
||||||
else
|
|
||||||
zmq_assert (false);
|
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user