New wire format for REQ/REP pattern

This patch introduces two changes:
1. 32-bit ID is used to identify the peer instead of UUID
2. REQ socket seeds the label stack with unique 32-bit request ID
   It also drops any replies with non-matching request ID

Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
This commit is contained in:
Martin Sustrik 2011-06-22 11:02:16 +02:00
parent 10a93bb79f
commit ec81f8fb25
12 changed files with 344 additions and 70 deletions

1
.gitignore vendored
View File

@ -29,6 +29,7 @@ tests/test_reqrep_tcp
tests/test_shutdown_stress
tests/test_hwm
tests/test_timeo
tests/test_reqrep_device
src/platform.hpp*
src/stamp-h1
devices/zmq_forwarder/zmq_forwarder

View File

@ -53,6 +53,7 @@ libzmq_la_SOURCES = \
pub.hpp \
pull.hpp \
push.hpp \
random.hpp \
reaper.hpp \
rep.hpp \
req.hpp \
@ -117,6 +118,7 @@ libzmq_la_SOURCES = \
push.cpp \
reaper.cpp \
pub.cpp \
random.cpp \
rep.cpp \
req.cpp \
router.cpp \

39
src/random.cpp Normal file
View File

@ -0,0 +1,39 @@
/*
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "random.hpp"
#include "uuid.hpp"
#include "err.hpp"
// Here we can use different ways of generating random data, as avialable
// on different platforms. At the moment, we'll assume the UUID is random
// enough to use for that purpose.
void zmq::generate_random (void *buf_, size_t size_)
{
// Collapsing an UUID into 4 bytes.
zmq_assert (size_ == 4);
uint32_t buff [4];
generate_uuid ((void*) buff);
uint32_t result = buff [0];
result ^= buff [1];
result ^= buff [2];
result ^= buff [3];
*((uint32_t*) buf_) = result;
}

34
src/random.hpp Normal file
View File

@ -0,0 +1,34 @@
/*
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_RANDOM_HPP_INCLUDED__
#define __ZMQ_RANDOM_HPP_INCLUDED__
#include <stddef.h>
namespace zmq
{
// Generates truly random bytes (not pseudo-random).
void generate_random (void *buf_, size_t size_);
}
#endif

View File

@ -64,54 +64,32 @@ int zmq::rep_t::xrecv (msg_t *msg_, int flags_)
return -1;
}
// First thing to do when receiving a request is to copy all the labels
// to the reply pipe.
if (request_begins) {
// Copy the backtrace stack to the reply pipe.
while (true) {
// TODO: If request can be read but reply pipe is not
// ready for writing, we should drop the reply.
// Get next part of the backtrace stack.
int rc = xrep_t::xrecv (msg_, flags_);
if (rc != 0)
return rc;
if (!(msg_->flags () & msg_t::label))
break;
if (msg_->flags () & (msg_t::more | msg_t::label)) {
// Empty message part delimits the traceback stack.
bool bottom = (msg_->size () == 0);
// Push it to the reply pipe.
rc = xrep_t::xsend (msg_, flags_);
zmq_assert (rc == 0);
// The end of the traceback, move to processing message body.
if (bottom)
break;
}
else {
// If the traceback stack is malformed, discard anything
// already sent to pipe (we're at end of invalid message)
// and continue reading -- that'll switch us to the next pipe
// and next request.
rc = xrep_t::rollback ();
zmq_assert (rc == 0);
}
// TODO: If the reply cannot be sent to the peer because
// od congestion, we should drop it.
rc = xrep_t::xsend (msg_, flags_);
zmq_assert (rc == 0);
}
request_begins = false;
}
// Now the routing info is safely stored. Get the first part
// of the message payload and exit.
int rc = xrep_t::xrecv (msg_, flags_);
if (rc != 0)
return rc;
else {
int rc = xrep_t::xrecv (msg_, flags_);
if (rc != 0)
return rc;
}
zmq_assert (!(msg_->flags () & msg_t::label));
// If whole request is read, flip the FSM to reply-sending state.
if (!(msg_->flags () & (msg_t::more | msg_t::label))) {
if (!(msg_->flags () & msg_t::more)) {
sending_reply = true;
request_begins = true;
}

View File

@ -21,13 +21,21 @@
#include "req.hpp"
#include "err.hpp"
#include "msg.hpp"
#include "uuid.hpp"
#include "wire.hpp"
#include "random.hpp"
#include "likely.hpp"
zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_) :
xreq_t (parent_, tid_),
receiving_reply (false),
message_begins (true)
message_begins (true),
request_id (0)
{
options.type = ZMQ_REQ;
// Start the request ID sequence at an random point.
generate_random (&request_id, sizeof (request_id));
}
zmq::req_t::~req_t ()
@ -43,12 +51,14 @@ int zmq::req_t::xsend (msg_t *msg_, int flags_)
return -1;
}
// First part of the request is empty message part (stack bottom).
// First part of the request is the request identity.
if (message_begins) {
msg_t prefix;
int rc = prefix.init ();
int rc = prefix.init_size (4);
errno_assert (rc == 0);
prefix.set_flags (msg_t::label);
unsigned char *data = (unsigned char*) prefix.data ();
put_uint32 (data, request_id);
rc = xreq_t::xsend (&prefix, flags_);
if (rc != 0)
return rc;
@ -78,13 +88,28 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)
return -1;
}
// First part of the reply should be empty message part (stack bottom).
// First part of the reply should be the original request ID.
if (message_begins) {
int rc = xreq_t::xrecv (msg_, flags_);
if (rc != 0)
return rc;
zmq_assert (msg_->flags () & msg_t::label);
zmq_assert (msg_->size () == 0);
zmq_assert (msg_->size () == 4);
unsigned char *data = (unsigned char*) msg_->data ();
if (unlikely (get_uint32 (data) != request_id)) {
// The request ID does not match. Drop the entire message.
while (true) {
int rc = xreq_t::xrecv (msg_, flags_);
errno_assert (rc == 0);
if (!(msg_->flags () & (msg_t::label | msg_t::more)))
break;
}
msg_->close ();
msg_->init ();
errno = EAGAIN;
return -1;
}
message_begins = false;
}
@ -94,6 +119,7 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)
// If the reply is fully received, flip the FSM into request-sending state.
if (!(msg_->flags () & (msg_t::more | msg_t::label))) {
request_id++;
receiving_reply = false;
message_begins = true;
}
@ -103,6 +129,8 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)
bool zmq::req_t::xhas_in ()
{
// TODO: Duplicates should be removed here.
if (!receiving_reply)
return false;

View File

@ -22,6 +22,7 @@
#define __ZMQ_REQ_HPP_INCLUDED__
#include "xreq.hpp"
#include "stdint.hpp"
namespace zmq
{
@ -49,6 +50,10 @@ namespace zmq
// of the message must be empty message part (backtrace stack bottom).
bool message_begins;
// Request ID. Request numbers gradually increase (and wrap over)
// so that we don't have to generate random ID for each request.
uint32_t request_id;
req_t (const req_t&);
const req_t &operator = (const req_t&);
};

View File

@ -598,15 +598,15 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
ticks = 0;
rc = xrecv (msg_, flags_);
if (rc == 0) {
rcvlabel = msg_->flags () & msg_t::label;
if (rcvlabel)
msg_->reset_flags (msg_t::label);
rcvmore = msg_->flags () & msg_t::more;
if (rcvmore)
msg_->reset_flags (msg_t::more);
}
return rc;
if (rc < 0)
return rc;
rcvlabel = msg_->flags () & msg_t::label;
if (rcvlabel)
msg_->reset_flags (msg_t::label);
rcvmore = msg_->flags () & msg_t::more;
if (rcvmore)
msg_->reset_flags (msg_t::more);
return 0;
}
// Compute the time when the timeout should occur.

View File

@ -20,6 +20,8 @@
#include "xrep.hpp"
#include "pipe.hpp"
#include "wire.hpp"
#include "random.hpp"
#include "err.hpp"
zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
@ -32,9 +34,8 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
{
options.type = ZMQ_XREP;
// On connect, pipes are created only after initial handshaking.
// That way we are aware of the peer's identity when binding to the pipes.
options.immediate_connect = false;
// Start the peer ID sequence from a random point.
generate_random (&next_peer_id, sizeof (next_peer_id));
}
zmq::xrep_t::~xrep_t ()
@ -47,16 +48,33 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
{
zmq_assert (pipe_);
// Generate a new peer ID. Take care to avoid duplicates.
outpipes_t::iterator it = outpipes.lower_bound (next_peer_id);
if (!outpipes.empty ()) {
while (true) {
if (it == outpipes.end ())
it = outpipes.begin ();
if (it->first != next_peer_id)
break;
++next_peer_id;
++it;
}
}
// Add the pipe to the map out outbound pipes.
// TODO: What if new connection has same peer identity as the old one?
outpipe_t outpipe = {pipe_, true};
bool ok = outpipes.insert (outpipes_t::value_type (
peer_identity_, outpipe)).second;
next_peer_id, outpipe)).second;
zmq_assert (ok);
// Add the pipe to the list of inbound pipes.
inpipe_t inpipe = {pipe_, peer_identity_, true};
inpipe_t inpipe = {pipe_, next_peer_id, true};
inpipes.push_back (inpipe);
// Advance next peer ID so that if new connection is dropped shortly after
// its creation we don't accidentally get two subsequent peers with
// the same ID.
++next_peer_id;
}
void zmq::xrep_t::xterminated (pipe_t *pipe_)
@ -115,7 +133,7 @@ void zmq::xrep_t::xwrite_activated (pipe_t *pipe_)
int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
{
// If this is the first part of the message it's the identity of the
// If this is the first part of the message it's the ID of the
// peer to send the message to.
if (!more_out) {
zmq_assert (!current_out);
@ -127,10 +145,11 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
more_out = true;
// Find the pipe associated with the identity stored in the prefix.
// Find the pipe associated with the peer ID stored in the prefix.
// If there's no such pipe just silently ignore the message.
blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
outpipes_t::iterator it = outpipes.find (identity);
zmq_assert (msg_->size () == 4);
uint32_t peer_id = get_uint32 ((unsigned char*) msg_->data ());
outpipes_t::iterator it = outpipes.find (peer_id);
if (it != outpipes.end ()) {
current_out = it->second.pipe;
@ -220,10 +239,10 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
// If we have a message, create a prefix and return it to the caller.
if (prefetched) {
int rc = msg_->init_size (inpipes [current_in].identity.size ());
int rc = msg_->init_size (4);
errno_assert (rc == 0);
memcpy (msg_->data (), inpipes [current_in].identity.data (),
msg_->size ());
put_uint32 ((unsigned char*) msg_->data (),
inpipes [current_in].peer_id);
msg_->set_flags (msg_t::label);
return 0;
}

View File

@ -25,7 +25,7 @@
#include <vector>
#include "socket_base.hpp"
#include "blob.hpp"
#include "stdint.hpp"
#include "msg.hpp"
namespace zmq
@ -41,7 +41,8 @@ namespace zmq
~xrep_t ();
// Overloads of functions from socket_base_t.
void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
void xattach_pipe (class pipe_t *pipe_,
const blob_t &peer_identity_);
int xsend (class msg_t *msg_, int flags_);
int xrecv (class msg_t *msg_, int flags_);
bool xhas_in ();
@ -60,7 +61,7 @@ namespace zmq
struct inpipe_t
{
class pipe_t *pipe;
blob_t identity;
uint32_t peer_id;
bool active;
};
@ -86,8 +87,8 @@ namespace zmq
bool active;
};
// Outbound pipes indexed by the peer names.
typedef std::map <blob_t, outpipe_t> outpipes_t;
// Outbound pipes indexed by the peer IDs.
typedef std::map <uint32_t, outpipe_t> outpipes_t;
outpipes_t outpipes;
// The pipe we are currently writing to.
@ -96,6 +97,10 @@ namespace zmq
// If true, more outgoing message parts are expected.
bool more_out;
// Peer ID are generated. It's a simple increment and wrap-over
// algorithm. This value is the next ID to use (if not used already).
uint32_t next_peer_id;
xrep_t (const xrep_t&);
const xrep_t &operator = (const xrep_t&);
};

View File

@ -5,7 +5,8 @@ noinst_PROGRAMS = test_pair_inproc \
test_pair_tcp \
test_reqrep_inproc \
test_reqrep_tcp \
test_hwm
test_hwm \
test_reqrep_device
if !ON_MINGW
noinst_PROGRAMS += test_shutdown_stress \
@ -22,6 +23,8 @@ test_reqrep_tcp_SOURCES = test_reqrep_tcp.cpp testutil.hpp
test_hwm_SOURCES = test_hwm.cpp
test_reqrep_device_SOURCES = test_reqrep_device.cpp
if !ON_MINGW
test_shutdown_stress_SOURCES = test_shutdown_stress.cpp
test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp

View File

@ -0,0 +1,160 @@
/*
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <string.h>
#include "../include/zmq.h"
int main (int argc, char *argv [])
{
void *ctx = zmq_init (1);
assert (ctx);
// Create a req/rep device.
void *xreq = zmq_socket (ctx, ZMQ_XREQ);
assert (xreq);
int rc = zmq_bind (xreq, "tcp://127.0.0.1:5560");
assert (rc == 0);
void *xrep = zmq_socket (ctx, ZMQ_XREP);
assert (xrep);
rc = zmq_bind (xrep, "tcp://127.0.0.1:5561");
assert (rc == 0);
// Create a worker.
void *rep = zmq_socket (ctx, ZMQ_REP);
assert (rep);
rc = zmq_connect (rep, "tcp://127.0.0.1:5560");
assert (rc == 0);
// Create a client.
void *req = zmq_socket (ctx, ZMQ_REQ);
assert (req);
rc = zmq_connect (req, "tcp://127.0.0.1:5561");
assert (rc == 0);
// Send a request.
rc = zmq_send (req, "ABC", 3, ZMQ_SNDMORE);
assert (rc == 3);
rc = zmq_send (req, "DEF", 3, 0);
assert (rc == 3);
// Pass the request through the device.
for (int i = 0; i != 4; i++) {
zmq_msg_t msg;
rc = zmq_msg_init (&msg);
assert (rc == 0);
rc = zmq_recvmsg (xrep, &msg, 0);
assert (rc >= 0);
int rcvlabel;
size_t sz = sizeof (rcvlabel);
rc = zmq_getsockopt (xrep, ZMQ_RCVLABEL, &rcvlabel, &sz);
assert (rc == 0);
int rcvmore;
rc = zmq_getsockopt (xrep, ZMQ_RCVMORE, &rcvmore, &sz);
assert (rc == 0);
rc = zmq_sendmsg (xreq, &msg,
(rcvlabel ? ZMQ_SNDLABEL : 0) | (rcvmore ? ZMQ_SNDMORE : 0));
assert (rc >= 0);
}
// Receive the request.
char buff [3];
rc = zmq_recv (rep, buff, 3, 0);
assert (rc == 3);
assert (memcmp (buff, "ABC", 3) == 0);
int rcvlabel;
size_t sz = sizeof (rcvlabel);
rc = zmq_getsockopt (rep, ZMQ_RCVLABEL, &rcvlabel, &sz);
assert (rc == 0);
assert (!rcvlabel);
int rcvmore;
rc = zmq_getsockopt (rep, ZMQ_RCVMORE, &rcvmore, &sz);
assert (rc == 0);
assert (rcvmore);
rc = zmq_recv (rep, buff, 3, 0);
assert (rc == 3);
assert (memcmp (buff, "DEF", 3) == 0);
rc = zmq_getsockopt (rep, ZMQ_RCVLABEL, &rcvlabel, &sz);
assert (rc == 0);
assert (!rcvlabel);
rc = zmq_getsockopt (rep, ZMQ_RCVMORE, &rcvmore, &sz);
assert (rc == 0);
assert (!rcvmore);
// Send the reply.
rc = zmq_send (rep, "GHI", 3, ZMQ_SNDMORE);
assert (rc == 3);
rc = zmq_send (rep, "JKL", 3, 0);
assert (rc == 3);
// Pass the reply through the device.
for (int i = 0; i != 4; i++) {
zmq_msg_t msg;
rc = zmq_msg_init (&msg);
assert (rc == 0);
rc = zmq_recvmsg (xreq, &msg, 0);
assert (rc >= 0);
int rcvlabel;
size_t sz = sizeof (rcvlabel);
rc = zmq_getsockopt (xreq, ZMQ_RCVLABEL, &rcvlabel, &sz);
assert (rc == 0);
int rcvmore;
rc = zmq_getsockopt (xreq, ZMQ_RCVMORE, &rcvmore, &sz);
assert (rc == 0);
rc = zmq_sendmsg (xrep, &msg,
(rcvlabel ? ZMQ_SNDLABEL : 0) | (rcvmore ? ZMQ_SNDMORE : 0));
assert (rc >= 0);
}
// Receive the reply.
rc = zmq_recv (req, buff, 3, 0);
assert (rc == 3);
assert (memcmp (buff, "GHI", 3) == 0);
rc = zmq_getsockopt (req, ZMQ_RCVLABEL, &rcvlabel, &sz);
assert (rc == 0);
assert (!rcvlabel);
rc = zmq_getsockopt (req, ZMQ_RCVMORE, &rcvmore, &sz);
assert (rc == 0);
assert (rcvmore);
rc = zmq_recv (req, buff, 3, 0);
assert (rc == 3);
assert (memcmp (buff, "JKL", 3) == 0);
rc = zmq_getsockopt (req, ZMQ_RCVLABEL, &rcvlabel, &sz);
assert (rc == 0);
assert (!rcvlabel);
rc = zmq_getsockopt (req, ZMQ_RCVMORE, &rcvmore, &sz);
assert (rc == 0);
assert (!rcvmore);
// Clean up.
rc = zmq_close (req);
assert (rc == 0);
rc = zmq_close (rep);
assert (rc == 0);
rc = zmq_close (xrep);
assert (rc == 0);
rc = zmq_close (xreq);
assert (rc == 0);
rc = zmq_term (ctx);
assert (rc == 0);
return 0 ;
}