mirror of
https://github.com/zeromq/libzmq.git
synced 2025-02-08 06:45:58 +01:00
Merge pull request #1024 from sdrsdr/identity_fd
Implement ZMQ_IDENTITY_FD sock option for linking socket identity to FD
This commit is contained in:
commit
3f479f534e
@ -301,6 +301,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property);
|
|||||||
#define ZMQ_GSSAPI_SERVICE_PRINCIPAL 64
|
#define ZMQ_GSSAPI_SERVICE_PRINCIPAL 64
|
||||||
#define ZMQ_GSSAPI_PLAINTEXT 65
|
#define ZMQ_GSSAPI_PLAINTEXT 65
|
||||||
#define ZMQ_HANDSHAKE_IVL 66
|
#define ZMQ_HANDSHAKE_IVL 66
|
||||||
|
#define ZMQ_IDENTITY_FD 67
|
||||||
|
|
||||||
/* Message options */
|
/* Message options */
|
||||||
#define ZMQ_MORE 1
|
#define ZMQ_MORE 1
|
||||||
|
@ -20,6 +20,8 @@
|
|||||||
#ifndef __ZMQ_I_ENGINE_HPP_INCLUDED__
|
#ifndef __ZMQ_I_ENGINE_HPP_INCLUDED__
|
||||||
#define __ZMQ_I_ENGINE_HPP_INCLUDED__
|
#define __ZMQ_I_ENGINE_HPP_INCLUDED__
|
||||||
|
|
||||||
|
#include "fd.hpp"
|
||||||
|
|
||||||
namespace zmq
|
namespace zmq
|
||||||
{
|
{
|
||||||
|
|
||||||
@ -47,7 +49,10 @@ namespace zmq
|
|||||||
// are messages to send available.
|
// are messages to send available.
|
||||||
virtual void restart_output () = 0;
|
virtual void restart_output () = 0;
|
||||||
|
|
||||||
virtual void zap_msg_available () = 0;
|
virtual void zap_msg_available () = 0;
|
||||||
|
|
||||||
|
// provide a way to link from engine to file descriptor
|
||||||
|
virtual fd_t get_assoc_fd () { return retired_fd;};
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -65,6 +65,7 @@ int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
|
|||||||
zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
|
zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
|
||||||
int inhwm_, int outhwm_, bool conflate_) :
|
int inhwm_, int outhwm_, bool conflate_) :
|
||||||
object_t (parent_),
|
object_t (parent_),
|
||||||
|
assoc_fd (retired_fd),
|
||||||
inpipe (inpipe_),
|
inpipe (inpipe_),
|
||||||
outpipe (outpipe_),
|
outpipe (outpipe_),
|
||||||
in_active (true),
|
in_active (true),
|
||||||
|
@ -27,6 +27,7 @@
|
|||||||
#include "stdint.hpp"
|
#include "stdint.hpp"
|
||||||
#include "array.hpp"
|
#include "array.hpp"
|
||||||
#include "blob.hpp"
|
#include "blob.hpp"
|
||||||
|
#include "fd.hpp"
|
||||||
|
|
||||||
namespace zmq
|
namespace zmq
|
||||||
{
|
{
|
||||||
@ -117,6 +118,8 @@ namespace zmq
|
|||||||
// set the high water marks.
|
// set the high water marks.
|
||||||
void set_hwms (int inhwm_, int outhwm_);
|
void set_hwms (int inhwm_, int outhwm_);
|
||||||
|
|
||||||
|
// provide a way to link pipe to engine fd. Set on session initialization
|
||||||
|
fd_t assoc_fd; //=retired_fd
|
||||||
private:
|
private:
|
||||||
|
|
||||||
// Type of the underlying lock-free pipe.
|
// Type of the underlying lock-free pipe.
|
||||||
|
@ -133,6 +133,33 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_,
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int zmq::router_t::xgetsockopt (int option_, const void *optval_,
|
||||||
|
size_t *optvallen_)
|
||||||
|
{
|
||||||
|
switch (option_) {
|
||||||
|
case ZMQ_IDENTITY_FD:
|
||||||
|
if (optval_==NULL && optvallen_) {
|
||||||
|
*optvallen_=sizeof(fd_t);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
if (optval_ && optvallen_ && *optvallen_) {
|
||||||
|
blob_t identity= blob_t((unsigned char*)optval_,*optvallen_);
|
||||||
|
outpipes_t::iterator it = outpipes.find (identity);
|
||||||
|
if (it == outpipes.end() ){
|
||||||
|
return ENOTSOCK;
|
||||||
|
}
|
||||||
|
*((fd_t*)optval_)=it->second.pipe->assoc_fd;
|
||||||
|
*optvallen_=sizeof(fd_t);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
errno = EINVAL;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void zmq::router_t::xpipe_terminated (pipe_t *pipe_)
|
void zmq::router_t::xpipe_terminated (pipe_t *pipe_)
|
||||||
{
|
{
|
||||||
|
@ -47,6 +47,7 @@ namespace zmq
|
|||||||
// Overrides of functions from socket_base_t.
|
// Overrides of functions from socket_base_t.
|
||||||
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_);
|
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_);
|
||||||
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
|
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
|
||||||
|
int xgetsockopt (int option_, const void *optval_, size_t *optvallen_);
|
||||||
int xsend (zmq::msg_t *msg_);
|
int xsend (zmq::msg_t *msg_);
|
||||||
int xrecv (zmq::msg_t *msg_);
|
int xrecv (zmq::msg_t *msg_);
|
||||||
bool xhas_in ();
|
bool xhas_in ();
|
||||||
|
@ -353,7 +353,9 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
|
|||||||
// Remember the local end of the pipe.
|
// Remember the local end of the pipe.
|
||||||
zmq_assert (!pipe);
|
zmq_assert (!pipe);
|
||||||
pipe = pipes [0];
|
pipe = pipes [0];
|
||||||
|
// Store engine assoc_fd for lilnking pipe to fd
|
||||||
|
pipe->assoc_fd=engine_->get_assoc_fd();
|
||||||
|
pipes[1]->assoc_fd=pipe->assoc_fd;
|
||||||
// Ask socket to plug into the remote end of the pipe.
|
// Ask socket to plug into the remote end of the pipe.
|
||||||
send_bind (socket, pipes [1]);
|
send_bind (socket, pipes [1]);
|
||||||
}
|
}
|
||||||
|
@ -284,6 +284,11 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
|
|||||||
errno = ETERM;
|
errno = ETERM;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// First, check whether specific socket type overloads the option.
|
||||||
|
int rc = xgetsockopt (option_, optval_, optvallen_);
|
||||||
|
if (rc == 0 || errno != EINVAL)
|
||||||
|
return rc;
|
||||||
|
|
||||||
if (option_ == ZMQ_RCVMORE) {
|
if (option_ == ZMQ_RCVMORE) {
|
||||||
if (*optvallen_ < sizeof (int)) {
|
if (*optvallen_ < sizeof (int)) {
|
||||||
@ -1037,6 +1042,11 @@ int zmq::socket_base_t::xsetsockopt (int, const void *, size_t)
|
|||||||
errno = EINVAL;
|
errno = EINVAL;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
int zmq::socket_base_t::xgetsockopt (int, const void *, size_t*)
|
||||||
|
{
|
||||||
|
errno = EINVAL;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
bool zmq::socket_base_t::xhas_out ()
|
bool zmq::socket_base_t::xhas_out ()
|
||||||
{
|
{
|
||||||
|
@ -133,10 +133,13 @@ namespace zmq
|
|||||||
|
|
||||||
// The default implementation assumes there are no specific socket
|
// The default implementation assumes there are no specific socket
|
||||||
// options for the particular socket type. If not so, override this
|
// options for the particular socket type. If not so, override this
|
||||||
// method.
|
// methods.
|
||||||
virtual int xsetsockopt (int option_, const void *optval_,
|
virtual int xsetsockopt (int option_, const void *optval_,
|
||||||
size_t optvallen_);
|
size_t optvallen_);
|
||||||
|
|
||||||
|
virtual int xgetsockopt (int option_, const void *optval_,
|
||||||
|
size_t *optvallen_);
|
||||||
|
|
||||||
// The default implementation assumes that send is not supported.
|
// The default implementation assumes that send is not supported.
|
||||||
virtual bool xhas_out ();
|
virtual bool xhas_out ();
|
||||||
virtual int xsend (zmq::msg_t *msg_);
|
virtual int xsend (zmq::msg_t *msg_);
|
||||||
|
@ -70,6 +70,8 @@ namespace zmq
|
|||||||
void out_event ();
|
void out_event ();
|
||||||
void timer_event (int id_);
|
void timer_event (int id_);
|
||||||
|
|
||||||
|
// export s via i_engine so it is possible to link a pipe to fd
|
||||||
|
fd_t get_assoc_fd (){ return s; };
|
||||||
private:
|
private:
|
||||||
|
|
||||||
// Unplug the engine from the session.
|
// Unplug the engine from the session.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user