diff --git a/include/zmq.h b/include/zmq.h index f93b7a7c..0d1bd147 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -300,6 +300,7 @@ ZMQ_EXPORT char *zmq_msg_gets (zmq_msg_t *msg, char *property); #define ZMQ_GSSAPI_PRINCIPAL 63 #define ZMQ_GSSAPI_SERVICE_PRINCIPAL 64 #define ZMQ_GSSAPI_PLAINTEXT 65 +#define ZMQ_IDENTITY_FD 66 /* Message options */ #define ZMQ_MORE 1 diff --git a/src/i_engine.hpp b/src/i_engine.hpp index 49896c21..e406b8fc 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -20,6 +20,8 @@ #ifndef __ZMQ_I_ENGINE_HPP_INCLUDED__ #define __ZMQ_I_ENGINE_HPP_INCLUDED__ +#include "fd.hpp" + namespace zmq { @@ -47,7 +49,10 @@ namespace zmq // are messages to send available. virtual void restart_output () = 0; - virtual void zap_msg_available () = 0; + virtual void zap_msg_available () = 0; + + // provide a way to link from engine to file descriptor + virtual fd_t get_assoc_fd () { return retired_fd;}; }; } diff --git a/src/pipe.cpp b/src/pipe.cpp index 00fea60a..6fbd91a1 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -65,6 +65,7 @@ int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, int inhwm_, int outhwm_, bool conflate_) : object_t (parent_), + assoc_fd (retired_fd), inpipe (inpipe_), outpipe (outpipe_), in_active (true), diff --git a/src/pipe.hpp b/src/pipe.hpp index d5a69a37..55c66ab6 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -27,6 +27,7 @@ #include "stdint.hpp" #include "array.hpp" #include "blob.hpp" +#include "fd.hpp" namespace zmq { @@ -117,6 +118,8 @@ namespace zmq // set the high water marks. void set_hwms (int inhwm_, int outhwm_); + // provide a way to link pipe to engine fd. Set on session initialization + fd_t assoc_fd; //=retired_fd private: // Type of the underlying lock-free pipe. diff --git a/src/router.cpp b/src/router.cpp index fc1035e8..ee696181 100644 --- a/src/router.cpp +++ b/src/router.cpp @@ -133,6 +133,33 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_, return -1; } +int zmq::router_t::xgetsockopt (int option_, const void *optval_, + size_t *optvallen_) +{ + switch (option_) { + case ZMQ_IDENTITY_FD: + if (optval_==NULL && optvallen_) { + *optvallen_=sizeof(fd_t); + return 0; + } + if (optval_ && optvallen_ && *optvallen_) { + blob_t identity= blob_t((unsigned char*)optval_,*optvallen_); + outpipes_t::iterator it = outpipes.find (identity); + if (it == outpipes.end() ){ + return ENOTSOCK; + } + *((fd_t*)optval_)=it->second.pipe->assoc_fd; + *optvallen_=sizeof(fd_t); + return 0; + } + break; + default: + break; + } + errno = EINVAL; + return -1; +} + void zmq::router_t::xpipe_terminated (pipe_t *pipe_) { diff --git a/src/router.hpp b/src/router.hpp index f5b1ff6c..cfc63bd4 100644 --- a/src/router.hpp +++ b/src/router.hpp @@ -47,6 +47,7 @@ namespace zmq // Overrides of functions from socket_base_t. void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + int xgetsockopt (int option_, const void *optval_, size_t *optvallen_); int xsend (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_); bool xhas_in (); @@ -54,7 +55,6 @@ namespace zmq void xread_activated (zmq::pipe_t *pipe_); void xwrite_activated (zmq::pipe_t *pipe_); void xpipe_terminated (zmq::pipe_t *pipe_); - protected: // Rollback any message parts that were sent but not yet flushed. diff --git a/src/session_base.cpp b/src/session_base.cpp index 45682954..756d8df4 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -353,7 +353,8 @@ void zmq::session_base_t::process_attach (i_engine *engine_) // Remember the local end of the pipe. zmq_assert (!pipe); pipe = pipes [0]; - + // Store engine assoc_fd for lilnking pipe to fd + pipe->assoc_fd=engine_->get_assoc_fd(); // Ask socket to plug into the remote end of the pipe. send_bind (socket, pipes [1]); } diff --git a/src/socket_base.cpp b/src/socket_base.cpp index d467ceb8..1101b967 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -284,6 +284,11 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_, errno = ETERM; return -1; } + + // First, check whether specific socket type overloads the option. + int rc = xgetsockopt (option_, optval_, optvallen_); + if (rc == 0 || errno != EINVAL) + return rc; if (option_ == ZMQ_RCVMORE) { if (*optvallen_ < sizeof (int)) { @@ -1037,6 +1042,11 @@ int zmq::socket_base_t::xsetsockopt (int, const void *, size_t) errno = EINVAL; return -1; } +int zmq::socket_base_t::xgetsockopt (int, const void *, size_t*) +{ + errno = EINVAL; + return -1; +} bool zmq::socket_base_t::xhas_out () { diff --git a/src/socket_base.hpp b/src/socket_base.hpp index efb0cced..7038ed2b 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -133,10 +133,13 @@ namespace zmq // The default implementation assumes there are no specific socket // options for the particular socket type. If not so, override this - // method. + // methods. virtual int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + virtual int xgetsockopt (int option_, const void *optval_, + size_t *optvallen_); + // The default implementation assumes that send is not supported. virtual bool xhas_out (); virtual int xsend (zmq::msg_t *msg_); diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index 4e9e7915..c5c148a3 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -68,6 +68,8 @@ namespace zmq void in_event (); void out_event (); + // export s via i_engine so it is possible to link a pipe to fd + fd_t get_assoc_fd (){ return s; }; private: // Unplug the engine from the session.