Merge pull request #4148 from bluca/vmci

Problem: VMCI build broken
This commit is contained in:
Luca Boccassi 2021-02-23 11:57:18 +00:00 committed by GitHub
commit bd5f5a1093
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 258 additions and 312 deletions

View File

@ -65,7 +65,7 @@ matrix:
packages: packages:
- valgrind - valgrind
- libgnutls-dev - libgnutls-dev
- env: BUILD_TYPE=default CURVE=libsodium GSSAPI=enabled PGM=enabled NORM=enabled - env: BUILD_TYPE=default CURVE=libsodium GSSAPI=enabled PGM=enabled NORM=enabled VMCI=enabled
os: linux os: linux
addons: addons:
apt: apt:

View File

@ -1024,19 +1024,19 @@ endif
endif endif
if HAVE_VMCI if HAVE_VMCI
test_apps += test_pair_vmci test_reqrep_vmci test_apps += tests/test_pair_vmci tests/test_reqrep_vmci
test_pair_vmci_SOURCES = tests/test_pair_vmci.cpp tests_test_pair_vmci_SOURCES = tests/test_pair_vmci.cpp
test_pair_vmci_LDADD = ${TESTUTIL_LIBS} src/libzmq.la tests_test_pair_vmci_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
test_pair_vmci_CPPFLAGS = ${TESTUTIL_CPPFLAGS} tests_test_pair_vmci_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
test_pair_vmci_LDFLAGS = @LIBZMQ_VMCI_LDFLAGS@ tests_test_pair_vmci_LDFLAGS = @LIBZMQ_VMCI_LDFLAGS@
test_pair_vmci_CXXFLAGS = @LIBZMQ_VMCI_CXXFLAGS@ tests_test_pair_vmci_CXXFLAGS = @LIBZMQ_VMCI_CXXFLAGS@
test_reqrep_vmci_SOURCES = tests/test_reqrep_vmci.cpp tests_test_reqrep_vmci_SOURCES = tests/test_reqrep_vmci.cpp
test_reqrep_vmci_LDADD = ${TESTUTIL_LIBS} src/libzmq.la tests_test_reqrep_vmci_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
test_reqrep_vmci_CPPFLAGS = ${TESTUTIL_CPPFLAGS} tests_test_reqrep_vmci_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
test_reqrep_vmci_LDFLAGS = @LIBZMQ_VMCI_LDFLAGS@ tests_test_reqrep_vmci_LDFLAGS = @LIBZMQ_VMCI_LDFLAGS@
test_reqrep_vmci_CXXFLAGS = @LIBZMQ_VMCI_CXXFLAGS@ tests_test_reqrep_vmci_CXXFLAGS = @LIBZMQ_VMCI_CXXFLAGS@
endif endif

View File

@ -66,4 +66,14 @@ function set_config_opts() {
if [ -n "$FORCE_98" ] && [ "$FORCE_98" = "enabled" ]; then if [ -n "$FORCE_98" ] && [ "$FORCE_98" = "enabled" ]; then
CONFIG_OPTS+=("--enable-force-CXX98-compat=yes") CONFIG_OPTS+=("--enable-force-CXX98-compat=yes")
fi fi
if [ -n "$VMCI" ] && [ "$VMCI" = "enabled" ]; then
CONFIG_OPTS+=("--with-vmci=$PWD/vmci")
# VMWare headeers are not ISO C++ compliant
CONFIG_OPTS+=("--disable-pedantic")
git clone --depth 1 https://github.com/vmware/open-vm-tools.git
mkdir -p vmci
# Linux headers are redefined, so we can't just add -I to the whole dir
cp open-vm-tools/open-vm-tools/lib/include/vmci_* vmci/
fi
} }

View File

@ -28,7 +28,9 @@
*/ */
#include "precompiled.hpp" #include "precompiled.hpp"
#include "ip.hpp"
#include "vmci.hpp" #include "vmci.hpp"
#include "vmci_address.hpp"
#if defined ZMQ_HAVE_VMCI #if defined ZMQ_HAVE_VMCI
@ -97,4 +99,23 @@ void zmq::tune_vmci_connect_timeout (ctx_t *context_,
#endif #endif
} }
zmq::fd_t zmq::vmci_open_socket (const char *address_,
const zmq::options_t &options_,
zmq::vmci_address_t *out_vmci_addr_)
{
// Convert the textual address into address structure.
int rc = out_vmci_addr_->resolve (address_);
if (rc != 0)
return retired_fd;
// Create the socket.
fd_t s = open_socket (out_vmci_addr_->family (), SOCK_STREAM, 0);
if (s == retired_fd) {
return retired_fd;
}
return s;
}
#endif #endif

