mirror of
https://github.com/zeromq/libzmq.git
synced 2025-10-30 13:47:13 +01:00
pipe: check_read() should check for message delimiter
This commit is contained in:
27
src/pipe.cpp
27
src/pipe.cpp
@@ -44,15 +44,32 @@ void zmq::reader_t::set_pipe (pipe_t *pipe_)
|
|||||||
register_pipe (pipe);
|
register_pipe (pipe);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool zmq::reader_t::is_delimiter (zmq_msg_t &msg_)
|
||||||
|
{
|
||||||
|
unsigned char *offset = 0;
|
||||||
|
|
||||||
|
return msg_.content == (void*) (offset + ZMQ_DELIMITER);
|
||||||
|
}
|
||||||
|
|
||||||
bool zmq::reader_t::check_read ()
|
bool zmq::reader_t::check_read ()
|
||||||
{
|
{
|
||||||
// Check if there's an item in the pipe.
|
// Check if there's an item in the pipe.
|
||||||
if (pipe->check_read ())
|
|
||||||
return true;
|
|
||||||
|
|
||||||
// If not, deactivate the pipe.
|
// If not, deactivate the pipe.
|
||||||
endpoint->kill (this);
|
if (!pipe->check_read ()) {
|
||||||
return false;
|
endpoint->kill (this);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the next item in the pipe is message delimiter,
|
||||||
|
// initiate its termination.
|
||||||
|
if (pipe->probe (is_delimiter)) {
|
||||||
|
if (endpoint)
|
||||||
|
endpoint->detach_inpipe (this);
|
||||||
|
term ();
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool zmq::reader_t::read (zmq_msg_t *msg_)
|
bool zmq::reader_t::read (zmq_msg_t *msg_)
|
||||||
|
|||||||
@@ -58,6 +58,9 @@ namespace zmq
|
|||||||
void process_revive ();
|
void process_revive ();
|
||||||
void process_pipe_term_ack ();
|
void process_pipe_term_ack ();
|
||||||
|
|
||||||
|
// Returns true if the message is delimiter; false otherwise.
|
||||||
|
static bool is_delimiter (zmq_msg_t &msg_);
|
||||||
|
|
||||||
// The underlying pipe.
|
// The underlying pipe.
|
||||||
class pipe_t *pipe;
|
class pipe_t *pipe;
|
||||||
|
|
||||||
|
|||||||
@@ -162,6 +162,17 @@ namespace zmq
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Applies the function fn to the first elemenent in the pipe
|
||||||
|
// and returns the value returned by the fn.
|
||||||
|
// The pipe mustn't be empty or the function crashes.
|
||||||
|
inline bool probe (bool (*fn)(T &))
|
||||||
|
{
|
||||||
|
bool rc = check_read ();
|
||||||
|
zmq_assert (rc);
|
||||||
|
|
||||||
|
return (*fn) (queue.front ());
|
||||||
|
}
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
|
||||||
// Allocation-efficient queue to store pipe items.
|
// Allocation-efficient queue to store pipe items.
|
||||||
|
|||||||
Reference in New Issue
Block a user