diff --git a/src/stream.cpp b/src/stream.cpp index 9463f706..68ab18d2 100644 --- a/src/stream.cpp +++ b/src/stream.cpp @@ -34,7 +34,6 @@ zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) : next_peer_id (generate_random ()) { options.type = ZMQ_STREAM; - options.recv_identity = true; options.raw_sock = true; prefetched_id.init (); @@ -43,7 +42,6 @@ zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) : zmq::stream_t::~stream_t () { - zmq_assert (anonymous_pipes.empty ());; zmq_assert (outpipes.empty ()); prefetched_id.close (); prefetched_msg.close (); @@ -56,40 +54,23 @@ void zmq::stream_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) zmq_assert (pipe_); - bool identity_ok = identify_peer (pipe_); - if (identity_ok) - fq.attach (pipe_); - else - anonymous_pipes.insert (pipe_); + identify_peer (pipe_); + fq.attach (pipe_); } void zmq::stream_t::xpipe_terminated (pipe_t *pipe_) { - std::set ::iterator it = anonymous_pipes.find (pipe_); - if (it != anonymous_pipes.end ()) - anonymous_pipes.erase (it); - else { - outpipes_t::iterator it = outpipes.find (pipe_->get_identity ()); - zmq_assert (it != outpipes.end ()); - outpipes.erase (it); - fq.pipe_terminated (pipe_); - if (pipe_ == current_out) - current_out = NULL; - } + outpipes_t::iterator it = outpipes.find (pipe_->get_identity ()); + zmq_assert (it != outpipes.end ()); + outpipes.erase (it); + fq.pipe_terminated (pipe_); + if (pipe_ == current_out) + current_out = NULL; } void zmq::stream_t::xread_activated (pipe_t *pipe_) { - std::set ::iterator it = anonymous_pipes.find (pipe_); - if (it == anonymous_pipes.end ()) - fq.activated (pipe_); - else { - bool identity_ok = identify_peer (pipe_); - if (identity_ok) { - anonymous_pipes.erase (it); - fq.attach (pipe_); - } - } + fq.activated (pipe_); } void zmq::stream_t::xwrite_activated (pipe_t *pipe_) @@ -147,7 +128,7 @@ int zmq::stream_t::xsend (msg_t *msg_) return 0; } - // Ignore the MORE flag + // Ignore the MORE flag msg_->reset_flags (msg_t::more); // Check whether this is the last part of the message. @@ -159,7 +140,7 @@ int zmq::stream_t::xsend (msg_t *msg_) // Close the remote connection if user has asked to do so // by sending zero length message. // Pending messages in the pipe will be dropped (on receiving term- ack) - if (msg_->size() == 0) { + if (msg_->size () == 0) { current_out->terminate (false); int rc = msg_->close (); errno_assert (rc == 0); @@ -206,14 +187,6 @@ int zmq::stream_t::xrecv (msg_t *msg_) pipe_t *pipe = NULL; int rc = fq.recvpipe (msg_, &pipe); - - // It's possible that we receive peer's identity. That happens - // after reconnection. The current implementation assumes that - // the peer always uses the same identity. - // TODO: handle the situation when the peer changes its identity. - while (rc == 0 && msg_->is_identity ()) - rc = fq.recvpipe (msg_, &pipe); - if (rc != 0) return -1; @@ -241,16 +214,6 @@ int zmq::stream_t::xrecv (msg_t *msg_) return 0; } -int zmq::stream_t::rollback (void) -{ - if (current_out) { - current_out->rollback (); - current_out = NULL; - more_out = false; - } - return 0; -} - bool zmq::stream_t::xhas_in () { // If we are in the middle of reading the messages, there are @@ -266,13 +229,6 @@ bool zmq::stream_t::xhas_in () // The message, if read, is kept in the pre-fetch buffer. pipe_t *pipe = NULL; int rc = fq.recvpipe (&prefetched_msg, &pipe); - - // It's possible that we receive peer's identity. That happens - // after reconnection. The current implementation assumes that - // the peer always uses the same identity. - while (rc == 0 && prefetched_msg.is_identity ()) - rc = fq.recvpipe (&prefetched_msg, &pipe); - if (rc != 0) return false; @@ -292,34 +248,29 @@ bool zmq::stream_t::xhas_in () bool zmq::stream_t::xhas_out () { - // In theory, ROUTER socket is always ready for writing. Whether actual + // In theory, STREAM socket is always ready for writing. Whether actual // attempt to write succeeds depends on which pipe the message is going // to be routed to. return true; } -bool zmq::stream_t::identify_peer (pipe_t *pipe_) +void zmq::stream_t::identify_peer (pipe_t *pipe_) { - blob_t identity; - bool ok; - // Always assign identity for raw-socket unsigned char buffer [5]; buffer [0] = 0; put_uint32 (buffer + 1, next_peer_id++); - identity = blob_t (buffer, sizeof buffer); - unsigned int i = 0; // Store identity to allow use of raw socket as client - for (blob_t::iterator it = identity.begin(); it != identity.end(); it++) - options.identity[i++] = *it; - options.identity_size = i; + blob_t identity = blob_t (buffer, sizeof buffer); + + memcpy (options.identity, identity.data (), identity.size ()); + options.identity_size = identity.size (); pipe_->set_identity (identity); // Add the record into output pipes lookup table outpipe_t outpipe = {pipe_, true}; - ok = outpipes.insert (outpipes_t::value_type (identity, outpipe)).second; + const bool ok = outpipes.insert ( + outpipes_t::value_type (identity, outpipe)).second; zmq_assert (ok); - - return true; } zmq::stream_session_t::stream_session_t (io_thread_t *io_thread_, bool connect_, diff --git a/src/stream.hpp b/src/stream.hpp index 23989b0b..73e8ca21 100644 --- a/src/stream.hpp +++ b/src/stream.hpp @@ -30,7 +30,7 @@ namespace zmq class ctx_t; class pipe_t; - class stream_t : + class stream_t : public socket_base_t { public: @@ -48,14 +48,9 @@ namespace zmq void xwrite_activated (zmq::pipe_t *pipe_); void xpipe_terminated (zmq::pipe_t *pipe_); - protected: - - // Rollback any message parts that were sent but not yet flushed. - int rollback (); - private: - // Receive peer id and update lookup map - bool identify_peer (pipe_t *pipe_); + // Generate peer's id and update lookup map + void identify_peer (pipe_t *pipe_); // Fair queueing object for inbound pipes. fq_t fq; @@ -82,9 +77,6 @@ namespace zmq bool active; }; - // We keep a set of pipes that have not been identified yet. - std::set anonymous_pipes; - // Outbound pipes indexed by the peer IDs. typedef std::map outpipes_t; outpipes_t outpipes; @@ -99,10 +91,6 @@ namespace zmq // algorithm. This value is the next ID to use (if not used already). uint32_t next_peer_id; - // If true, report EAGAIN to the caller instead of silently dropping - // the message targeting an unknown peer. - bool mandatory; - stream_t (const stream_t&); const stream_t &operator = (const stream_t&); };