View File

@ -59,6 +59,10 @@ void tune_vmci_connect_timeout (ctx_t *context_,
fd_t sockfd_, fd_t sockfd_,
struct timeval timeout_); struct timeval timeout_);
#endif #endif
fd_t vmci_open_socket (const char *address_,
const options_t &options_,
vmci_address_t *out_vmci_addr_);
} }
#endif #endif

View File

@ -39,6 +39,11 @@
#include "err.hpp" #include "err.hpp"
zmq::vmci_address_t::vmci_address_t ()
{
memset (&address, 0, sizeof address);
}
zmq::vmci_address_t::vmci_address_t (ctx_t *parent_) : parent (parent_) zmq::vmci_address_t::vmci_address_t (ctx_t *parent_) : parent (parent_)
{ {
memset (&address, 0, sizeof address); memset (&address, 0, sizeof address);
@ -56,10 +61,6 @@ zmq::vmci_address_t::vmci_address_t (const sockaddr *sa,
memcpy (&address, sa, sa_len); memcpy (&address, sa, sa_len);
} }
zmq::vmci_address_t::~vmci_address_t ()
{
}
int zmq::vmci_address_t::resolve (const char *path_) int zmq::vmci_address_t::resolve (const char *path_)
{ {
// Find the ':' at end that separates address from the port number. // Find the ':' at end that separates address from the port number.
@ -125,7 +126,7 @@ int zmq::vmci_address_t::resolve (const char *path_)
return 0; return 0;
} }
int zmq::vmci_address_t::to_string (std::string &addr_) int zmq::vmci_address_t::to_string (std::string &addr_) const
{ {
if (address.svm_family != parent->get_vmci_socket_family ()) { if (address.svm_family != parent->get_vmci_socket_family ()) {
addr_.clear (); addr_.clear ();
@ -164,4 +165,13 @@ socklen_t zmq::vmci_address_t::addrlen () const
return static_cast<socklen_t> (sizeof address); return static_cast<socklen_t> (sizeof address);
} }
#if defined ZMQ_HAVE_WINDOWS
unsigned short zmq::vmci_address_t::family () const
#else
sa_family_t zmq::vmci_address_t::family () const
#endif
{
return parent->get_vmci_socket_family ();
}
#endif #endif

View File

@ -43,16 +43,21 @@ namespace zmq
class vmci_address_t class vmci_address_t
{ {
public: public:
vmci_address_t ();
vmci_address_t (ctx_t *parent_); vmci_address_t (ctx_t *parent_);
vmci_address_t (const sockaddr *sa, socklen_t sa_len, ctx_t *parent_); vmci_address_t (const sockaddr *sa, socklen_t sa_len, ctx_t *parent_);
~vmci_address_t ();
// This function sets up the address for VMCI transport. // This function sets up the address for VMCI transport.
int resolve (const char *path_); int resolve (const char *path_);
// The opposite to resolve() // The opposite to resolve()
int to_string (std::string &addr_); int to_string (std::string &addr_) const;
#if defined ZMQ_HAVE_WINDOWS
unsigned short family () const;
#else
sa_family_t family () const;
#endif
const sockaddr *addr () const; const sockaddr *addr () const;
socklen_t addrlen () const; socklen_t addrlen () const;
@ -60,8 +65,6 @@ class vmci_address_t
struct sockaddr_vm address; struct sockaddr_vm address;
ctx_t *parent; ctx_t *parent;
vmci_address_t ();
ZMQ_NON_COPYABLE_NOR_MOVABLE (vmci_address_t) ZMQ_NON_COPYABLE_NOR_MOVABLE (vmci_address_t)
}; };
} }

View File

@ -35,69 +35,41 @@
#include <new> #include <new>
#include "stream_engine.hpp"
#include "io_thread.hpp" #include "io_thread.hpp"
#include "platform.hpp" #include "platform.hpp"
#include "random.hpp" #include "random.hpp"
#include "err.hpp" #include "err.hpp"
#include "ip.hpp" #include "ip.hpp"
#include "address.hpp" #include "address.hpp"
#include "session_base.hpp"
#include "vmci_address.hpp" #include "vmci_address.hpp"
#include "vmci.hpp" #include "vmci.hpp"
#include "session_base.hpp"
zmq::vmci_connecter_t::vmci_connecter_t (class io_thread_t *io_thread_, zmq::vmci_connecter_t::vmci_connecter_t (class io_thread_t *io_thread_,
class session_base_t *session_, class session_base_t *session_,
const options_t &options_, const options_t &options_,
const address_t *addr_, address_t *addr_,
bool delayed_start_) : bool delayed_start_) :
own_t (io_thread_, options_), stream_connecter_base_t (
io_object_t (io_thread_), io_thread_, session_, options_, addr_, delayed_start_),
addr (addr_), _connect_timer_started (false)
s (retired_fd),
handle_valid (false),
delayed_start (delayed_start_),
timer_started (false),
session (session_),
current_reconnect_ivl (options.reconnect_ivl)
{ {
zmq_assert (addr); zmq_assert (_addr->protocol == protocol_name::vmci);
zmq_assert (addr->protocol == "vmci");
addr->to_string (endpoint);
socket = session->get_socket ();
} }
zmq::vmci_connecter_t::~vmci_connecter_t () zmq::vmci_connecter_t::~vmci_connecter_t ()
{ {
zmq_assert (!timer_started); zmq_assert (!_connect_timer_started);
zmq_assert (!handle_valid);
zmq_assert (s == retired_fd);
}
void zmq::vmci_connecter_t::process_plug ()
{
if (delayed_start)
add_reconnect_timer ();
else
start_connecting ();
} }
void zmq::vmci_connecter_t::process_term (int linger_) void zmq::vmci_connecter_t::process_term (int linger_)
{ {
if (timer_started) { if (_connect_timer_started) {
cancel_timer (reconnect_timer_id); cancel_timer (connect_timer_id);
timer_started = false; _connect_timer_started = false;
} }
if (handle_valid) { stream_connecter_base_t::process_term (linger_);
rm_fd (handle);
handle_valid = false;
}
if (s != retired_fd)
close ();
own_t::process_term (linger_);
} }
void zmq::vmci_connecter_t::in_event () void zmq::vmci_connecter_t::in_event ()
@ -110,9 +82,26 @@ void zmq::vmci_connecter_t::in_event ()
void zmq::vmci_connecter_t::out_event () void zmq::vmci_connecter_t::out_event ()
{ {
fd_t fd = connect (); if (_connect_timer_started) {
rm_fd (handle); cancel_timer (connect_timer_id);
handle_valid = false; _connect_timer_started = false;
}
// TODO this is still very similar to (t)ipc_connecter_t, maybe the
// differences can be factored out
rm_handle ();
const fd_t fd = connect ();
if (fd == retired_fd
&& ((options.reconnect_stop & ZMQ_RECONNECT_STOP_CONN_REFUSED)
&& errno == ECONNREFUSED)) {
send_conn_failed (_session);
close ();
terminate ();
return;
}
// Handle the error condition by attempt to reconnect. // Handle the error condition by attempt to reconnect.
if (fd == retired_fd) { if (fd == retired_fd) {
@ -135,148 +124,154 @@ void zmq::vmci_connecter_t::out_event ()
#endif #endif
} }
// Create the engine object for this connection. create_engine (
stream_engine_t *engine = new (std::nothrow) stream_engine_t ( fd, zmq::vmci_connecter_t::get_socket_name (fd, socket_end_local));
fd, options, make_unconnected_bind_endpoint_pair (endpoint)); }
alloc_assert (engine);
// Attach the engine to the corresponding session object. std::string
send_attach (session, engine); zmq::vmci_connecter_t::get_socket_name (zmq::fd_t fd_,
socket_end_t socket_end_) const
{
struct sockaddr_storage ss;
const zmq_socklen_t sl = get_socket_address (fd_, socket_end_, &ss);
if (sl == 0) {
return std::string ();
}
// Shut the connecter down. const vmci_address_t addr (reinterpret_cast<struct sockaddr *> (&ss), sl,
terminate (); this->get_ctx ());
std::string address_string;
socket->event_connected (make_unconnected_bind_endpoint_pair (endpoint), addr.to_string (address_string);
fd); return address_string;
} }
void zmq::vmci_connecter_t::timer_event (int id_) void zmq::vmci_connecter_t::timer_event (int id_)
{ {
zmq_assert (id_ == reconnect_timer_id); if (id_ == connect_timer_id) {
timer_started = false; _connect_timer_started = false;
start_connecting (); rm_handle ();
close ();
add_reconnect_timer ();
} else
stream_connecter_base_t::timer_event (id_);
} }
void zmq::vmci_connecter_t::start_connecting () void zmq::vmci_connecter_t::start_connecting ()
{ {
// Open the connecting socket. // Open the connecting socket.
int rc = open (); const int rc = open ();
// Connect may succeed in synchronous manner. // Connect may succeed in synchronous manner.
if (rc == 0) { if (rc == 0) {
handle = add_fd (s); _handle = add_fd (_s);
handle_valid = true;
out_event (); out_event ();
} }
// Connection establishment may be delayed. Poll for its completion.
else if (rc == -1 && errno == EINPROGRESS) {
_handle = add_fd (_s);
set_pollout (_handle);
_socket->event_connect_delayed (
make_unconnected_connect_endpoint_pair (_endpoint), zmq_errno ());
// add userspace connect timeout
add_connect_timer ();
}
// Handle any other error condition by eventual reconnect. // Handle any other error condition by eventual reconnect.
else { else {
if (s != retired_fd) if (_s != retired_fd)
close (); close ();
add_reconnect_timer (); add_reconnect_timer ();
} }
} }
void zmq::vmci_connecter_t::add_reconnect_timer () void zmq::vmci_connecter_t::add_connect_timer ()
{ {
if (options.reconnect_ivl > 0) { if (options.connect_timeout > 0) {
int rc_ivl = get_new_reconnect_ivl (); add_timer (options.connect_timeout, connect_timer_id);
add_timer (rc_ivl, reconnect_timer_id); _connect_timer_started = true;
socket->event_connect_retried (
make_unconnected_bind_endpoint_pair (endpoint), rc_ivl);
timer_started = true;
} }
} }
int zmq::vmci_connecter_t::get_new_reconnect_ivl ()
{
// The new interval is the current interval + random value.
int this_interval =
current_reconnect_ivl + (generate_random () % options.reconnect_ivl);
// Only change the current reconnect interval if the maximum reconnect
// interval was set and if it's larger than the reconnect interval.
if (options.reconnect_ivl_max > 0
&& options.reconnect_ivl_max > options.reconnect_ivl) {
// Calculate the next interval
current_reconnect_ivl = current_reconnect_ivl * 2;
if (current_reconnect_ivl >= options.reconnect_ivl_max) {
current_reconnect_ivl = options.reconnect_ivl_max;
}
}
return this_interval;
}
int zmq::vmci_connecter_t::open () int zmq::vmci_connecter_t::open ()
{ {
zmq_assert (s == retired_fd); zmq_assert (_s == retired_fd);
int family = this->get_ctx ()->get_vmci_socket_family (); // Resolve the address
if (family == -1) if (_addr->resolved.vmci_addr != NULL) {
return -1; LIBZMQ_DELETE (_addr->resolved.vmci_addr);
}
// Create the socket. _addr->resolved.vmci_addr =
s = open_socket (family, SOCK_STREAM, 0); new (std::nothrow) vmci_address_t (this->get_ctx ());
#ifdef ZMQ_HAVE_WINDOWS alloc_assert (_addr->resolved.vmci_addr);
if (s == INVALID_SOCKET) { _s = vmci_open_socket (_addr->address.c_str (), options,
errno = wsa_error_to_errno (WSAGetLastError ()); _addr->resolved.vmci_addr);
if (_s == retired_fd) {
// TODO we should emit some event in this case!
LIBZMQ_DELETE (_addr->resolved.vmci_addr);
return -1; return -1;
} }
#else zmq_assert (_addr->resolved.vmci_addr != NULL);
if (s == -1)
return -1; // Set the socket to non-blocking mode so that we get async connect().
#endif unblock_socket (_s);
const vmci_address_t *const vmci_addr = _addr->resolved.vmci_addr;
int rc;
// Connect to the remote peer. // Connect to the remote peer.
int rc = ::connect (s, addr->resolved.vmci_addr->addr (), #if defined ZMQ_HAVE_VXWORKS
addr->resolved.vmci_addr->addrlen ()); rc = ::connect (_s, (sockaddr *) vmci_addr->addr (), vmci_addr->addrlen ());
#else
rc = ::connect (_s, vmci_addr->addr (), vmci_addr->addrlen ());
#endif
// Connect was successful immediately. // Connect was successful immediately.
if (rc == 0) if (rc == 0) {
return 0; return 0;
// Forward the error.
return -1;
} }
void zmq::vmci_connecter_t::close () // Translate error codes indicating asynchronous connect has been
{ // launched to a uniform EINPROGRESS.
zmq_assert (s != retired_fd);
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
const int rc = closesocket (s); const int last_error = WSAGetLastError ();
wsa_assert (rc != SOCKET_ERROR); if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
errno = EINPROGRESS;
else
errno = wsa_error_to_errno (last_error);
#else #else
const int rc = ::close (s); if (errno == EINTR)
errno_assert (rc == 0); errno = EINPROGRESS;
#endif #endif
socket->event_closed (make_unconnected_bind_endpoint_pair (endpoint), s); return -1;
s = retired_fd;
} }
zmq::fd_t zmq::vmci_connecter_t::connect () zmq::fd_t zmq::vmci_connecter_t::connect ()
{ {
// Following code should handle both Berkeley-derived socket // Async connect has finished. Check whether an error occurred
// implementations and Solaris.
int err = 0; int err = 0;
#if defined ZMQ_HAVE_HPUX #if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
int len = sizeof (err); int len = sizeof err;
#else #else
socklen_t len = sizeof (err); socklen_t len = sizeof err;
#endif #endif
int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char *) &err, &len);
const int rc = getsockopt (_s, SOL_SOCKET, SO_ERROR,
reinterpret_cast<char *> (&err), &len);
// Assert if the error was caused by 0MQ bug. // Assert if the error was caused by 0MQ bug.
// Networking problems are OK. No need to assert. // Networking problems are OK. No need to assert.
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
zmq_assert (rc == 0); zmq_assert (rc == 0);
if (err != 0) { if (err != 0) {
if (err != WSAECONNREFUSED && err != WSAETIMEDOUT if (err == WSAEBADF || err == WSAENOPROTOOPT || err == WSAENOTSOCK
&& err != WSAECONNABORTED && err != WSAEHOSTUNREACH || err == WSAENOBUFS) {
&& err != WSAENETUNREACH && err != WSAENETDOWN && err != WSAEACCES
&& err != WSAEINVAL && err != WSAEADDRINUSE
&& err != WSAECONNRESET) {
wsa_assert_no (err); wsa_assert_no (err);
} }
errno = wsa_error_to_errno (err);
return retired_fd; return retired_fd;
} }
#else #else
@ -286,16 +281,20 @@ zmq::fd_t zmq::vmci_connecter_t::connect ()
err = errno; err = errno;
if (err != 0) { if (err != 0) {
errno = err; errno = err;
errno_assert (errno == ECONNREFUSED || errno == ECONNRESET #if !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE
|| errno == ETIMEDOUT || errno == EHOSTUNREACH errno_assert (errno != EBADF && errno != ENOPROTOOPT
|| errno == ENETUNREACH || errno == ENETDOWN && errno != ENOTSOCK && errno != ENOBUFS);
|| errno == EINVAL); #else
errno_assert (errno != ENOPROTOOPT && errno != ENOTSOCK
&& errno != ENOBUFS);
#endif
return retired_fd; return retired_fd;
} }
#endif #endif
fd_t result = s; // Return the newly connected socket.
s = retired_fd; const fd_t result = _s;
_s = retired_fd;
return result; return result;
} }

