Merge pull request #594 from hurtonm/master

Simplify ZMQ_STREAM socket implementation
This commit is contained in:
Pieter Hintjens 2013-06-28 01:15:02 -07:00
commit fe2753da0a
2 changed files with 22 additions and 83 deletions

View File

@ -34,7 +34,6 @@ zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
next_peer_id (generate_random ()) next_peer_id (generate_random ())
{ {
options.type = ZMQ_STREAM; options.type = ZMQ_STREAM;
options.recv_identity = true;
options.raw_sock = true; options.raw_sock = true;
prefetched_id.init (); prefetched_id.init ();
@ -43,7 +42,6 @@ zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
zmq::stream_t::~stream_t () zmq::stream_t::~stream_t ()
{ {
zmq_assert (anonymous_pipes.empty ());;
zmq_assert (outpipes.empty ()); zmq_assert (outpipes.empty ());
prefetched_id.close (); prefetched_id.close ();
prefetched_msg.close (); prefetched_msg.close ();
@ -56,40 +54,23 @@ void zmq::stream_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
zmq_assert (pipe_); zmq_assert (pipe_);
bool identity_ok = identify_peer (pipe_); identify_peer (pipe_);
if (identity_ok) fq.attach (pipe_);
fq.attach (pipe_);
else
anonymous_pipes.insert (pipe_);
} }
void zmq::stream_t::xpipe_terminated (pipe_t *pipe_) void zmq::stream_t::xpipe_terminated (pipe_t *pipe_)
{ {
std::set <pipe_t*>::iterator it = anonymous_pipes.find (pipe_); outpipes_t::iterator it = outpipes.find (pipe_->get_identity ());
if (it != anonymous_pipes.end ()) zmq_assert (it != outpipes.end ());
anonymous_pipes.erase (it); outpipes.erase (it);
else { fq.pipe_terminated (pipe_);
outpipes_t::iterator it = outpipes.find (pipe_->get_identity ()); if (pipe_ == current_out)
zmq_assert (it != outpipes.end ()); current_out = NULL;
outpipes.erase (it);
fq.pipe_terminated (pipe_);
if (pipe_ == current_out)
current_out = NULL;
}
} }
void zmq::stream_t::xread_activated (pipe_t *pipe_) void zmq::stream_t::xread_activated (pipe_t *pipe_)
{ {
std::set <pipe_t*>::iterator it = anonymous_pipes.find (pipe_); fq.activated (pipe_);
if (it == anonymous_pipes.end ())
fq.activated (pipe_);
else {
bool identity_ok = identify_peer (pipe_);
if (identity_ok) {
anonymous_pipes.erase (it);
fq.attach (pipe_);
}
}
} }
void zmq::stream_t::xwrite_activated (pipe_t *pipe_) void zmq::stream_t::xwrite_activated (pipe_t *pipe_)
@ -147,7 +128,7 @@ int zmq::stream_t::xsend (msg_t *msg_)
return 0; return 0;
} }
// Ignore the MORE flag // Ignore the MORE flag
msg_->reset_flags (msg_t::more); msg_->reset_flags (msg_t::more);
// Check whether this is the last part of the message. // Check whether this is the last part of the message.
@ -159,7 +140,7 @@ int zmq::stream_t::xsend (msg_t *msg_)
// Close the remote connection if user has asked to do so // Close the remote connection if user has asked to do so
// by sending zero length message. // by sending zero length message.
// Pending messages in the pipe will be dropped (on receiving term- ack) // Pending messages in the pipe will be dropped (on receiving term- ack)
if (msg_->size() == 0) { if (msg_->size () == 0) {
current_out->terminate (false); current_out->terminate (false);
int rc = msg_->close (); int rc = msg_->close ();
errno_assert (rc == 0); errno_assert (rc == 0);
@ -206,14 +187,6 @@ int zmq::stream_t::xrecv (msg_t *msg_)
pipe_t *pipe = NULL; pipe_t *pipe = NULL;
int rc = fq.recvpipe (msg_, &pipe); int rc = fq.recvpipe (msg_, &pipe);
// It's possible that we receive peer's identity. That happens
// after reconnection. The current implementation assumes that
// the peer always uses the same identity.
// TODO: handle the situation when the peer changes its identity.
while (rc == 0 && msg_->is_identity ())
rc = fq.recvpipe (msg_, &pipe);
if (rc != 0) if (rc != 0)
return -1; return -1;
@ -241,16 +214,6 @@ int zmq::stream_t::xrecv (msg_t *msg_)
return 0; return 0;
} }
int zmq::stream_t::rollback (void)
{
if (current_out) {
current_out->rollback ();
current_out = NULL;
more_out = false;
}
return 0;
}
bool zmq::stream_t::xhas_in () bool zmq::stream_t::xhas_in ()
{ {
// If we are in the middle of reading the messages, there are // If we are in the middle of reading the messages, there are
@ -266,13 +229,6 @@ bool zmq::stream_t::xhas_in ()
// The message, if read, is kept in the pre-fetch buffer. // The message, if read, is kept in the pre-fetch buffer.
pipe_t *pipe = NULL; pipe_t *pipe = NULL;
int rc = fq.recvpipe (&prefetched_msg, &pipe); int rc = fq.recvpipe (&prefetched_msg, &pipe);
// It's possible that we receive peer's identity. That happens
// after reconnection. The current implementation assumes that
// the peer always uses the same identity.
while (rc == 0 && prefetched_msg.is_identity ())
rc = fq.recvpipe (&prefetched_msg, &pipe);
if (rc != 0) if (rc != 0)
return false; return false;
@ -292,34 +248,29 @@ bool zmq::stream_t::xhas_in ()
bool zmq::stream_t::xhas_out () bool zmq::stream_t::xhas_out ()
{ {
// In theory, ROUTER socket is always ready for writing. Whether actual // In theory, STREAM socket is always ready for writing. Whether actual
// attempt to write succeeds depends on which pipe the message is going // attempt to write succeeds depends on which pipe the message is going
// to be routed to. // to be routed to.
return true; return true;
} }
bool zmq::stream_t::identify_peer (pipe_t *pipe_) void zmq::stream_t::identify_peer (pipe_t *pipe_)
{ {
blob_t identity;
bool ok;
// Always assign identity for raw-socket // Always assign identity for raw-socket
unsigned char buffer [5]; unsigned char buffer [5];
buffer [0] = 0; buffer [0] = 0;
put_uint32 (buffer + 1, next_peer_id++); put_uint32 (buffer + 1, next_peer_id++);
identity = blob_t (buffer, sizeof buffer); blob_t identity = blob_t (buffer, sizeof buffer);
unsigned int i = 0; // Store identity to allow use of raw socket as client
for (blob_t::iterator it = identity.begin(); it != identity.end(); it++) memcpy (options.identity, identity.data (), identity.size ());
options.identity[i++] = *it; options.identity_size = identity.size ();
options.identity_size = i;
pipe_->set_identity (identity); pipe_->set_identity (identity);
// Add the record into output pipes lookup table // Add the record into output pipes lookup table
outpipe_t outpipe = {pipe_, true}; outpipe_t outpipe = {pipe_, true};
ok = outpipes.insert (outpipes_t::value_type (identity, outpipe)).second; const bool ok = outpipes.insert (
outpipes_t::value_type (identity, outpipe)).second;
zmq_assert (ok); zmq_assert (ok);
return true;
} }
zmq::stream_session_t::stream_session_t (io_thread_t *io_thread_, bool connect_, zmq::stream_session_t::stream_session_t (io_thread_t *io_thread_, bool connect_,

View File

@ -30,7 +30,7 @@ namespace zmq
class ctx_t; class ctx_t;
class pipe_t; class pipe_t;
class stream_t : class stream_t :
public socket_base_t public socket_base_t
{ {
public: public:
@ -48,14 +48,9 @@ namespace zmq
void xwrite_activated (zmq::pipe_t *pipe_); void xwrite_activated (zmq::pipe_t *pipe_);
void xpipe_terminated (zmq::pipe_t *pipe_); void xpipe_terminated (zmq::pipe_t *pipe_);
protected:
// Rollback any message parts that were sent but not yet flushed.
int rollback ();
private: private:
// Receive peer id and update lookup map // Generate peer's id and update lookup map
bool identify_peer (pipe_t *pipe_); void identify_peer (pipe_t *pipe_);
// Fair queueing object for inbound pipes. // Fair queueing object for inbound pipes.
fq_t fq; fq_t fq;
@ -82,9 +77,6 @@ namespace zmq
bool active; bool active;
}; };
// We keep a set of pipes that have not been identified yet.
std::set <pipe_t*> anonymous_pipes;
// Outbound pipes indexed by the peer IDs. // Outbound pipes indexed by the peer IDs.
typedef std::map <blob_t, outpipe_t> outpipes_t; typedef std::map <blob_t, outpipe_t> outpipes_t;
outpipes_t outpipes; outpipes_t outpipes;
@ -99,10 +91,6 @@ namespace zmq
// algorithm. This value is the next ID to use (if not used already). // algorithm. This value is the next ID to use (if not used already).
uint32_t next_peer_id; uint32_t next_peer_id;
// If true, report EAGAIN to the caller instead of silently dropping
// the message targeting an unknown peer.
bool mandatory;
stream_t (const stream_t&); stream_t (const stream_t&);
const stream_t &operator = (const stream_t&); const stream_t &operator = (const stream_t&);
}; };