From f30fb8501ef845eae9a44c032813e5c45e189ac4 Mon Sep 17 00:00:00 2001 From: Stefan Radomski Date: Mon, 6 Jan 2014 10:31:42 +0100 Subject: [PATCH] Expose remote FD via zmq_msg_get(&msg, ZMQ_SRCFD) This patch allows client applications to retrieve the remote endpoint from a message that originated from a tcp socket --- include/zmq.h | 1 + src/msg.cpp | 15 +++++++++++++++ src/msg.hpp | 4 ++++ src/session_base.cpp | 6 ++++++ src/socket_base.cpp | 15 +++++++++++++++ src/socket_base.hpp | 6 ++++++ src/tcp_listener.cpp | 4 ++++ src/zmq.cpp | 2 ++ 8 files changed, 53 insertions(+) diff --git a/include/zmq.h b/include/zmq.h index d684e47f..ca90d6d5 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -296,6 +296,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); /* Message options */ #define ZMQ_MORE 1 +#define ZMQ_SRCFD 2 /* Send/recv options. */ #define ZMQ_DONTWAIT 1 diff --git a/src/msg.cpp b/src/msg.cpp index 00063d98..acdf9cf4 100644 --- a/src/msg.cpp +++ b/src/msg.cpp @@ -67,6 +67,7 @@ int zmq::msg_t::init_size (size_t size_) u.lmsg.content->size = size_; u.lmsg.content->ffn = NULL; u.lmsg.content->hint = NULL; + u.lmsg.content->fd = -1; new (&u.lmsg.content->refcnt) zmq::atomic_counter_t (); } return 0; @@ -99,6 +100,7 @@ int zmq::msg_t::init_data (void *data_, size_t size_, msg_free_fn *ffn_, u.lmsg.content->size = size_; u.lmsg.content->ffn = ffn_; u.lmsg.content->hint = hint_; + u.lmsg.content->fd = -1; new (&u.lmsg.content->refcnt) zmq::atomic_counter_t (); } return 0; @@ -247,6 +249,19 @@ void zmq::msg_t::reset_flags (unsigned char flags_) u.base.flags &= ~flags_; } +zmq::fd_t zmq::msg_t::fd () +{ + if (u.base.type == type_lmsg) + return u.lmsg.content->fd; + return -1; +} + +void zmq::msg_t::set_fd (fd_t fd_) +{ + if (u.base.type == type_lmsg) + u.lmsg.content->fd = fd_; +} + bool zmq::msg_t::is_identity () const { return (u.base.flags & identity) == identity; diff --git a/src/msg.hpp b/src/msg.hpp index 095934ab..9d588d6c 100644 --- a/src/msg.hpp +++ b/src/msg.hpp @@ -25,6 +25,7 @@ #include "config.hpp" #include "atomic_counter.hpp" +#include "fd.hpp" // Signature for free function to deallocate the message content. // Note that it has to be declared as "C" so that it is the same as @@ -67,6 +68,8 @@ namespace zmq unsigned char flags (); void set_flags (unsigned char flags_); void reset_flags (unsigned char flags_); + fd_t fd (); + void set_fd (fd_t fd_); bool is_identity () const; bool is_delimiter (); bool is_vsm (); @@ -100,6 +103,7 @@ namespace zmq msg_free_fn *ffn; void *hint; zmq::atomic_counter_t refcnt; + fd_t fd; }; // Different message types. diff --git a/src/session_base.cpp b/src/session_base.cpp index f98e265a..0afd2037 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -36,6 +36,7 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, bool active_, class socket_base_t *socket_, const options_t &options_, const address_t *addr_) { + session_base_t *s = NULL; switch (options_.type) { case ZMQ_REQ: @@ -115,6 +116,11 @@ int zmq::session_base_t::pull_msg (msg_t *msg_) errno = EAGAIN; return -1; } + +// if (socket != NULL && socket->fd() >= 0) { +// msg_->set_fd(socket->fd()); +// } + incomplete_in = msg_->flags () & msg_t::more ? true : false; return 0; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index c824d289..7a1accf8 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -136,6 +136,7 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) : last_tsc (0), ticks (0), rcvmore (false), + file_desc(-1), monitor_socket (NULL), monitor_events (0) { @@ -826,6 +827,10 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) if (unlikely (rc != 0 && errno != EAGAIN)) return -1; + // set file descriptor + if (file_desc >= 0) + msg_->set_fd(file_desc); + // If we have the message, return immediately. if (rc == 0) { extract_flags (msg_); @@ -1188,6 +1193,16 @@ int zmq::socket_base_t::monitor (const char *addr_, int events_) return rc; } +void zmq::socket_base_t::set_fd(zmq::fd_t fd_) +{ + file_desc = fd_; +} + +zmq::fd_t zmq::socket_base_t::fd() +{ + return file_desc; +} + void zmq::socket_base_t::event_connected (std::string &addr_, int fd_) { if (monitor_events & ZMQ_EVENT_CONNECTED) { diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 9cc128b5..bf362c00 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -106,6 +106,9 @@ namespace zmq int monitor (const char *endpoint_, int events_); + void set_fd(fd_t fd_); + fd_t fd(); + void event_connected (std::string &addr_, int fd_); void event_connect_delayed (std::string &addr_, int err_); void event_connect_retried (std::string &addr_, int interval_); @@ -230,6 +233,9 @@ namespace zmq // True if the last message received had MORE flag set. bool rcvmore; + // File descriptor if applicable + fd_t file_desc; + // Improves efficiency of time measurement. clock_t clock; diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index a3888abc..5f9d92b8 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -20,6 +20,7 @@ #include #include +#include #include "platform.hpp" #include "tcp_listener.hpp" @@ -90,6 +91,9 @@ void zmq::tcp_listener_t::in_event () tune_tcp_socket (fd); tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl); + // remember our fd for ZMQ_SRCFD in messages + socket->set_fd(fd); + // Create the engine object for this connection. stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint); diff --git a/src/zmq.cpp b/src/zmq.cpp index 7ae8bc16..25da581a 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -640,6 +640,8 @@ int zmq_msg_get (zmq_msg_t *msg_, int option_) switch (option_) { case ZMQ_MORE: return (((zmq::msg_t*) msg_)->flags () & zmq::msg_t::more)? 1: 0; + case ZMQ_SRCFD: + return ((zmq::msg_t*) msg_)->fd (); default: errno = EINVAL; return -1;