View File

@ -38,6 +38,7 @@
#include "own.hpp" #include "own.hpp"
#include "stdint.hpp" #include "stdint.hpp"
#include "io_object.hpp" #include "io_object.hpp"
#include "stream_connecter_base.hpp"
namespace zmq namespace zmq
{ {
@ -45,8 +46,7 @@ class io_thread_t;
class session_base_t; class session_base_t;
struct address_t; struct address_t;
// TODO consider refactoring this to derive from stream_connecter_base_t class vmci_connecter_t ZMQ_FINAL : public stream_connecter_base_t
class vmci_connecter_t ZMQ_FINAL : public own_t, public io_object_t
{ {
public: public:
// If 'delayed_start' is true connecter first waits for a while, // If 'delayed_start' is true connecter first waits for a while,
@ -54,19 +54,21 @@ class vmci_connecter_t ZMQ_FINAL : public own_t, public io_object_t
vmci_connecter_t (zmq::io_thread_t *io_thread_, vmci_connecter_t (zmq::io_thread_t *io_thread_,
zmq::session_base_t *session_, zmq::session_base_t *session_,
const options_t &options_, const options_t &options_,
const address_t *addr_, address_t *addr_,
bool delayed_start_); bool delayed_start_);
~vmci_connecter_t (); ~vmci_connecter_t ();
protected:
std::string get_socket_name (fd_t fd_, socket_end_t socket_end_) const;
private: private:
// ID of the timer used to delay the reconnection. // ID of the timer used to check the connect timeout, must be different from stream_connecter_base_t::reconnect_timer_id.
enum enum
{ {
reconnect_timer_id = 1 connect_timer_id = 2
}; };
// Handlers for incoming commands. // Handlers for incoming commands.
void process_plug ();
void process_term (int linger_); void process_term (int linger_);
// Handlers for I/O events. // Handlers for I/O events.
@ -77,8 +79,8 @@ class vmci_connecter_t ZMQ_FINAL : public own_t, public io_object_t
// Internal function to start the actual connection establishment. // Internal function to start the actual connection establishment.
void start_connecting (); void start_connecting ();
// Internal function to add a reconnect timer // Internal function to add a connect timer
void add_reconnect_timer (); void add_connect_timer ();
// Internal function to return a reconnect backoff delay. // Internal function to return a reconnect backoff delay.
// Will modify the current_reconnect_ivl used for next call // Will modify the current_reconnect_ivl used for next call
@ -90,43 +92,12 @@ class vmci_connecter_t ZMQ_FINAL : public own_t, public io_object_t
// EAGAIN errno if async connect was launched. // EAGAIN errno if async connect was launched.
int open (); int open ();
// Close the connecting socket.
void close ();
// Get the file descriptor of newly created connection. Returns // Get the file descriptor of newly created connection. Returns
// retired_fd if the connection was unsuccessful. // retired_fd if the connection was unsuccessful.
fd_t connect (); fd_t connect ();
// Address to connect to. Owned by session_base_t.
const address_t *addr;
// Underlying socket.
fd_t s;
// Handle corresponding to the listening socket.
handle_t handle;
// If true file descriptor is registered with the poller and 'handle'
// contains valid value.
bool handle_valid;
// If true, connecter is waiting a while before trying to connect.
const bool delayed_start;
// True iff a timer has been started. // True iff a timer has been started.
bool timer_started; bool _connect_timer_started;
// Reference to the session we belong to.
zmq::session_base_t *session;
// Current reconnect ivl, updated for backoff strategy
int current_reconnect_ivl;
// String representation of endpoint to connect to
std::string endpoint;
// Socket
zmq::socket_base_t *socket;
ZMQ_NON_COPYABLE_NOR_MOVABLE (vmci_connecter_t) ZMQ_NON_COPYABLE_NOR_MOVABLE (vmci_connecter_t)
}; };

View File

@ -35,7 +35,7 @@
#include <new> #include <new>
#include "stream_engine.hpp" //#include "stream_engine.hpp"
#include "vmci_address.hpp" #include "vmci_address.hpp"
#include "io_thread.hpp" #include "io_thread.hpp"
#include "session_base.hpp" #include "session_base.hpp"
@ -55,40 +55,18 @@
zmq::vmci_listener_t::vmci_listener_t (io_thread_t *io_thread_, zmq::vmci_listener_t::vmci_listener_t (io_thread_t *io_thread_,
socket_base_t *socket_, socket_base_t *socket_,
const options_t &options_) : const options_t &options_) :
own_t (io_thread_, options_), stream_listener_base_t (io_thread_, socket_, options_)
io_object_t (io_thread_),
s (retired_fd),
socket (socket_)
{ {
} }
zmq::vmci_listener_t::~vmci_listener_t ()
{
zmq_assert (s == retired_fd);
}
void zmq::vmci_listener_t::process_plug ()
{
// Start polling for incoming connections.
handle = add_fd (s);
set_pollin (handle);
}
void zmq::vmci_listener_t::process_term (int linger_)
{
rm_fd (handle);
close ();
own_t::process_term (linger_);
}
void zmq::vmci_listener_t::in_event () void zmq::vmci_listener_t::in_event ()
{ {
fd_t fd = accept (); fd_t fd = accept ();
// If connection was reset by the peer in the meantime, just ignore it. // If connection was reset by the peer in the meantime, just ignore it.
if (fd == retired_fd) { if (fd == retired_fd) {
socket->event_accept_failed ( _socket->event_accept_failed (
make_unconnected_bind_endpoint_pair (endpoint), zmq_errno ()); make_unconnected_bind_endpoint_pair (_endpoint), zmq_errno ());
return; return;
} }
@ -107,41 +85,24 @@ void zmq::vmci_listener_t::in_event ()
} }
// Create the engine object for this connection. // Create the engine object for this connection.
stream_engine_t *engine = new (std::nothrow) stream_engine_t ( create_engine (fd);
fd, options, make_unconnected_bind_endpoint_pair (endpoint));
alloc_assert (engine);
// Choose I/O thread to run connecter in. Given that we are already
// running in an I/O thread, there must be at least one available.
io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_assert (io_thread);
// Create and launch a session object.
session_base_t *session =
session_base_t::create (io_thread, false, socket, options, NULL);
errno_assert (session);
session->inc_seqnum ();
launch_child (session);
send_attach (session, engine, false);
socket->event_accepted (make_unconnected_bind_endpoint_pair (endpoint), fd);
} }
int zmq::vmci_listener_t::get_local_address (std::string &addr_) std::string
zmq::vmci_listener_t::get_socket_name (zmq::fd_t fd_,
socket_end_t socket_end_) const
{ {
struct sockaddr_storage ss; struct sockaddr_storage ss;
#ifdef ZMQ_HAVE_HPUX const zmq_socklen_t sl = get_socket_address (fd_, socket_end_, &ss);
int sl = sizeof (ss); if (sl == 0) {
#else return std::string ();
socklen_t sl = sizeof (ss);
#endif
int rc = getsockname (s, (sockaddr *) &ss, &sl);
if (rc != 0) {
addr_.clear ();
return rc;
} }
vmci_address_t addr ((struct sockaddr *) &ss, sl, this->get_ctx ()); const vmci_address_t addr (reinterpret_cast<struct sockaddr *> (&ss), sl,
return addr.to_string (addr_); this->get_ctx ());
std::string address_string;
addr.to_string (address_string);
return address_string;
} }
int zmq::vmci_listener_t::set_local_address (const char *addr_) int zmq::vmci_listener_t::set_local_address (const char *addr_)
@ -156,7 +117,7 @@ int zmq::vmci_listener_t::set_local_address (const char *addr_)
return -1; return -1;
// Create a listening socket. // Create a listening socket.
s = _s =
open_socket (this->get_ctx ()->get_vmci_socket_family (), SOCK_STREAM, 0); open_socket (this->get_ctx ()->get_vmci_socket_family (), SOCK_STREAM, 0);
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
if (s == INVALID_SOCKET) { if (s == INVALID_SOCKET) {
@ -165,18 +126,18 @@ int zmq::vmci_listener_t::set_local_address (const char *addr_)
} }
#if !defined _WIN32_WCE #if !defined _WIN32_WCE
// On Windows, preventing sockets to be inherited by child processes. // On Windows, preventing sockets to be inherited by child processes.
BOOL brc = SetHandleInformation ((HANDLE) s, HANDLE_FLAG_INHERIT, 0); BOOL brc = SetHandleInformation ((HANDLE) _s, HANDLE_FLAG_INHERIT, 0);
win_assert (brc); win_assert (brc);
#endif #endif
#else #else
if (s == -1) if (_s == -1)
return -1; return -1;
#endif #endif
address.to_string (endpoint); address.to_string (_endpoint);
// Bind the socket. // Bind the socket.
rc = bind (s, address.addr (), address.addrlen ()); rc = bind (_s, address.addr (), address.addrlen ());
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
if (rc == SOCKET_ERROR) { if (rc == SOCKET_ERROR) {
errno = wsa_error_to_errno (WSAGetLastError ()); errno = wsa_error_to_errno (WSAGetLastError ());
@ -188,7 +149,7 @@ int zmq::vmci_listener_t::set_local_address (const char *addr_)
#endif #endif
// Listen for incoming connections. // Listen for incoming connections.
rc = listen (s, options.backlog); rc = listen (_s, options.backlog);
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
if (rc == SOCKET_ERROR) { if (rc == SOCKET_ERROR) {
errno = wsa_error_to_errno (WSAGetLastError ()); errno = wsa_error_to_errno (WSAGetLastError ());
@ -199,7 +160,8 @@ int zmq::vmci_listener_t::set_local_address (const char *addr_)
goto error; goto error;
#endif #endif
socket->event_listening (make_unconnected_bind_endpoint_pair (endpoint), s); _socket->event_listening (make_unconnected_bind_endpoint_pair (_endpoint),
_s);
return 0; return 0;
error: error:
@ -209,27 +171,13 @@ error:
return -1; return -1;
} }
void zmq::vmci_listener_t::close ()
{
zmq_assert (s != retired_fd);
#ifdef ZMQ_HAVE_WINDOWS
int rc = closesocket (s);
wsa_assert (rc != SOCKET_ERROR);
#else
int rc = ::close (s);
errno_assert (rc == 0);
#endif
socket->event_closed (make_unconnected_bind_endpoint_pair (endpoint), s);
s = retired_fd;
}
zmq::fd_t zmq::vmci_listener_t::accept () zmq::fd_t zmq::vmci_listener_t::accept ()
{ {
// Accept one connection and deal with different failure modes. // Accept one connection and deal with different failure modes.
// The situation where connection cannot be accepted due to insufficient // The situation where connection cannot be accepted due to insufficient
// resources is considered valid and treated by ignoring the connection. // resources is considered valid and treated by ignoring the connection.
zmq_assert (s != retired_fd); zmq_assert (_s != retired_fd);
fd_t sock = ::accept (s, NULL, NULL); fd_t sock = ::accept (_s, NULL, NULL);
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
if (sock == INVALID_SOCKET) { if (sock == INVALID_SOCKET) {

View File

@ -37,57 +37,37 @@
#include <string> #include <string>
#include "fd.hpp" #include "fd.hpp"
#include "own.hpp" #include "vmci_address.hpp"
#include "stdint.hpp" #include "stream_listener_base.hpp"
#include "io_object.hpp"
namespace zmq namespace zmq
{ {
class io_thread_t; class vmci_listener_t ZMQ_FINAL : public stream_listener_base_t
class socket_base_t;
// TODO consider refactoring this to derive from stream_listener_base_t
class vmci_listener_t ZMQ_FINAL : public own_t, public io_object_t
{ {
public: public:
vmci_listener_t (zmq::io_thread_t *io_thread_, vmci_listener_t (zmq::io_thread_t *io_thread_,
zmq::socket_base_t *socket_, zmq::socket_base_t *socket_,
const options_t &options_); const options_t &options_);
~vmci_listener_t ();
// Set address to listen on. // Set address to listen on.
int set_local_address (const char *addr_); int set_local_address (const char *addr_);
// Get the bound address for use with wildcards protected:
int get_local_address (std::string &addr_); std::string get_socket_name (fd_t fd_, socket_end_t socket_end_) const;
private: private:
// Handlers for incoming commands.
void process_plug ();
void process_term (int linger_);
// Handlers for I/O events. // Handlers for I/O events.
void in_event (); void in_event ();
// Close the listening socket.
void close ();
// Accept the new connection. Returns the file descriptor of the // Accept the new connection. Returns the file descriptor of the
// newly created connection. The function may return retired_fd // newly created connection. The function may return retired_fd
// if the connection was dropped while waiting in the listen backlog. // if the connection was dropped while waiting in the listen backlog.
fd_t accept (); fd_t accept ();
// Underlying socket. int create_socket (const char *addr_);
fd_t s;
// Handle corresponding to the listening socket. // Address to listen on.
handle_t handle; vmci_address_t _address;
// Socket the listerner belongs to.
zmq::socket_base_t *socket;
// String representation of endpoint to bind to
std::string endpoint;
ZMQ_NON_COPYABLE_NOR_MOVABLE (vmci_listener_t) ZMQ_NON_COPYABLE_NOR_MOVABLE (vmci_listener_t)
}; };

View File

@ -48,10 +48,10 @@ void test_pair_vmci ()
void *sc = test_context_socket (ZMQ_PAIR); void *sc = test_context_socket (ZMQ_PAIR);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, endpoint.c_str ())); TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, endpoint.c_str ()));
bounce (sb, sc); expect_bounce_fail (sb, sc);
test_context_socket_close (sc); test_context_socket_close_zero_linger (sc);
test_context_socket_close (sb); test_context_socket_close_zero_linger (sb);
} }
int main (void) int main (void)

View File

@ -42,16 +42,16 @@ void test_reqrep_vmci ()
s << "vmci://" << VMCISock_GetLocalCID () << ":" << 5560; s << "vmci://" << VMCISock_GetLocalCID () << ":" << 5560;
std::string endpoint = s.str (); std::string endpoint = s.str ();
void *sb = test_context_socket (ZMQ_REP); void *sb = test_context_socket (ZMQ_DEALER);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sb, endpoint.c_str ())); TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sb, endpoint.c_str ()));
void *sc = test_context_socket (ZMQ_REQ); void *sc = test_context_socket (ZMQ_DEALER);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, endpoint.c_str ())); TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, endpoint.c_str ()));
bounce (sb, sc); expect_bounce_fail (sb, sc);
test_context_socket_close (sc); test_context_socket_close_zero_linger (sc);
test_context_socket_close (sb); test_context_socket_close_zero_linger (sb);
} }
int main (void) int main (void)