mirror of
https://github.com/zeromq/libzmq.git
synced 2024-12-13 10:52:56 +01:00
Problem: use of TCP loopback fastpath not available for user sockets
Solution: add socket option
This commit is contained in:
parent
490d76da2f
commit
a5e763039d
@ -584,6 +584,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread);
|
|||||||
#define ZMQ_GSSAPI_SERVICE_PRINCIPAL_NAMETYPE 91
|
#define ZMQ_GSSAPI_SERVICE_PRINCIPAL_NAMETYPE 91
|
||||||
#define ZMQ_BINDTODEVICE 92
|
#define ZMQ_BINDTODEVICE 92
|
||||||
#define ZMQ_ZAP_ENFORCE_DOMAIN 93
|
#define ZMQ_ZAP_ENFORCE_DOMAIN 93
|
||||||
|
#define ZMQ_LOOPBACK_FASTPATH 94
|
||||||
|
|
||||||
/* DRAFT 0MQ socket events and monitoring */
|
/* DRAFT 0MQ socket events and monitoring */
|
||||||
/* Unspecified system errors during handshake. Event value is an errno. */
|
/* Unspecified system errors during handshake. Event value is an errno. */
|
||||||
|
@ -90,7 +90,8 @@ zmq::options_t::options_t () :
|
|||||||
heartbeat_interval (0),
|
heartbeat_interval (0),
|
||||||
heartbeat_timeout (-1),
|
heartbeat_timeout (-1),
|
||||||
use_fd (-1),
|
use_fd (-1),
|
||||||
zap_enforce_domain (false)
|
zap_enforce_domain (false),
|
||||||
|
loopback_fastpath (false)
|
||||||
{
|
{
|
||||||
memset (curve_public_key, 0, CURVE_KEYSIZE);
|
memset (curve_public_key, 0, CURVE_KEYSIZE);
|
||||||
memset (curve_secret_key, 0, CURVE_KEYSIZE);
|
memset (curve_secret_key, 0, CURVE_KEYSIZE);
|
||||||
@ -627,6 +628,13 @@ int zmq::options_t::setsockopt (int option_,
|
|||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
case ZMQ_LOOPBACK_FASTPATH:
|
||||||
|
if (is_int) {
|
||||||
|
loopback_fastpath = (value != 0);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
|
||||||
default:
|
default:
|
||||||
#if defined(ZMQ_ACT_MILITANT)
|
#if defined(ZMQ_ACT_MILITANT)
|
||||||
@ -1064,6 +1072,13 @@ int zmq::options_t::getsockopt (int option_,
|
|||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
case ZMQ_LOOPBACK_FASTPATH:
|
||||||
|
if (is_int) {
|
||||||
|
*value = loopback_fastpath;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
#if defined(ZMQ_ACT_MILITANT)
|
#if defined(ZMQ_ACT_MILITANT)
|
||||||
malformed = false;
|
malformed = false;
|
||||||
|
@ -248,6 +248,9 @@ struct options_t
|
|||||||
|
|
||||||
// Enforce a non-empty ZAP domain requirement for PLAIN auth
|
// Enforce a non-empty ZAP domain requirement for PLAIN auth
|
||||||
bool zap_enforce_domain;
|
bool zap_enforce_domain;
|
||||||
|
|
||||||
|
// Use of loopback fastpath.
|
||||||
|
bool loopback_fastpath;
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -65,6 +65,7 @@
|
|||||||
#include "err.hpp"
|
#include "err.hpp"
|
||||||
#include "fd.hpp"
|
#include "fd.hpp"
|
||||||
#include "ip.hpp"
|
#include "ip.hpp"
|
||||||
|
#include "tcp.hpp"
|
||||||
|
|
||||||
#if defined ZMQ_HAVE_EVENTFD
|
#if defined ZMQ_HAVE_EVENTFD
|
||||||
#include <sys/eventfd.h>
|
#include <sys/eventfd.h>
|
||||||
@ -392,22 +393,7 @@ static void tune_socket (const SOCKET socket)
|
|||||||
(char *) &tcp_nodelay, sizeof tcp_nodelay);
|
(char *) &tcp_nodelay, sizeof tcp_nodelay);
|
||||||
wsa_assert (rc != SOCKET_ERROR);
|
wsa_assert (rc != SOCKET_ERROR);
|
||||||
|
|
||||||
int sio_loopback_fastpath = 1;
|
zmq::tcp_tune_loopback_fast_path (socket);
|
||||||
DWORD numberOfBytesReturned = 0;
|
|
||||||
|
|
||||||
rc = WSAIoctl (socket, SIO_LOOPBACK_FAST_PATH, &sio_loopback_fastpath,
|
|
||||||
sizeof sio_loopback_fastpath, NULL, 0,
|
|
||||||
&numberOfBytesReturned, 0, 0);
|
|
||||||
|
|
||||||
if (SOCKET_ERROR == rc) {
|
|
||||||
DWORD lastError = ::WSAGetLastError ();
|
|
||||||
|
|
||||||
if (WSAEOPNOTSUPP == lastError) {
|
|
||||||
// This system is not Windows 8 or Server 2012, and the call is not supported.
|
|
||||||
} else {
|
|
||||||
wsa_assert (false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
24
src/tcp.cpp
24
src/tcp.cpp
@ -332,3 +332,27 @@ void zmq::tcp_assert_tuning_error (zmq::fd_t s_, int rc_)
|
|||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void zmq::tcp_tune_loopback_fast_path (const fd_t socket_)
|
||||||
|
{
|
||||||
|
#if defined ZMQ_HAVE_WINDOWS
|
||||||
|
int sio_loopback_fastpath = 1;
|
||||||
|
DWORD numberOfBytesReturned = 0;
|
||||||
|
|
||||||
|
int rc = WSAIoctl (socket_, SIO_LOOPBACK_FAST_PATH, &sio_loopback_fastpath,
|
||||||
|
sizeof sio_loopback_fastpath, NULL, 0,
|
||||||
|
&numberOfBytesReturned, 0, 0);
|
||||||
|
|
||||||
|
if (SOCKET_ERROR == rc) {
|
||||||
|
DWORD lastError = ::WSAGetLastError ();
|
||||||
|
|
||||||
|
if (WSAEOPNOTSUPP == lastError) {
|
||||||
|
// This system is not Windows 8 or Server 2012, and the call is not supported.
|
||||||
|
} else {
|
||||||
|
wsa_assert (false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#else
|
||||||
|
LIBZMQ_UNUSED (socket_);
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
@ -66,6 +66,8 @@ int tcp_read (fd_t s_, void *data_, size_t size_);
|
|||||||
// Asserts that an internal error did not occur. Does not assert
|
// Asserts that an internal error did not occur. Does not assert
|
||||||
// on network errors such as reset or aborted connections.
|
// on network errors such as reset or aborted connections.
|
||||||
void tcp_assert_tuning_error (fd_t s_, int rc_);
|
void tcp_assert_tuning_error (fd_t s_, int rc_);
|
||||||
|
|
||||||
|
void tcp_tune_loopback_fast_path (fd_t socket_);
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
@ -302,6 +302,10 @@ int zmq::tcp_connecter_t::open ()
|
|||||||
// Set the socket to non-blocking mode so that we get async connect().
|
// Set the socket to non-blocking mode so that we get async connect().
|
||||||
unblock_socket (s);
|
unblock_socket (s);
|
||||||
|
|
||||||
|
// Set the socket to loopback fastpath if configured.
|
||||||
|
if (options.loopback_fastpath)
|
||||||
|
tcp_tune_loopback_fast_path (s);
|
||||||
|
|
||||||
// Set the socket buffer limits for the underlying socket.
|
// Set the socket buffer limits for the underlying socket.
|
||||||
if (options.sndbuf >= 0)
|
if (options.sndbuf >= 0)
|
||||||
set_tcp_send_buffer (s, options.sndbuf);
|
set_tcp_send_buffer (s, options.sndbuf);
|
||||||
|
@ -216,6 +216,10 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
|
|||||||
if (options.tos != 0)
|
if (options.tos != 0)
|
||||||
set_ip_type_of_service (s, options.tos);
|
set_ip_type_of_service (s, options.tos);
|
||||||
|
|
||||||
|
// Set the socket to loopback fastpath if configured.
|
||||||
|
if (options.loopback_fastpath)
|
||||||
|
tcp_tune_loopback_fast_path (s);
|
||||||
|
|
||||||
// Bind the socket to a device if applicable
|
// Bind the socket to a device if applicable
|
||||||
if (!options.bound_device.empty ())
|
if (!options.bound_device.empty ())
|
||||||
bind_to_device (s, options.bound_device);
|
bind_to_device (s, options.bound_device);
|
||||||
|
@ -55,6 +55,7 @@ unsigned long zmq_stopwatch_intermediate (void *watch_);
|
|||||||
#define ZMQ_GSSAPI_SERVICE_PRINCIPAL_NAMETYPE 91
|
#define ZMQ_GSSAPI_SERVICE_PRINCIPAL_NAMETYPE 91
|
||||||
#define ZMQ_BINDTODEVICE 92
|
#define ZMQ_BINDTODEVICE 92
|
||||||
#define ZMQ_ZAP_ENFORCE_DOMAIN 93
|
#define ZMQ_ZAP_ENFORCE_DOMAIN 93
|
||||||
|
#define ZMQ_LOOPBACK_FASTPATH 94
|
||||||
|
|
||||||
/* DRAFT 0MQ socket events and monitoring */
|
/* DRAFT 0MQ socket events and monitoring */
|
||||||
/* Unspecified system errors during handshake. Event value is an errno. */
|
/* Unspecified system errors during handshake. Event value is an errno. */
|
||||||
|
@ -29,9 +29,20 @@
|
|||||||
|
|
||||||
#include "testutil.hpp"
|
#include "testutil.hpp"
|
||||||
|
|
||||||
int main (void)
|
typedef void (*extra_func_t) (void *socket);
|
||||||
|
|
||||||
|
#ifdef ZMQ_BUILD_DRAFT
|
||||||
|
void set_sockopt_fastpath (void *socket)
|
||||||
|
{
|
||||||
|
int value = 1;
|
||||||
|
int rc =
|
||||||
|
zmq_setsockopt (socket, ZMQ_LOOPBACK_FASTPATH, &value, sizeof value);
|
||||||
|
assert (rc == 0);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
void test_pair_tcp (extra_func_t extra_func = NULL)
|
||||||
{
|
{
|
||||||
setup_test_environment ();
|
|
||||||
size_t len = MAX_SOCKET_STRING;
|
size_t len = MAX_SOCKET_STRING;
|
||||||
char my_endpoint[MAX_SOCKET_STRING];
|
char my_endpoint[MAX_SOCKET_STRING];
|
||||||
void *ctx = zmq_ctx_new ();
|
void *ctx = zmq_ctx_new ();
|
||||||
@ -39,6 +50,10 @@ int main (void)
|
|||||||
|
|
||||||
void *sb = zmq_socket (ctx, ZMQ_PAIR);
|
void *sb = zmq_socket (ctx, ZMQ_PAIR);
|
||||||
assert (sb);
|
assert (sb);
|
||||||
|
|
||||||
|
if (extra_func)
|
||||||
|
extra_func (sb);
|
||||||
|
|
||||||
int rc = zmq_bind (sb, "tcp://127.0.0.1:*");
|
int rc = zmq_bind (sb, "tcp://127.0.0.1:*");
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
rc = zmq_getsockopt (sb, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
|
rc = zmq_getsockopt (sb, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
|
||||||
@ -46,6 +61,9 @@ int main (void)
|
|||||||
|
|
||||||
void *sc = zmq_socket (ctx, ZMQ_PAIR);
|
void *sc = zmq_socket (ctx, ZMQ_PAIR);
|
||||||
assert (sc);
|
assert (sc);
|
||||||
|
if (extra_func)
|
||||||
|
extra_func (sc);
|
||||||
|
|
||||||
rc = zmq_connect (sc, my_endpoint);
|
rc = zmq_connect (sc, my_endpoint);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
@ -59,6 +77,16 @@ int main (void)
|
|||||||
|
|
||||||
rc = zmq_ctx_term (ctx);
|
rc = zmq_ctx_term (ctx);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
int main (void)
|
||||||
|
{
|
||||||
|
setup_test_environment ();
|
||||||
|
|
||||||
|
test_pair_tcp ();
|
||||||
|
#ifdef ZMQ_BUILD_DRAFT
|
||||||
|
test_pair_tcp (set_sockopt_fastpath);
|
||||||
|
#endif
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user