ZMQ_SRCFD docs and tests

Also moved the fd field from message content to message itself
This commit is contained in:
Stefan Radomski 2014-01-07 01:09:51 +01:00
parent 823b7ebeb0
commit 3aeaa6fab1
8 changed files with 129 additions and 20 deletions

View File

@ -23,6 +23,12 @@ The following properties can be retrieved with the _zmq_msg_get()_ function:
*ZMQ_MORE*::
Indicates that there are more message frames to follow after the 'message'.
*ZMQ_SRCFD*::
Returns the file descriptor of the socket the 'message' was read from. This
allows application to retrieve the remote endpoint via 'getpeername(2)'.Be
aware that the respective socket might be closed already, reused even.
Currently only implemented for TCP sockets.
RETURN VALUE
------------
The _zmq_msg_get()_ function shall return the value for the property if

View File

@ -199,7 +199,7 @@ ZMQ_EXPORT int zmq_ctx_destroy (void *context);
/* 0MQ message definition. */
/******************************************************************************/
typedef struct zmq_msg_t {unsigned char _ [32];} zmq_msg_t;
typedef struct zmq_msg_t {unsigned char _ [40];} zmq_msg_t;
typedef void (zmq_free_fn) (void *data, void *hint);

View File

@ -30,6 +30,7 @@
// Check whether the sizes of public representation of the message (zmq_msg_t)
// and private representation of the message (zmq::msg_t) match.
typedef char zmq_msg_size_check
[2 * ((sizeof (zmq::msg_t) == sizeof (zmq_msg_t)) != 0) - 1];
@ -43,11 +44,13 @@ int zmq::msg_t::init ()
u.vsm.type = type_vsm;
u.vsm.flags = 0;
u.vsm.size = 0;
file_desc = -1;
return 0;
}
int zmq::msg_t::init_size (size_t size_)
{
file_desc = -1;
if (size_ <= max_vsm_size) {
u.vsm.type = type_vsm;
u.vsm.flags = 0;
@ -67,7 +70,6 @@ int zmq::msg_t::init_size (size_t size_)
u.lmsg.content->size = size_;
u.lmsg.content->ffn = NULL;
u.lmsg.content->hint = NULL;
u.lmsg.content->fd = -1;
new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
}
return 0;
@ -80,6 +82,8 @@ int zmq::msg_t::init_data (void *data_, size_t size_, msg_free_fn *ffn_,
// would occur once the data is accessed
assert (data_ != NULL || size_ == 0);
file_desc = -1;
// Initialize constant message if there's no need to deallocate
if(ffn_ == NULL) {
u.cmsg.type = type_cmsg;
@ -100,7 +104,6 @@ int zmq::msg_t::init_data (void *data_, size_t size_, msg_free_fn *ffn_,
u.lmsg.content->size = size_;
u.lmsg.content->ffn = ffn_;
u.lmsg.content->hint = hint_;
u.lmsg.content->fd = -1;
new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
}
return 0;
@ -249,17 +252,14 @@ void zmq::msg_t::reset_flags (unsigned char flags_)
u.base.flags &= ~flags_;
}
zmq::fd_t zmq::msg_t::fd ()
int64_t zmq::msg_t::fd ()
{
if (u.base.type == type_lmsg)
return u.lmsg.content->fd;
return -1;
return file_desc;
}
void zmq::msg_t::set_fd (fd_t fd_)
void zmq::msg_t::set_fd (int64_t fd_)
{
if (u.base.type == type_lmsg)
u.lmsg.content->fd = fd_;
file_desc = fd_;
}
bool zmq::msg_t::is_identity () const

View File

@ -25,7 +25,6 @@
#include "config.hpp"
#include "atomic_counter.hpp"
#include "fd.hpp"
// Signature for free function to deallocate the message content.
// Note that it has to be declared as "C" so that it is the same as
@ -68,8 +67,8 @@ namespace zmq
unsigned char flags ();
void set_flags (unsigned char flags_);
void reset_flags (unsigned char flags_);
fd_t fd ();
void set_fd (fd_t fd_);
int64_t fd ();
void set_fd (int64_t fd_);
bool is_identity () const;
bool is_delimiter ();
bool is_vsm ();
@ -103,7 +102,6 @@ namespace zmq
msg_free_fn *ffn;
void *hint;
zmq::atomic_counter_t refcnt;
fd_t fd;
};
// Different message types.
@ -120,6 +118,9 @@ namespace zmq
type_cmsg = 104,
type_max = 104
};
// the file descriptor where this message originated, needs to be 64bit due to alignment
int64_t file_desc;
// Note that fields shared between different message types are not
// moved to tha parent class (msg_t). This way we ger tighter packing

