2009-08-27 10:54:28 +02:00
|
|
|
/*
|
2011-10-31 16:20:30 +01:00
|
|
|
Copyright (c) 2009-2011 250bpm s.r.o.
|
2011-11-01 18:06:11 +01:00
|
|
|
Copyright (c) 2007-2009 iMatix Corporation
|
2011-11-01 13:39:54 +01:00
|
|
|
Copyright (c) 2011 VMware, Inc.
|
2011-03-02 16:30:40 +01:00
|
|
|
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
|
2009-08-27 10:54:28 +02:00
|
|
|
|
|
|
|
This file is part of 0MQ.
|
|
|
|
|
|
|
|
0MQ is free software; you can redistribute it and/or modify it under
|
2010-10-30 15:08:28 +02:00
|
|
|
the terms of the GNU Lesser General Public License as published by
|
2009-08-27 10:54:28 +02:00
|
|
|
the Free Software Foundation; either version 3 of the License, or
|
|
|
|
(at your option) any later version.
|
|
|
|
|
|
|
|
0MQ is distributed in the hope that it will be useful,
|
|
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
2010-10-30 15:08:28 +02:00
|
|
|
GNU Lesser General Public License for more details.
|
2009-08-27 10:54:28 +02:00
|
|
|
|
2010-10-30 15:08:28 +02:00
|
|
|
You should have received a copy of the GNU Lesser General Public License
|
2009-08-27 10:54:28 +02:00
|
|
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
*/
|
|
|
|
|
2010-08-06 17:49:37 +02:00
|
|
|
#include <new>
|
2011-05-22 17:26:53 +02:00
|
|
|
#include <stddef.h>
|
2010-08-06 17:49:37 +02:00
|
|
|
|
2009-08-27 10:54:28 +02:00
|
|
|
#include "pipe.hpp"
|
2011-05-22 17:26:53 +02:00
|
|
|
#include "err.hpp"
|
2009-08-27 10:54:28 +02:00
|
|
|
|
2011-05-22 17:26:53 +02:00
|
|
|
int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
|
|
|
|
int hwms_ [2], bool delays_ [2])
|
|
|
|
{
|
|
|
|
// Creates two pipe objects. These objects are connected by two ypipes,
|
|
|
|
// each to pass messages in one direction.
|
|
|
|
|
|
|
|
pipe_t::upipe_t *upipe1 = new (std::nothrow) pipe_t::upipe_t ();
|
|
|
|
alloc_assert (upipe1);
|
|
|
|
pipe_t::upipe_t *upipe2 = new (std::nothrow) pipe_t::upipe_t ();
|
|
|
|
alloc_assert (upipe2);
|
|
|
|
|
|
|
|
pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2,
|
|
|
|
hwms_ [1], hwms_ [0], delays_ [0]);
|
|
|
|
alloc_assert (pipes_ [0]);
|
|
|
|
pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1,
|
|
|
|
hwms_ [0], hwms_ [1], delays_ [1]);
|
|
|
|
alloc_assert (pipes_ [1]);
|
|
|
|
|
|
|
|
pipes_ [0]->set_peer (pipes_ [1]);
|
|
|
|
pipes_ [1]->set_peer (pipes_ [0]);
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
|
|
|
zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
|
|
|
|
int inhwm_, int outhwm_, bool delay_) :
|
2009-08-27 10:54:28 +02:00
|
|
|
object_t (parent_),
|
2011-05-22 17:26:53 +02:00
|
|
|
inpipe (inpipe_),
|
|
|
|
outpipe (outpipe_),
|
|
|
|
in_active (true),
|
|
|
|
out_active (true),
|
|
|
|
hwm (outhwm_),
|
|
|
|
lwm (compute_lwm (inhwm_)),
|
2010-03-01 10:13:26 +01:00
|
|
|
msgs_read (0),
|
2011-05-22 17:26:53 +02:00
|
|
|
msgs_written (0),
|
|
|
|
peers_msgs_read (0),
|
|
|
|
peer (NULL),
|
2010-08-06 17:49:37 +02:00
|
|
|
sink (NULL),
|
2011-05-25 10:25:51 +02:00
|
|
|
state (active),
|
2011-11-04 08:00:47 +01:00
|
|
|
delay (delay_)
|
2010-08-06 17:49:37 +02:00
|
|
|
{
|
|
|
|
}
|
|
|
|
|
2011-05-22 17:26:53 +02:00
|
|
|
zmq::pipe_t::~pipe_t ()
|
2010-08-06 17:49:37 +02:00
|
|
|
{
|
|
|
|
}
|
2009-08-27 10:54:28 +02:00
|
|
|
|
2011-05-22 17:26:53 +02:00
|
|
|
void zmq::pipe_t::set_peer (pipe_t *peer_)
|
2009-08-27 10:54:28 +02:00
|
|
|
{
|
2011-05-22 17:26:53 +02:00
|
|
|
// Peer can be set once only.
|
|
|
|
zmq_assert (!peer);
|
|
|
|
peer = peer_;
|
2009-08-27 10:54:28 +02:00
|
|
|
}
|
|
|
|
|
2011-05-22 17:26:53 +02:00
|
|
|
void zmq::pipe_t::set_event_sink (i_pipe_events *sink_)
|
2009-12-01 14:58:00 +01:00
|
|
|
{
|
2011-05-22 17:26:53 +02:00
|
|
|
// Sink can be set once only.
|
2010-08-06 17:49:37 +02:00
|
|
|
zmq_assert (!sink);
|
|
|
|
sink = sink_;
|
2009-12-01 14:58:00 +01:00
|
|
|
}
|
|
|
|
|
2011-11-04 08:00:47 +01:00
|
|
|
void zmq::pipe_t::set_identity (const blob_t &identity_)
|
2011-06-22 16:51:40 +02:00
|
|
|
{
|
2011-11-04 08:00:47 +01:00
|
|
|
identity = identity_;
|
2011-06-22 16:51:40 +02:00
|
|
|
}
|
|
|
|
|
2011-11-04 08:00:47 +01:00
|
|
|
zmq::blob_t zmq::pipe_t::get_identity ()
|
2011-06-22 16:51:40 +02:00
|
|
|
{
|
2011-11-04 08:00:47 +01:00
|
|
|
return identity;
|
2011-06-22 16:51:40 +02:00
|
|
|
}
|
|
|
|
|
2011-05-22 17:26:53 +02:00
|
|
|
bool zmq::pipe_t::check_read ()
|
2009-09-30 10:08:35 +02:00
|
|
|
{
|
2011-05-25 10:25:51 +02:00
|
|
|
if (unlikely (!in_active || (state != active && state != pending)))
|
2010-08-06 17:49:37 +02:00
|
|
|
return false;
|
|
|
|
|
2009-09-30 10:08:35 +02:00
|
|
|
// Check if there's an item in the pipe.
|
2011-05-22 17:26:53 +02:00
|
|
|
if (!inpipe->check_read ()) {
|
|
|
|
in_active = false;
|
2010-07-14 18:31:17 +02:00
|
|
|
return false;
|
2010-08-28 13:06:58 +02:00
|
|
|
}
|
2010-07-14 18:31:17 +02:00
|
|
|
|
|
|
|
// If the next item in the pipe is message delimiter,
|
2011-05-22 17:26:53 +02:00
|
|
|
// initiate termination process.
|
|
|
|
if (inpipe->probe (is_delimiter)) {
|
2011-04-21 22:27:48 +02:00
|
|
|
msg_t msg;
|
2011-05-22 17:26:53 +02:00
|
|
|
bool ok = inpipe->read (&msg);
|
2010-10-08 17:23:21 +02:00
|
|
|
zmq_assert (ok);
|
2011-05-25 10:25:51 +02:00
|
|
|
delimit ();
|
2010-07-14 18:31:17 +02:00
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
return true;
|
2009-09-30 10:08:35 +02:00
|
|
|
}
|
|
|
|
|
2011-05-22 17:26:53 +02:00
|
|
|
bool zmq::pipe_t::read (msg_t *msg_)
|
2009-08-27 10:54:28 +02:00
|
|
|
{
|
2011-05-25 10:25:51 +02:00
|
|
|
if (unlikely (!in_active || (state != active && state != pending)))
|
2010-08-06 17:49:37 +02:00
|
|
|
return false;
|
|
|
|
|
2011-05-22 17:26:53 +02:00
|
|
|
if (!inpipe->read (msg_)) {
|
|
|
|
in_active = false;
|
2009-08-28 16:51:46 +02:00
|
|
|
return false;
|
2010-08-28 13:06:58 +02:00
|
|
|
}
|
2009-08-28 16:51:46 +02:00
|
|
|
|
|
|
|
// If delimiter was read, start termination process of the pipe.
|
2011-04-21 22:27:48 +02:00
|
|
|
if (msg_->is_delimiter ()) {
|
2011-05-25 10:25:51 +02:00
|
|
|
delimit ();
|
2009-08-28 16:51:46 +02:00
|
|
|
return false;
|
|
|
|
}
|
2009-08-27 10:54:28 +02:00
|
|
|
|
2011-11-01 13:39:54 +01:00
|
|
|
if (!(msg_->flags () & msg_t::more))
|
2010-03-27 09:24:38 +01:00
|
|
|
msgs_read++;
|
|
|
|
|
2010-03-01 10:13:26 +01:00
|
|
|
if (lwm > 0 && msgs_read % lwm == 0)
|
2011-05-22 17:26:53 +02:00
|
|
|
send_activate_write (peer, msgs_read);
|
2009-08-28 16:51:46 +02:00
|
|
|
|
|
|
|
return true;
|
2009-08-27 10:54:28 +02:00
|
|
|
}
|
|
|
|
|
2012-03-28 06:38:25 +02:00
|
|
|
bool zmq::pipe_t::check_write ()
|
2009-08-27 10:54:28 +02:00
|
|
|
{
|
2011-05-25 10:25:51 +02:00
|
|
|
if (unlikely (!out_active || state != active))
|
2011-05-22 17:26:53 +02:00
|
|
|
return false;
|
2010-08-06 17:49:37 +02:00
|
|
|
|
2011-05-22 17:26:53 +02:00
|
|
|
bool full = hwm > 0 && msgs_written - peers_msgs_read == uint64_t (hwm);
|
2009-08-27 10:54:28 +02:00
|
|
|
|
2011-05-22 17:26:53 +02:00
|
|
|
if (unlikely (full)) {
|
|
|
|
out_active = false;
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
return true;
|
2009-08-27 10:54:28 +02:00
|
|
|
}
|
|
|
|
|
2011-05-22 17:26:53 +02:00
|
|
|
bool zmq::pipe_t::write (msg_t *msg_)
|
2009-08-28 16:51:46 +02:00
|
|
|
{
|
2012-03-28 06:38:25 +02:00
|
|
|
if (unlikely (!check_write ()))
|
2011-05-22 17:26:53 +02:00
|
|
|
return false;
|
2010-08-06 17:49:37 +02:00
|
|
|
|
2011-11-01 13:39:54 +01:00
|
|
|
bool more = msg_->flags () & msg_t::more ? true : false;
|
2011-06-20 11:33:54 +02:00
|
|
|
outpipe->write (*msg_, more);
|
|
|
|
if (!more)
|
2011-05-22 17:26:53 +02:00
|
|
|
msgs_written++;
|
2010-08-06 17:49:37 +02:00
|
|
|
|
2011-05-22 17:26:53 +02:00
|
|
|
return true;
|
2009-08-28 16:51:46 +02:00
|
|
|
}
|
|
|
|
|
2011-05-22 17:26:53 +02:00
|
|
|
void zmq::pipe_t::rollback ()
|
2009-08-28 16:51:46 +02:00
|
|
|
{
|
2011-05-22 17:26:53 +02:00
|
|
|
// Remove incomplete message from the outbound pipe.
|
|
|
|
msg_t msg;
|
2011-06-19 11:17:20 +02:00
|
|
|
if (outpipe) {
|
|
|
|
while (outpipe->unwrite (&msg)) {
|
2011-11-01 13:39:54 +01:00
|
|
|
zmq_assert (msg.flags () & msg_t::more);
|
2011-06-19 11:17:20 +02:00
|
|
|
int rc = msg.close ();
|
|
|
|
errno_assert (rc == 0);
|
|
|
|
}
|
2011-05-22 17:26:53 +02:00
|
|
|
}
|
2009-08-28 16:51:46 +02:00
|
|
|
}
|
|
|
|
|
2011-05-22 17:26:53 +02:00
|
|
|
void zmq::pipe_t::flush ()
|
2009-08-27 10:54:28 +02:00
|
|
|
{
|
2011-05-25 10:25:51 +02:00
|
|
|
// If terminate() was already called do nothing.
|
|
|
|
if (state == terminated && state == double_terminated)
|
|
|
|
return;
|
|
|
|
|
|
|
|
// The peer does not exist anymore at this point.
|
|
|
|
if (state == terminating)
|
|
|
|
return;
|
|
|
|
|
2011-06-20 08:21:00 +02:00
|
|
|
if (outpipe && !outpipe->flush ())
|
2011-05-22 17:26:53 +02:00
|
|
|
send_activate_read (peer);
|
2009-08-27 10:54:28 +02:00
|
|
|
}
|
|
|
|
|
2011-05-22 17:26:53 +02:00
|
|
|
void zmq::pipe_t::process_activate_read ()
|
2009-12-01 14:58:00 +01:00
|
|
|
{
|
2011-05-25 10:25:51 +02:00
|
|
|
if (!in_active && (state == active || state == pending)) {
|
2011-05-22 17:26:53 +02:00
|
|
|
in_active = true;
|
|
|
|
sink->read_activated (this);
|
|
|
|
}
|
2009-12-01 14:58:00 +01:00
|
|
|
}
|
|
|
|
|
2011-05-22 17:26:53 +02:00
|
|
|
void zmq::pipe_t::process_activate_write (uint64_t msgs_read_)
|
2009-08-27 10:54:28 +02:00
|
|
|
{
|
2011-05-22 17:26:53 +02:00
|
|
|
// Remember the peers's message sequence number.
|
|
|
|
peers_msgs_read = msgs_read_;
|
2010-12-15 20:10:27 +01:00
|
|
|
|
2011-05-25 10:25:51 +02:00
|
|
|
if (!out_active && state == active) {
|
2011-05-22 17:26:53 +02:00
|
|
|
out_active = true;
|
|
|
|
sink->write_activated (this);
|
2010-03-01 10:13:26 +01:00
|
|
|
}
|
2009-08-27 10:54:28 +02:00
|
|
|
}
|
|
|
|
|
2011-05-30 10:07:34 +02:00
|
|
|
void zmq::pipe_t::process_hiccup (void *pipe_)
|
|
|
|
{
|
|
|
|
// Destroy old outpipe. Note that the read end of the pipe was already
|
|
|
|
// migrated to this thread.
|
|
|
|
zmq_assert (outpipe);
|
|
|
|
outpipe->flush ();
|
|
|
|
msg_t msg;
|
|
|
|
while (outpipe->read (&msg)) {
|
|
|
|
int rc = msg.close ();
|
|
|
|
errno_assert (rc == 0);
|
|
|
|
}
|
|
|
|
delete outpipe;
|
|
|
|
|
|
|
|
// Plug in the new outpipe.
|
|
|
|
zmq_assert (pipe_);
|
|
|
|
outpipe = (upipe_t*) pipe_;
|
|
|
|
out_active = true;
|
|
|
|
|
|
|
|
// If appropriate, notify the user about the hiccup.
|
|
|
|
if (state == active)
|
|
|
|
sink->hiccuped (this);
|
|
|
|
}
|
|
|
|
|
2011-05-22 17:26:53 +02:00
|
|
|
void zmq::pipe_t::process_pipe_term ()
|
2009-08-27 10:54:28 +02:00
|
|
|
{
|
2011-05-25 10:25:51 +02:00
|
|
|
// This is the simple case of peer-induced termination. If there are no
|
|
|
|
// more pending messages to read, or if the pipe was configured to drop
|
|
|
|
// pending messages, we can move directly to the terminating state.
|
|
|
|
// Otherwise we'll hang up in pending state till all the pending messages
|
|
|
|
// are sent.
|
|
|
|
if (state == active) {
|
|
|
|
if (!delay) {
|
|
|
|
state = terminating;
|
2011-06-19 11:17:20 +02:00
|
|
|
outpipe = NULL;
|
2011-05-25 10:25:51 +02:00
|
|
|
send_pipe_term_ack (peer);
|
2011-05-26 11:30:25 +02:00
|
|
|
return;
|
2011-05-25 10:25:51 +02:00
|
|
|
}
|
|
|
|
else {
|
|
|
|
state = pending;
|
2011-05-26 11:30:25 +02:00
|
|
|
return;
|
2011-05-25 10:25:51 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Delimiter happened to arrive before the term command. Now we have the
|
|
|
|
// term command as well, so we can move straight to terminating state.
|
|
|
|
if (state == delimited) {
|
|
|
|
state = terminating;
|
2011-06-19 11:17:20 +02:00
|
|
|
outpipe = NULL;
|
2011-05-25 10:25:51 +02:00
|
|
|
send_pipe_term_ack (peer);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
// This is the case where both ends of the pipe are closed in parallel.
|
|
|
|
// We simply reply to the request by ack and continue waiting for our
|
|
|
|
// own ack.
|
|
|
|
if (state == terminated) {
|
|
|
|
state = double_terminated;
|
2011-06-19 11:17:20 +02:00
|
|
|
outpipe = NULL;
|
2011-05-22 17:26:53 +02:00
|
|
|
send_pipe_term_ack (peer);
|
2011-05-25 10:25:51 +02:00
|
|
|
return;
|
2011-05-22 17:26:53 +02:00
|
|
|
}
|
2011-05-25 10:25:51 +02:00
|
|
|
|
|
|
|
// pipe_term is invalid in other states.
|
|
|
|
zmq_assert (false);
|
2009-08-27 10:54:28 +02:00
|
|
|
}
|
|
|
|
|
2011-05-22 17:26:53 +02:00
|
|
|
void zmq::pipe_t::process_pipe_term_ack ()
|
2010-03-09 08:43:20 +01:00
|
|
|
{
|
2011-05-22 17:26:53 +02:00
|
|
|
// Notify the user that all the references to the pipe should be dropped.
|
|
|
|
zmq_assert (sink);
|
|
|
|
sink->terminated (this);
|
|
|
|
|
2011-05-25 10:25:51 +02:00
|
|
|
// In terminating and double_terminated states there's nothing to do.
|
|
|
|
// Simply deallocate the pipe. In terminated state we have to ack the
|
|
|
|
// peer before deallocating this side of the pipe. All the other states
|
|
|
|
// are invalid.
|
|
|
|
if (state == terminating) ;
|
|
|
|
else if (state == double_terminated);
|
2011-06-19 11:17:20 +02:00
|
|
|
else if (state == terminated) {
|
|
|
|
outpipe = NULL;
|
2011-05-22 17:26:53 +02:00
|
|
|
send_pipe_term_ack (peer);
|
2011-06-19 11:17:20 +02:00
|
|
|
}
|
2011-05-25 10:25:51 +02:00
|
|
|
else
|
|
|
|
zmq_assert (false);
|
2011-05-22 17:26:53 +02:00
|
|
|
|
|
|
|
// We'll deallocate the inbound pipe, the peer will deallocate the outbound
|
|
|
|
// pipe (which is an inbound pipe from its point of view).
|
|
|
|
// First, delete all the unread messages in the pipe. We have to do it by
|
|
|
|
// hand because msg_t doesn't have automatic destructor. Then deallocate
|
|
|
|
// the ypipe itself.
|
2011-04-21 22:27:48 +02:00
|
|
|
msg_t msg;
|
2011-05-22 17:26:53 +02:00
|
|
|
while (inpipe->read (&msg)) {
|
|
|
|
int rc = msg.close ();
|
|
|
|
errno_assert (rc == 0);
|
2010-03-01 10:13:26 +01:00
|
|
|
}
|
2011-05-22 17:26:53 +02:00
|
|
|
delete inpipe;
|
2010-03-09 08:43:20 +01:00
|
|
|
|
2011-05-22 17:26:53 +02:00
|
|
|
// Deallocate the pipe object
|
|
|
|
delete this;
|
2009-08-27 10:54:28 +02:00
|
|
|
}
|
|
|
|
|
2011-05-31 14:36:51 +02:00
|
|
|
void zmq::pipe_t::terminate (bool delay_)
|
2009-08-28 16:51:46 +02:00
|
|
|
{
|
2011-05-31 14:36:51 +02:00
|
|
|
// Overload the value specified at pipe creation.
|
|
|
|
delay = delay_;
|
|
|
|
|
2011-05-25 10:25:51 +02:00
|
|
|
// If terminate was already called, we can ignore the duplicit invocation.
|
|
|
|
if (state == terminated || state == double_terminated)
|
2010-08-06 17:49:37 +02:00
|
|
|
return;
|
2011-05-25 10:25:51 +02:00
|
|
|
|
|
|
|
// If the pipe is in the final phase of async termination, it's going to
|
|
|
|
// closed anyway. No need to do anything special here.
|
|
|
|
else if (state == terminating)
|
|
|
|
return;
|
|
|
|
|
|
|
|
// The simple sync termination case. Ask the peer to terminate and wait
|
|
|
|
// for the ack.
|
|
|
|
else if (state == active) {
|
|
|
|
send_pipe_term (peer);
|
|
|
|
state = terminated;
|
|
|
|
}
|
|
|
|
|
|
|
|
// There are still pending messages available, but the user calls
|
|
|
|
// 'terminate'. We can act as if all the pending messages were read.
|
2011-05-31 14:36:51 +02:00
|
|
|
else if (state == pending && !delay) {
|
2011-06-19 11:17:20 +02:00
|
|
|
outpipe = NULL;
|
2011-05-31 14:36:51 +02:00
|
|
|
send_pipe_term_ack (peer);
|
|
|
|
state = terminating;
|
|
|
|
}
|
|
|
|
|
|
|
|
// If there are pending messages still availabe, do nothing.
|
|
|
|
else if (state == pending && delay) {
|
2011-05-25 10:25:51 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
// We've already got delimiter, but not term command yet. We can ignore
|
|
|
|
// the delimiter and ack synchronously terminate as if we were in
|
|
|
|
// active state.
|
|
|
|
else if (state == delimited) {
|
|
|
|
send_pipe_term (peer);
|
|
|
|
state = terminated;
|
|
|
|
}
|
|
|
|
|
|
|
|
// There are no other states.
|
|
|
|
else
|
|
|
|
zmq_assert (false);
|
2009-08-28 16:51:46 +02:00
|
|
|
|
2011-05-31 14:36:51 +02:00
|
|
|
// Stop outbound flow of messages.
|
2011-05-22 17:26:53 +02:00
|
|
|
out_active = false;
|
2010-06-21 15:06:51 +02:00
|
|
|
|
2011-06-19 11:17:20 +02:00
|
|
|
if (outpipe) {
|
2010-08-07 09:52:34 +02:00
|
|
|
|
2011-06-19 11:17:20 +02:00
|
|
|
// Rollback any unfinished outbound messages.
|
|
|
|
rollback ();
|
|
|
|
|
|
|
|
// Push delimiter into the outbound pipe. Note that watermarks are not
|
|
|
|
// checked thus the delimiter can be written even though the pipe is full.
|
|
|
|
msg_t msg;
|
|
|
|
msg.init_delimiter ();
|
|
|
|
outpipe->write (msg, false);
|
|
|
|
flush ();
|
|
|
|
}
|
2009-08-27 10:54:28 +02:00
|
|
|
}
|
|
|
|
|
2011-05-22 17:26:53 +02:00
|
|
|
bool zmq::pipe_t::is_delimiter (msg_t &msg_)
|
2009-08-27 10:54:28 +02:00
|
|
|
{
|
2011-05-22 17:26:53 +02:00
|
|
|
return msg_.is_delimiter ();
|
2009-08-27 10:54:28 +02:00
|
|
|
}
|
2010-05-25 15:03:57 +02:00
|
|
|
|
2011-05-22 17:26:53 +02:00
|
|
|
int zmq::pipe_t::compute_lwm (int hwm_)
|
2010-05-25 15:03:57 +02:00
|
|
|
{
|
2011-05-22 17:26:53 +02:00
|
|
|
// Compute the low water mark. Following point should be taken
|
2010-08-06 17:49:37 +02:00
|
|
|
// into consideration:
|
|
|
|
//
|
|
|
|
// 1. LWM has to be less than HWM.
|
|
|
|
// 2. LWM cannot be set to very low value (such as zero) as after filling
|
|
|
|
// the queue it would start to refill only after all the messages are
|
|
|
|
// read from it and thus unnecessarily hold the progress back.
|
|
|
|
// 3. LWM cannot be set to very high value (such as HWM-1) as it would
|
|
|
|
// result in lock-step filling of the queue - if a single message is
|
|
|
|
// read from a full queue, writer thread is resumed to write exactly one
|
|
|
|
// message to the queue and go back to sleep immediately. This would
|
|
|
|
// result in low performance.
|
|
|
|
//
|
|
|
|
// Given the 3. it would be good to keep HWM and LWM as far apart as
|
|
|
|
// possible to reduce the thread switching overhead to almost zero,
|
|
|
|
// say HWM-LWM should be max_wm_delta.
|
|
|
|
//
|
|
|
|
// That done, we still we have to account for the cases where
|
|
|
|
// HWM < max_wm_delta thus driving LWM to negative numbers.
|
|
|
|
// Let's make LWM 1/2 of HWM in such cases.
|
2011-05-22 17:26:53 +02:00
|
|
|
int result = (hwm_ > max_wm_delta * 2) ?
|
2010-08-06 17:49:37 +02:00
|
|
|
hwm_ - max_wm_delta : (hwm_ + 1) / 2;
|
|
|
|
|
2011-05-22 17:26:53 +02:00
|
|
|
return result;
|
2010-05-25 15:03:57 +02:00
|
|
|
}
|
2011-05-25 10:25:51 +02:00
|
|
|
|
|
|
|
void zmq::pipe_t::delimit ()
|
|
|
|
{
|
|
|
|
if (state == active) {
|
|
|
|
state = delimited;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (state == pending) {
|
2011-06-19 11:17:20 +02:00
|
|
|
outpipe = NULL;
|
2011-05-25 10:25:51 +02:00
|
|
|
send_pipe_term_ack (peer);
|
|
|
|
state = terminating;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Delimiter in any other state is invalid.
|
|
|
|
zmq_assert (false);
|
|
|
|
}
|
2011-05-30 10:07:34 +02:00
|
|
|
|
|
|
|
void zmq::pipe_t::hiccup ()
|
|
|
|
{
|
|
|
|
// If termination is already under way do nothing.
|
|
|
|
if (state != active)
|
|
|
|
return;
|
|
|
|
|
|
|
|
// We'll drop the pointer to the inpipe. From now on, the peer is
|
|
|
|
// responsible for deallocating it.
|
|
|
|
inpipe = NULL;
|
|
|
|
|
|
|
|
// Create new inpipe.
|
|
|
|
inpipe = new (std::nothrow) pipe_t::upipe_t ();
|
|
|
|
alloc_assert (inpipe);
|
|
|
|
in_active = true;
|
|
|
|
|
|
|
|
// Notify the peer about the hiccup.
|
|
|
|
send_hiccup (peer, (void*) inpipe);
|
|
|
|
}
|
|
|
|
|