From 804bce8294daf243d14ac15f426243c568c574a6 Mon Sep 17 00:00:00 2001 From: somdoron Date: Fri, 20 Nov 2015 21:21:35 +0200 Subject: [PATCH] Fix pipe terimation in router while reading message --- src/router.cpp | 28 ++++++++++++++++++++++++++-- src/router.hpp | 6 ++++++ 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/src/router.cpp b/src/router.cpp index 551b2774..fb5b40a6 100644 --- a/src/router.cpp +++ b/src/router.cpp @@ -39,6 +39,8 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) : socket_base_t (parent_, tid_, sid_), prefetched (false), identity_sent (false), + current_in (NULL), + terminate_current_in (false), more_in (false), current_out (NULL), more_out (false), @@ -295,6 +297,14 @@ int zmq::router_t::xrecv (msg_t *msg_) prefetched = false; } more_in = msg_->flags () & msg_t::more ? true : false; + + if (!more_in) { + if (terminate_current_in) { + current_in->terminate (true); + terminate_current_in = false; + } + current_in = NULL; + } return 0; } @@ -313,8 +323,17 @@ int zmq::router_t::xrecv (msg_t *msg_) zmq_assert (pipe != NULL); // If we are in the middle of reading a message, just return the next part. - if (more_in) + if (more_in) { more_in = msg_->flags () & msg_t::more ? true : false; + + if (!more_in) { + if (terminate_current_in) { + current_in->terminate (true); + terminate_current_in = false; + } + current_in = NULL; + } + } else { // We are at the beginning of a message. // Keep the message part we have in the prefetch buffer @@ -322,6 +341,7 @@ int zmq::router_t::xrecv (msg_t *msg_) rc = prefetched_msg.move (*msg_); errno_assert (rc == 0); prefetched = true; + current_in = pipe; blob_t identity = pipe->get_identity (); rc = msg_->init_size (identity.size ()); @@ -382,6 +402,7 @@ bool zmq::router_t::xhas_in () prefetched = true; identity_sent = false; + current_in = pipe; return true; } @@ -466,7 +487,10 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) // connection to take the identity. outpipes.erase (it); - existing_outpipe.pipe->terminate (true); + if (existing_outpipe.pipe == current_in) + terminate_current_in = true; + else + existing_outpipe.pipe->terminate (true); } } } diff --git a/src/router.hpp b/src/router.hpp index 90cc78f1..e151ab49 100644 --- a/src/router.hpp +++ b/src/router.hpp @@ -92,6 +92,12 @@ namespace zmq // Holds the prefetched message. msg_t prefetched_msg; + // The pipe we are currently reading from + zmq::pipe_t *current_in; + + // Should current_in should be terminate after all parts received? + bool terminate_current_in; + // If true, more incoming message parts are expected. bool more_in;