mirror of
				https://github.com/zeromq/libzmq.git
				synced 2025-11-03 19:40:39 +01:00 
			
		
		
		
	Adapt to new draft header. Rebase dgram socket on pair socket
This commit is contained in:
		
							
								
								
									
										260
									
								
								src/dgram.cpp
									
									
									
									
									
								
							
							
						
						
									
										260
									
								
								src/dgram.cpp
									
									
									
									
									
								
							@@ -38,24 +38,18 @@
 | 
			
		||||
 | 
			
		||||
zmq::dgram_t::dgram_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
 | 
			
		||||
    socket_base_t (parent_, tid_, sid_),
 | 
			
		||||
    pipe (NULL),
 | 
			
		||||
    last_in (NULL),
 | 
			
		||||
    prefetched (false),
 | 
			
		||||
    identity_sent (false),
 | 
			
		||||
    current_out (NULL),
 | 
			
		||||
    more_out (false),
 | 
			
		||||
    next_rid (generate_random ())
 | 
			
		||||
    more_out (false)
 | 
			
		||||
{
 | 
			
		||||
    options.type = ZMQ_DGRAM;
 | 
			
		||||
    options.raw_socket = true;
 | 
			
		||||
 | 
			
		||||
    prefetched_id.init ();
 | 
			
		||||
    prefetched_msg.init ();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
zmq::dgram_t::~dgram_t ()
 | 
			
		||||
{
 | 
			
		||||
    zmq_assert (outpipes.empty ());
 | 
			
		||||
    prefetched_id.close ();
 | 
			
		||||
    prefetched_msg.close ();
 | 
			
		||||
    zmq_assert (!pipe);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void zmq::dgram_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
 | 
			
		||||
@@ -64,110 +58,85 @@ void zmq::dgram_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
 | 
			
		||||
 | 
			
		||||
    zmq_assert (pipe_);
 | 
			
		||||
 | 
			
		||||
    identify_peer (pipe_);
 | 
			
		||||
    fq.attach (pipe_);
 | 
			
		||||
    //  ZMQ_DGRAM socket can only be connected to a single peer.
 | 
			
		||||
    //  The socket rejects any further connection requests.
 | 
			
		||||
    if (pipe == NULL)
 | 
			
		||||
        pipe = pipe_;
 | 
			
		||||
    else
 | 
			
		||||
        pipe_->terminate (false);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void zmq::dgram_t::xpipe_terminated (pipe_t *pipe_)
 | 
			
		||||
{
 | 
			
		||||
    outpipes_t::iterator it = outpipes.find (pipe_->get_identity ());
 | 
			
		||||
    zmq_assert (it != outpipes.end ());
 | 
			
		||||
    outpipes.erase (it);
 | 
			
		||||
    fq.pipe_terminated (pipe_);
 | 
			
		||||
    if (pipe_ == current_out)
 | 
			
		||||
        current_out = NULL;
 | 
			
		||||
    if (pipe_ == pipe) {
 | 
			
		||||
        if (last_in == pipe) {
 | 
			
		||||
            saved_credential = last_in->get_credential ();
 | 
			
		||||
            last_in = NULL;
 | 
			
		||||
        }
 | 
			
		||||
        pipe = NULL;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void zmq::dgram_t::xread_activated (pipe_t *pipe_)
 | 
			
		||||
void zmq::dgram_t::xread_activated (pipe_t *)
 | 
			
		||||
{
 | 
			
		||||
    fq.activated (pipe_);
 | 
			
		||||
    //  There's just one pipe. No lists of active and inactive pipes.
 | 
			
		||||
    //  There's nothing to do here.
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void zmq::dgram_t::xwrite_activated (pipe_t *pipe_)
 | 
			
		||||
void zmq::dgram_t::xwrite_activated (pipe_t *)
 | 
			
		||||
{
 | 
			
		||||
    outpipes_t::iterator it;
 | 
			
		||||
    for (it = outpipes.begin (); it != outpipes.end (); ++it)
 | 
			
		||||
        if (it->second.pipe == pipe_)
 | 
			
		||||
            break;
 | 
			
		||||
 | 
			
		||||
    zmq_assert (it != outpipes.end ());
 | 
			
		||||
    zmq_assert (!it->second.active);
 | 
			
		||||
    it->second.active = true;
 | 
			
		||||
    //  There's just one pipe. No lists of active and inactive pipes.
 | 
			
		||||
    //  There's nothing to do here.
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int zmq::dgram_t::xsend (msg_t *msg_)
 | 
			
		||||
{
 | 
			
		||||
    // If there's no out pipe, just drop it.
 | 
			
		||||
    if (!pipe) {
 | 
			
		||||
        int rc = msg_->close ();
 | 
			
		||||
        errno_assert (rc == 0);
 | 
			
		||||
        return -1;
 | 
			
		||||
    }
 | 
			
		||||
  
 | 
			
		||||
    //  If this is the first part of the message it's the ID of the
 | 
			
		||||
    //  peer to send the message to.
 | 
			
		||||
    if (!more_out) {
 | 
			
		||||
        zmq_assert (!current_out);
 | 
			
		||||
 | 
			
		||||
        //  If we have malformed message (prefix with no subsequent message)
 | 
			
		||||
        //  then just silently ignore it.
 | 
			
		||||
        //  TODO: The connections should be killed instead.
 | 
			
		||||
        //  If we have malformed message (prefix with no subsequent message) then ignore it.
 | 
			
		||||
        if (msg_->flags () & msg_t::more) {
 | 
			
		||||
 | 
			
		||||
            //  Find the pipe associated with the identity stored in the prefix.
 | 
			
		||||
            //  If there's no such pipe return an error
 | 
			
		||||
            blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
 | 
			
		||||
            outpipes_t::iterator it = outpipes.find (identity);
 | 
			
		||||
 | 
			
		||||
            if (it != outpipes.end ()) {
 | 
			
		||||
                current_out = it->second.pipe;
 | 
			
		||||
                if (!current_out->check_write ()) {
 | 
			
		||||
                    it->second.active = false;
 | 
			
		||||
                    current_out = NULL;
 | 
			
		||||
                    errno = EAGAIN;
 | 
			
		||||
            errno = EINVAL;
 | 
			
		||||
            return -1;
 | 
			
		||||
        }
 | 
			
		||||
            }
 | 
			
		||||
            else {
 | 
			
		||||
                errno = EHOSTUNREACH;
 | 
			
		||||
                return -1;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        //  Expect one more message frame.
 | 
			
		||||
        more_out = true;
 | 
			
		||||
 | 
			
		||||
        int rc = msg_->close ();
 | 
			
		||||
        errno_assert (rc == 0);
 | 
			
		||||
        rc = msg_->init ();
 | 
			
		||||
        errno_assert (rc == 0);
 | 
			
		||||
        return 0;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    else {
 | 
			
		||||
        //  Ignore the MORE flag
 | 
			
		||||
        msg_->reset_flags (msg_t::more);
 | 
			
		||||
 | 
			
		||||
        //  This is the last part of the message.
 | 
			
		||||
        more_out = false;
 | 
			
		||||
 | 
			
		||||
    //  Push the message into the pipe. If there's no out pipe, just drop it.
 | 
			
		||||
    if (current_out) {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // Close the remote connection if user has asked to do so
 | 
			
		||||
    // by sending zero length message.
 | 
			
		||||
    // Pending messages in the pipe will be dropped (on receiving term- ack)
 | 
			
		||||
    if (msg_->size () == 0) {
 | 
			
		||||
            current_out->terminate (false);
 | 
			
		||||
      pipe->terminate (false);
 | 
			
		||||
      int rc = msg_->close ();
 | 
			
		||||
      errno_assert (rc == 0);
 | 
			
		||||
      rc = msg_->init ();
 | 
			
		||||
      errno_assert (rc == 0);
 | 
			
		||||
            current_out = NULL;
 | 
			
		||||
      pipe = NULL;
 | 
			
		||||
      return 0;
 | 
			
		||||
    }
 | 
			
		||||
        bool ok = current_out->write (msg_);
 | 
			
		||||
        if (likely (ok))
 | 
			
		||||
            current_out->flush ();
 | 
			
		||||
        current_out = NULL;
 | 
			
		||||
    }
 | 
			
		||||
    else {
 | 
			
		||||
        int rc = msg_->close ();
 | 
			
		||||
        errno_assert (rc == 0);
 | 
			
		||||
    // Push the message into the pipe. 
 | 
			
		||||
    if (!pipe->write (msg_)) {
 | 
			
		||||
        errno = EAGAIN;
 | 
			
		||||
        return -1;
 | 
			
		||||
    }
 | 
			
		||||
      
 | 
			
		||||
    if (!(msg_->flags () & msg_t::more))
 | 
			
		||||
        pipe->flush ();
 | 
			
		||||
 | 
			
		||||
    //  Detach the message from the data buffer.
 | 
			
		||||
    int rc = msg_->init ();
 | 
			
		||||
    errno_assert (rc == 0);
 | 
			
		||||
@@ -175,78 +144,30 @@ int zmq::dgram_t::xsend (msg_t *msg_)
 | 
			
		||||
    return 0;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int zmq::dgram_t::xsetsockopt (int option_, const void *optval_,
 | 
			
		||||
    size_t optvallen_)
 | 
			
		||||
{
 | 
			
		||||
    bool is_int = (optvallen_ == sizeof (int));
 | 
			
		||||
    int value = 0;
 | 
			
		||||
    if (is_int) memcpy(&value, optval_, sizeof (int));
 | 
			
		||||
 | 
			
		||||
    switch (option_) {
 | 
			
		||||
        case ZMQ_CONNECT_RID:
 | 
			
		||||
            if (optval_ && optvallen_) {
 | 
			
		||||
                connect_rid.assign ((char*) optval_, optvallen_);
 | 
			
		||||
                return 0;
 | 
			
		||||
            }
 | 
			
		||||
            break;
 | 
			
		||||
 | 
			
		||||
        case ZMQ_STREAM_NOTIFY:
 | 
			
		||||
            if (is_int && (value == 0 || value == 1)) {
 | 
			
		||||
                options.raw_notify = (value != 0);
 | 
			
		||||
                return 0;
 | 
			
		||||
            }
 | 
			
		||||
            break;
 | 
			
		||||
 | 
			
		||||
        default:
 | 
			
		||||
            break;
 | 
			
		||||
    }
 | 
			
		||||
    errno = EINVAL;
 | 
			
		||||
    return -1;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int zmq::dgram_t::xrecv (msg_t *msg_)
 | 
			
		||||
{
 | 
			
		||||
    if (prefetched) {
 | 
			
		||||
        if (!identity_sent) {
 | 
			
		||||
            int rc = msg_->move (prefetched_id);
 | 
			
		||||
    //  Deallocate old content of the message.
 | 
			
		||||
    int rc = msg_->close ();
 | 
			
		||||
    errno_assert (rc == 0);
 | 
			
		||||
            identity_sent = true;
 | 
			
		||||
  
 | 
			
		||||
    if (!pipe || !pipe->read (msg_)) {
 | 
			
		||||
        //  Initialise the output parameter to be a 0-byte message.
 | 
			
		||||
        rc = msg_->init ();
 | 
			
		||||
        errno_assert (rc == 0);
 | 
			
		||||
 | 
			
		||||
        errno = EAGAIN;
 | 
			
		||||
        return -1;
 | 
			
		||||
    }
 | 
			
		||||
        else {
 | 
			
		||||
            int rc = msg_->move (prefetched_msg);
 | 
			
		||||
            errno_assert (rc == 0);
 | 
			
		||||
    last_in = pipe;
 | 
			
		||||
    
 | 
			
		||||
    if (prefetched) {
 | 
			
		||||
        msg_->reset_flags (msg_t::more);
 | 
			
		||||
        prefetched = false;
 | 
			
		||||
    }
 | 
			
		||||
        return 0;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pipe_t *pipe = NULL;
 | 
			
		||||
    int rc = fq.recvpipe (&prefetched_msg, &pipe);
 | 
			
		||||
    if (rc != 0)
 | 
			
		||||
        return -1;
 | 
			
		||||
 | 
			
		||||
    zmq_assert (pipe != NULL);
 | 
			
		||||
    zmq_assert ((prefetched_msg.flags () & msg_t::more) == 0);
 | 
			
		||||
 | 
			
		||||
    //  We have received a frame with TCP data.
 | 
			
		||||
    //  Rather than sending this frame, we keep it in prefetched
 | 
			
		||||
    //  buffer and send a frame with peer's ID.
 | 
			
		||||
    blob_t identity = pipe->get_identity ();
 | 
			
		||||
    rc = msg_->close();
 | 
			
		||||
    errno_assert (rc == 0);
 | 
			
		||||
    rc = msg_->init_size (identity.size ());
 | 
			
		||||
    errno_assert (rc == 0);
 | 
			
		||||
 | 
			
		||||
    // forward metadata (if any)
 | 
			
		||||
    metadata_t *metadata = prefetched_msg.metadata();
 | 
			
		||||
    if (metadata)
 | 
			
		||||
        msg_->set_metadata(metadata);
 | 
			
		||||
 | 
			
		||||
    memcpy (msg_->data (), identity.data (), identity.size ());
 | 
			
		||||
    else {
 | 
			
		||||
        msg_->set_flags (msg_t::more);
 | 
			
		||||
 | 
			
		||||
        prefetched = true;
 | 
			
		||||
    identity_sent = true;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    return 0;
 | 
			
		||||
}
 | 
			
		||||
@@ -257,65 +178,24 @@ bool zmq::dgram_t::xhas_in ()
 | 
			
		||||
    if (prefetched)
 | 
			
		||||
        return true;
 | 
			
		||||
 | 
			
		||||
    //  Try to read the next message.
 | 
			
		||||
    //  The message, if read, is kept in the pre-fetch buffer.
 | 
			
		||||
    pipe_t *pipe = NULL;
 | 
			
		||||
    int rc = fq.recvpipe (&prefetched_msg, &pipe);
 | 
			
		||||
    if (rc != 0)
 | 
			
		||||
    if (!pipe)
 | 
			
		||||
        return false;
 | 
			
		||||
 | 
			
		||||
    zmq_assert (pipe != NULL);
 | 
			
		||||
    zmq_assert ((prefetched_msg.flags () & msg_t::more) == 0);
 | 
			
		||||
 | 
			
		||||
    blob_t identity = pipe->get_identity ();
 | 
			
		||||
    rc = prefetched_id.init_size (identity.size ());
 | 
			
		||||
    errno_assert (rc == 0);
 | 
			
		||||
 | 
			
		||||
    // forward metadata (if any)
 | 
			
		||||
    metadata_t *metadata = prefetched_msg.metadata();
 | 
			
		||||
    if (metadata)
 | 
			
		||||
        prefetched_id.set_metadata(metadata);
 | 
			
		||||
 | 
			
		||||
    memcpy (prefetched_id.data (), identity.data (), identity.size ());
 | 
			
		||||
    prefetched_id.set_flags (msg_t::more);
 | 
			
		||||
 | 
			
		||||
    prefetched = true;
 | 
			
		||||
    identity_sent = false;
 | 
			
		||||
 | 
			
		||||
    return true;
 | 
			
		||||
    return pipe->check_read ();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
bool zmq::dgram_t::xhas_out ()
 | 
			
		||||
{
 | 
			
		||||
    //  In theory, STREAM socket is always ready for writing. Whether actual
 | 
			
		||||
    //  attempt to write succeeds depends on which pipe the message is going
 | 
			
		||||
    //  to be routed to.
 | 
			
		||||
    return true;
 | 
			
		||||
    //if (more_out)
 | 
			
		||||
    //    return false;
 | 
			
		||||
  
 | 
			
		||||
    if (!pipe)
 | 
			
		||||
        return false;
 | 
			
		||||
 | 
			
		||||
    return pipe->check_write ();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void zmq::dgram_t::identify_peer (pipe_t *pipe_)
 | 
			
		||||
zmq::blob_t zmq::dgram_t::get_credential () const
 | 
			
		||||
{
 | 
			
		||||
    //  Always assign identity for raw-socket
 | 
			
		||||
    unsigned char buffer [5];
 | 
			
		||||
    buffer [0] = 0;
 | 
			
		||||
    blob_t identity;
 | 
			
		||||
    if (connect_rid.length ()) {
 | 
			
		||||
        identity = blob_t ((unsigned char*) connect_rid.c_str(),
 | 
			
		||||
            connect_rid.length ());
 | 
			
		||||
        connect_rid.clear ();
 | 
			
		||||
        outpipes_t::iterator it = outpipes.find (identity);
 | 
			
		||||
        zmq_assert (it == outpipes.end ());
 | 
			
		||||
    }
 | 
			
		||||
    else {
 | 
			
		||||
        put_uint32 (buffer + 1, next_rid++);
 | 
			
		||||
        identity = blob_t (buffer, sizeof buffer);
 | 
			
		||||
        memcpy (options.identity, identity.data (), identity.size ());
 | 
			
		||||
        options.identity_size = (unsigned char) identity.size ();
 | 
			
		||||
    }
 | 
			
		||||
    pipe_->set_identity (identity);
 | 
			
		||||
    //  Add the record into output pipes lookup table
 | 
			
		||||
    outpipe_t outpipe = {pipe_, true};
 | 
			
		||||
    const bool ok = outpipes.insert (
 | 
			
		||||
        outpipes_t::value_type (identity, outpipe)).second;
 | 
			
		||||
    zmq_assert (ok);
 | 
			
		||||
    return last_in? last_in->get_credential (): saved_credential;
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -30,15 +30,17 @@
 | 
			
		||||
#ifndef __ZMQ_DGRAM_HPP_INCLUDED__
 | 
			
		||||
#define __ZMQ_DGRAM_HPP_INCLUDED__
 | 
			
		||||
 | 
			
		||||
#include <map>
 | 
			
		||||
 | 
			
		||||
#include "router.hpp"
 | 
			
		||||
#include "blob.hpp"
 | 
			
		||||
#include "socket_base.hpp"
 | 
			
		||||
#include "session_base.hpp"
 | 
			
		||||
 | 
			
		||||
namespace zmq
 | 
			
		||||
{
 | 
			
		||||
 | 
			
		||||
    class ctx_t;
 | 
			
		||||
    class msg_t;
 | 
			
		||||
    class pipe_t;
 | 
			
		||||
    class io_thread_t;
 | 
			
		||||
 | 
			
		||||
    class dgram_t :
 | 
			
		||||
        public socket_base_t
 | 
			
		||||
@@ -54,50 +56,25 @@ namespace zmq
 | 
			
		||||
        int xrecv (zmq::msg_t *msg_);
 | 
			
		||||
        bool xhas_in ();
 | 
			
		||||
        bool xhas_out ();
 | 
			
		||||
        blob_t get_credential () const;
 | 
			
		||||
        void xread_activated (zmq::pipe_t *pipe_);
 | 
			
		||||
        void xwrite_activated (zmq::pipe_t *pipe_);
 | 
			
		||||
        void xpipe_terminated (zmq::pipe_t *pipe_);
 | 
			
		||||
        int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
 | 
			
		||||
    private:
 | 
			
		||||
        //  Generate peer's id and update lookup map
 | 
			
		||||
        void identify_peer (pipe_t *pipe_);
 | 
			
		||||
 | 
			
		||||
        //  Fair queueing object for inbound pipes.
 | 
			
		||||
        fq_t fq;
 | 
			
		||||
    private:
 | 
			
		||||
 | 
			
		||||
        zmq::pipe_t *pipe;
 | 
			
		||||
 | 
			
		||||
        zmq::pipe_t *last_in;
 | 
			
		||||
 | 
			
		||||
        blob_t saved_credential;
 | 
			
		||||
        
 | 
			
		||||
        //  True iff there is a message held in the pre-fetch buffer.
 | 
			
		||||
        bool prefetched;
 | 
			
		||||
 | 
			
		||||
        //  If true, the receiver got the message part with
 | 
			
		||||
        //  the peer's identity.
 | 
			
		||||
        bool identity_sent;
 | 
			
		||||
 | 
			
		||||
        //  Holds the prefetched identity.
 | 
			
		||||
        msg_t prefetched_id;
 | 
			
		||||
 | 
			
		||||
        //  Holds the prefetched message.
 | 
			
		||||
        msg_t prefetched_msg;
 | 
			
		||||
 | 
			
		||||
        struct outpipe_t
 | 
			
		||||
        {
 | 
			
		||||
            zmq::pipe_t *pipe;
 | 
			
		||||
            bool active;
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        //  Outbound pipes indexed by the peer IDs.
 | 
			
		||||
        typedef std::map <blob_t, outpipe_t> outpipes_t;
 | 
			
		||||
        outpipes_t outpipes;
 | 
			
		||||
 | 
			
		||||
        //  The pipe we are currently writing to.
 | 
			
		||||
        zmq::pipe_t *current_out;
 | 
			
		||||
 | 
			
		||||
        //  If true, more outgoing message parts are expected.
 | 
			
		||||
        bool more_out;
 | 
			
		||||
 | 
			
		||||
        //  Routing IDs are generated. It's a simple increment and wrap-over
 | 
			
		||||
        //  algorithm. This value is the next ID to use (if not used already).
 | 
			
		||||
        uint32_t next_rid;
 | 
			
		||||
 | 
			
		||||
        dgram_t (const dgram_t&);
 | 
			
		||||
        const dgram_t &operator = (const dgram_t&);
 | 
			
		||||
    };
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user