View File

@ -827,12 +827,10 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
if (unlikely (rc != 0 && errno != EAGAIN))
return -1;
// set file descriptor
if (file_desc >= 0)
msg_->set_fd(file_desc);
// If we have the message, return immediately.
if (rc == 0) {
if (file_desc >= 0)
msg_->set_fd(file_desc);
extract_flags (msg_);
return 0;
}
@ -849,6 +847,8 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
rc = xrecv (msg_);
if (rc < 0)
return rc;
if (file_desc >= 0)
msg_->set_fd(file_desc);
extract_flags (msg_);
return 0;
}
@ -881,6 +881,8 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
}
}
if (file_desc >= 0)
msg_->set_fd(file_desc);
extract_flags (msg_);
return 0;
}

View File

@ -235,7 +235,7 @@ namespace zmq
// File descriptor if applicable
fd_t file_desc;
// Improves efficiency of time measurement.
clock_t clock;

View File

@ -17,6 +17,7 @@ noinst_PROGRAMS = test_system \
test_immediate \
test_last_endpoint \
test_term_endpoint \
test_srcfd \
test_monitor \
test_resource \
test_router_mandatory \
@ -81,6 +82,7 @@ test_connect_resolve_SOURCES = test_connect_resolve.cpp
test_immediate_SOURCES = test_immediate.cpp
test_last_endpoint_SOURCES = test_last_endpoint.cpp
test_term_endpoint_SOURCES = test_term_endpoint.cpp
test_srcfd_SOURCES = test_srcfd.cpp
test_monitor_SOURCES = test_monitor.cpp
test_resource_SOURCES = test_resource.cpp
test_router_mandatory_SOURCES = test_router_mandatory.cpp

98
tests/test_srcfd.cpp Normal file
View File

@ -0,0 +1,98 @@
/*
Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
#define MSG_SIZE 20
#ifdef _WIN32
#include <Winsock2.h>
#include <Ws2tcpip.h>
#else
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#endif
int main (void)
{
int rc;
setup_test_environment();
// Create the infrastructure
void *ctx = zmq_ctx_new ();
assert (ctx);
void *rep = zmq_socket (ctx, ZMQ_REP);
assert (rep);
void *req = zmq_socket (ctx, ZMQ_REQ);
assert (req);
rc = zmq_bind(rep, "tcp://127.0.0.1:5560");
assert (rc == 0);
rc = zmq_connect(req, "tcp://127.0.0.1:5560");
assert (rc == 0);
char tmp[MSG_SIZE];
zmq_send(req, tmp, MSG_SIZE, 0);
zmq_msg_t msg;
rc = zmq_msg_init(&msg);
assert (rc == 0);
zmq_recvmsg(rep, &msg, 0);
assert(zmq_msg_size(&msg) == MSG_SIZE);
// get the messages source file descriptor
int srcFd = zmq_msg_get(&msg, ZMQ_SRCFD);
assert(srcFd >= 0);
// get the remote endpoint
struct sockaddr_storage ss;
socklen_t addrlen = sizeof ss;
rc = getpeername (srcFd, (struct sockaddr*) &ss, &addrlen);
assert (rc == 0);
char host [NI_MAXHOST];
rc = getnameinfo ((struct sockaddr*) &ss, addrlen, host, sizeof host, NULL, 0, NI_NUMERICHOST);
assert (rc == 0);
// assert it is localhost which connected
assert (strcmp(host, "127.0.0.1") == 0);
rc = zmq_close (rep);
assert (rc == 0);
rc = zmq_close (req);
assert (rc == 0);
// sleep a bit for the socket to be freed
usleep(30000);
// getting name from closed socket will fail
rc = getpeername (srcFd, (struct sockaddr*) &ss, &addrlen);
assert (rc == -1);
assert (errno == EBADF);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0 ;
}