Merge branch 'master' of github.com:zeromq/libzmq

This commit is contained in:
Mário Kašuba 2015-10-20 15:16:07 +02:00
commit c22f9f3633
130 changed files with 3191 additions and 1146 deletions

4
.gitignore vendored
View File

@ -112,6 +112,10 @@ test_client_drop_more
test_client_server
test_server_drop_more
test_thread_safe
test_thread_safe_polling
test_getsockopt_memset
test_stream_exceeds_buffer
test_poller
tests/test*.log
tests/test*.trs
src/platform.hpp*

View File

@ -2,9 +2,24 @@
language: c
os:
- linux
- osx
env:
- BUILD_TYPE=default
- BUILD_TYPE=qt-android
sudo: false
before_install:
- if [ $TRAVIS_OS_NAME == "osx" ] ; then brew update; brew install binutils ; fi
before_script:
# ZMQ stress tests need more open socket (files) than the usual default
# On OSX, it seems the way to set the max files limit is constantly changing, so
# try to use all known knobs to ensure compatibility across various versions
- if [ $TRAVIS_OS_NAME == "osx" ] ; then sudo sysctl -w kern.maxfiles=64000 ; sudo sysctl -w kern.maxfilesperproc=64000 ; sudo launchctl limit maxfiles 64000 64000 ; fi ; ulimit -n 64000
# Build and check this project according to the BUILD_TYPE
script: ./ci_build.sh

View File

@ -35,6 +35,7 @@ Christian Gudrian
Christian Kamm
Chuck Remes
Conrad D. Steenberg
Constantin Rack
Dhammika Pathirana
Dhruva Krishnamurthy
Dirk O. Kaar

View File

@ -10,9 +10,9 @@ if(APPLE)
endif()
if(WIN32)
option(WITH_TWEETNACL "Build with tweetnacl" OFF)
option(WITH_TWEETNACL "Build with tweetnacl" OFF)
else()
option(WITH_TWEETNACL "Build with tweetnacl" ON)
option(WITH_TWEETNACL "Build with tweetnacl" ON)
endif()
if(WITH_TWEETNACL)
@ -23,7 +23,7 @@ if(WITH_TWEETNACL)
)
set(TWEETNACL_SOURCES
tweetnacl/src/tweetnacl.c
tweetnacl/src/tweetnacl.c
)
if(WIN32)
else()
@ -117,10 +117,10 @@ check_include_files(windows.h ZMQ_HAVE_WINDOWS)
check_include_files(sys/uio.h ZMQ_HAVE_UIO)
check_include_files(sys/eventfd.h ZMQ_HAVE_EVENTFD)
check_library_exists(ws2_32 printf "" HAVE_WS2_32) # TODO: Why doesn't something logical like WSAStartup work?
check_library_exists(ws2 printf "" HAVE_WS2)
check_library_exists(rpcrt4 printf "" HAVE_RPCRT4) # UuidCreateSequential
check_library_exists(iphlpapi printf "" HAVE_IPHLAPI) # GetAdaptersAddresses
check_library_exists(ws2_32 fopen "" HAVE_WS2_32) # TODO: Why doesn't something logical like WSAStartup work?
check_library_exists(ws2 fopen "" HAVE_WS2)
check_library_exists(rpcrt4 fopen "" HAVE_RPCRT4) # UuidCreateSequential
check_library_exists(iphlpapi fopen "" HAVE_IPHLAPI) # GetAdaptersAddresses
check_cxx_symbol_exists(SO_PEERCRED sys/socket.h ZMQ_HAVE_SO_PEERCRED)
check_cxx_symbol_exists(LOCAL_PEERCRED sys/socket.h ZMQ_HAVE_LOCAL_PEERCRED)
@ -157,6 +157,7 @@ check_function_exists(gethrtime HAVE_GETHRTIME)
set(CMAKE_REQUIRED_INCLUDES )
add_definitions(-D_REENTRANT -D_THREAD_SAFE)
add_definitions(-DZMQ_USING_CMAKE)
option(ENABLE_EVENTFD "Enable/disable eventfd" ZMQ_HAVE_EVENTFD)
@ -326,7 +327,9 @@ endif()
#-----------------------------------------------------------------------------
# default to Release build
if(NOT CMAKE_BUILD_TYPE)
if(NOT CMAKE_BUILD_TYPE AND NOT CMAKE_CONFIGURATION_TYPES)
# CMAKE_BUILD_TYPE is not used for multi-configuration generators like Visual Studio/XCode
# which instead use CMAKE_CONFIGURATION_TYPES
set(CMAKE_BUILD_TYPE Release CACHE STRING
"Choose the type of build, options are: None Debug Release RelWithDebInfo MinSizeRel."
FORCE)
@ -436,7 +439,9 @@ set(cxx-sources
xsub.cpp
zmq.cpp
zmq_utils.cpp
decoder_allocators.cpp)
decoder_allocators.cpp
socket_poller.cpp
config.hpp)
set(rc-sources version.rc)
@ -589,18 +594,18 @@ if(MSVC)
RELEASE_POSTFIX "-${CMAKE_VS_PLATFORM_TOOLSET}-mt-s-${ZMQ_VERSION_MAJOR}_${ZMQ_VERSION_MINOR}_${ZMQ_VERSION_PATCH}"
DEBUG_POSTFIX "-${CMAKE_VS_PLATFORM_TOOLSET}-mt-sgd-${ZMQ_VERSION_MAJOR}_${ZMQ_VERSION_MINOR}_${ZMQ_VERSION_PATCH}"
COMPILE_FLAGS "/D ZMQ_STATIC"
OUTPUT_NAME "libzmq")
OUTPUT_NAME "libzmq-static")
else()
add_library(libzmq SHARED ${sources} ${public_headers} ${html-docs} ${readme-docs} ${zmq-pkgconfig})
set_target_properties(libzmq PROPERTIES
COMPILE_DEFINITIONS "DLL_EXPORT"
PUBLIC_HEADER "${public_headers}"
VERSION ${ZMQ_VERSION}
SOVERSION "${ZMQ_VERSION_MAJOR}.${ZMQ_VERSION_MINOR}.0")
SOVERSION "${ZMQ_VERSION_MAJOR}.${ZMQ_VERSION_MINOR}.0"
OUTPUT_NAME "libzmq")
if(ZMQ_BUILD_FRAMEWORK)
set_target_properties(libzmq PROPERTIES
FRAMEWORK TRUE
OUTPUT_NAME "ZeroMQ"
MACOSX_FRAMEWORK_IDENTIFIER "org.zeromq.libzmq"
MACOSX_FRAMEWORK_SHORT_VERSION_STRING ${ZMQ_VERSION}
MACOSX_FRAMEWORK_BUNDLE_VERSION ${ZMQ_VERSION})
@ -610,16 +615,12 @@ else()
MACOSX_PACKAGE_LOCATION etc)
set_source_files_properties(${zmq-pkgconfig} PROPERTIES
MACOSX_PACKAGE_LOCATION lib/pkgconfig)
else()
set_target_properties(libzmq PROPERTIES
OUTPUT_NAME "zmq"
)
endif()
add_library(libzmq-static STATIC ${sources} ${public_headers} ${html-docs} ${readme-docs} ${zmq-pkgconfig})
set_target_properties(libzmq-static PROPERTIES
PUBLIC_HEADER "${public_headers}"
COMPILE_DEFINITIONS "ZMQ_STATIC"
OUTPUT_NAME "zmq-static")
OUTPUT_NAME "libzmq-static")
endif()
target_link_libraries(libzmq ${SODIUM_LIBRARY} ${CMAKE_THREAD_LIBS_INIT})

View File

@ -2,7 +2,7 @@ ACLOCAL_AMFLAGS = -I config
SUBDIRS = doc
DIST_SUBDIRS = builds/msvc doc
DIST_SUBDIRS = doc builds builds/msvc
pkgconfig_DATA = src/libzmq.pc
@ -209,7 +209,9 @@ src_libzmq_la_SOURCES = \
src/zmq.cpp \
src/zmq_utils.cpp \
src/decoder_allocators.hpp \
src/decoder_allocators.cpp
src/decoder_allocators.cpp \
src/socket_poller.hpp \
src/socket_poller.cpp
if ON_MINGW
@ -348,6 +350,7 @@ test_apps = \
tests/test_proxy \
tests/test_proxy_single_socket \
tests/test_proxy_terminate \
tests/test_getsockopt_memset \
tests/test_many_sockets \
tests/test_ipc_wildcard \
tests/test_diffserv \
@ -360,12 +363,12 @@ test_apps = \
tests/test_xpub_welcome_msg \
tests/test_atomics \
tests/test_client_server \
tests/test_server_drop_more \
tests/test_client_drop_more \
tests/test_thread_safe \
tests/test_socketopt_hwm \
tests/test_heartbeats \
tests/test_stream_exceeds_buffer
tests/test_stream_exceeds_buffer \
tests/test_thread_safe_polling \
tests/test_poller
tests_test_system_SOURCES = tests/test_system.cpp
tests_test_system_LDADD = src/libzmq.la
@ -519,6 +522,9 @@ tests_test_proxy_single_socket_LDADD = src/libzmq.la
tests_test_proxy_terminate_SOURCES = tests/test_proxy_terminate.cpp
tests_test_proxy_terminate_LDADD = src/libzmq.la
tests_test_getsockopt_memset_SOURCES = tests/test_getsockopt_memset.cpp
tests_test_getsockopt_memset_LDADD = src/libzmq.la
tests_test_many_sockets_SOURCES = tests/test_many_sockets.cpp
tests_test_many_sockets_LDADD = src/libzmq.la
@ -555,12 +561,6 @@ tests_test_atomics_LDADD = src/libzmq.la
tests_test_client_server_SOURCES = tests/test_client_server.cpp
tests_test_client_server_LDADD = src/libzmq.la
tests_test_server_drop_more_SOURCES = tests/test_server_drop_more.cpp
tests_test_server_drop_more_LDADD = src/libzmq.la
tests_test_client_drop_more_SOURCES = tests/test_client_drop_more.cpp
tests_test_client_drop_more_LDADD = src/libzmq.la
tests_test_thread_safe_SOURCES = tests/test_thread_safe.cpp
tests_test_thread_safe_LDADD = src/libzmq.la
@ -573,6 +573,13 @@ tests_test_heartbeats_LDADD = src/libzmq.la
tests_test_stream_exceeds_buffer_SOURCES = tests/test_stream_exceeds_buffer.cpp
tests_test_stream_exceeds_buffer_LDADD = src/libzmq.la
tests_test_thread_safe_polling_SOURCES = tests/test_thread_safe_polling.cpp
tests_test_thread_safe_polling_LDADD = src/libzmq.la
tests_test_poller_SOURCES = tests/test_poller.cpp
tests_test_poller_LDADD = src/libzmq.la
if !ON_MINGW
if !ON_CYGWIN
test_apps += \
@ -660,7 +667,7 @@ check_PROGRAMS = ${test_apps}
# Run the test cases
TESTS = $(test_apps)
XFAIL_TESTS =
XFAIL_TESTS =
if !ON_LINUX
XFAIL_TESTS += tests/test_abstract_ipc

34
builds/Makefile.am Normal file
View File

@ -0,0 +1,34 @@
# Specify all build files that have to go into source packages.
# msvc directory does its own stuff.
EXTRA_DIST = \
cygwin/Makefile.cygwin \
zos/makelibzmq \
zos/cxxall \
zos/README.md \
zos/makeclean \
zos/platform.hpp \
zos/zc++ \
zos/test_fork.cpp \
zos/maketests \
zos/runtests \
cygwin/Makefile.cygwin \
mingw32/Makefile.mingw32 \
mingw32/platform.hpp \
cmake/Modules \
cmake/Modules/FindAsciiDoc.cmake \
cmake/Modules/TestZMQVersion.cmake \
cmake/Modules/ZMQSourceRunChecks.cmake \
cmake/NSIS.template32.in \
cmake/platform.hpp.in \
cmake/Makefile.am \
cmake/Makefile \
cmake/NSIS.template64.in \
cmake/Makefile.in \
valgrind/valgrind.supp \
valgrind/vg \
nuget/readme.nuget \
nuget/libzmq.autopkg \
qt-android/android_build_helper.sh \
qt-android/ci_build.sh \
qt-android/build.sh

View File

@ -282,7 +282,16 @@ function android_build_verify_so {
fi
android_build_check_fail
local elfoutput=$(readelf -d ${sofile})
if command -v readelf >/dev/null 2>&1 ; then
local readelf_bin="readelf"
elif command -v greadelf >/dev/null 2>&1 ; then
local readelf_bin="greadelf"
else
ANDROID_BUILD_FAIL+=("Could not find [g]readelf")
fi
android_build_check_fail
local elfoutput=$($readelf_bin -d ${sofile})
local soname_regexp='soname: \[([[:alnum:]\.]+)\]'
if [[ $elfoutput =~ $soname_regexp ]]; then

View File

@ -1,12 +1,28 @@
#!/usr/bin/env bash
(cd '/tmp' \
&& wget http://dl.google.com/android/ndk/android-ndk-r9-linux-x86_64.tar.bz2 \
&& tar -xf android-ndk-r9-linux-x86_64.tar.bz2 \
&& mv android-ndk-r9 android-ndk)
NDK_VER=android-ndk-r10e
export ANDROID_NDK_ROOT="/tmp/android-ndk"
export TOOLCHAIN_PATH="/tmp/android-ndk/toolchains/arm-linux-androideabi-4.8/prebuilt/linux-x86_64/bin"
if [ $TRAVIS_OS_NAME == "linux" ]
then
NDK_PLATFORM=linux-x86_64
elif [ $TRAVIS_OS_NAME == "osx" ]
then
NDK_PLATFORM=darwin-x86_64
else
echo "Unsupported platform $TRAVIS_OS_NAME"
exit 1
fi
export FILENAME=$NDK_VER-$NDK_PLATFORM.bin
(cd '/tmp' \
&& wget http://dl.google.com/android/ndk/$FILENAME \
&& chmod a+x $FILENAME \
&& ./$FILENAME &> /dev/null ) || exit 1
unset FILENAME
export ANDROID_NDK_ROOT="/tmp/$NDK_VER"
export TOOLCHAIN_PATH="$ANDROID_NDK_ROOT/toolchains/arm-linux-androideabi-4.8/prebuilt/$NDK_PLATFORM/bin"
export TOOLCHAIN_NAME="arm-linux-androideabi-4.8"
export TOOLCHAIN_HOST="arm-linux-androideabi"
export TOOLCHAIN_ARCH="arm"

View File

@ -1,15 +1,27 @@
#!/usr/bin/env bash
set -x
if [ $BUILD_TYPE == "default" ]; then
mkdir tmp
BUILD_PREFIX=$PWD/tmp
CONFIG_OPTS=()
CONFIG_OPTS+=("CFLAGS=-I${BUILD_PREFIX}/include")
CONFIG_OPTS+=("CPPFLAGS=-I${BUILD_PREFIX}/include")
CONFIG_OPTS+=("CXXFLAGS=-I${BUILD_PREFIX}/include")
CONFIG_OPTS+=("LDFLAGS=-L${BUILD_PREFIX}/lib")
CONFIG_OPTS+=("PKG_CONFIG_PATH=${BUILD_PREFIX}/lib/pkgconfig")
CONFIG_OPTS+=("--prefix=${BUILD_PREFIX}")
# Build required projects first
# libsodium
git clone git://github.com/jedisct1/libsodium.git
( cd libsodium; ./autogen.sh; ./configure; make check; sudo make install; sudo ldconfig )
( cd libsodium; ./autogen.sh; ./configure --prefix=$BUILD_PREFIX; make check; make install)
# Build and check this project
./autogen.sh && ./configure --with-libsodium=yes && make && make check
sudo make install
(./autogen.sh && ./configure "${CONFIG_OPTS[@]}" --with-libsodium=yes && make && make check && make install) || exit 1
else
cd ./builds/${BUILD_TYPE} && ./ci_build.sh
fi

View File

@ -184,15 +184,16 @@ case "${host_os}" in
if test "x$solaris_has_atomic" = "xno"; then
AC_DEFINE(ZMQ_FORCE_MUTEXES, 1, [Force to use mutexes])
fi
# ssp library is required for libsodium on Solaris-like systems
LDFLAGS="-lssp $LDFLAGS"
CPPFLAGS="$CPPFLAGS -Wno-long-long"
;;
*freebsd*)
# Define on FreeBSD to enable all library features
CPPFLAGS="-D__BSD_VISIBLE $CPPFLAGS"
AC_DEFINE(ZMQ_HAVE_FREEBSD, 1, [Have FreeBSD OS])
;;
*dragonfly*)
CPPFLAGS="-D__BSD_VISIBLE $CPPFLAGS"
AC_DEFINE(ZMQ_HAVE_DRAGONFLY, 1, [Have DragonFly OS])
;;
*darwin*)
# Define on Darwin to enable all library features
CPPFLAGS="-D_DARWIN_C_SOURCE $CPPFLAGS"
@ -264,7 +265,7 @@ case "${host_os}" in
libzmq_dso_visibility="no"
if test "x$enable_static" = "xyes"; then
AC_MSG_ERROR([Building static libraries is not supported under MinGW32])
CPPFLAGS="-DZMQ_STATIC"
fi
# Set FD_SETSIZE to 1024
@ -416,7 +417,7 @@ fi
have_sodium_library="no"
AC_ARG_WITH([libsodium], [AS_HELP_STRING([--with-libsodium],
[require libzmq build with libsodium. Requires pkg-config [default=no]])],
[require libzmq build with libsodium crypto library. Requires pkg-config [default=yes]])],
[require_libsodium_ext=$withval],
[require_libsodium_ext=yes])
@ -427,8 +428,17 @@ fi
if test "x$have_sodium_library" != "xno"; then
AC_DEFINE(HAVE_LIBSODIUM, 1, [The libsodium library is to be used.])
# ssp library is required for libsodium on Solaris-like systems
case "${host_os}" in
*solaris*)
LDFLAGS="-lssp $LDFLAGS"
CPPFLAGS="$CPPFLAGS -Wno-long-long"
;;
esac
fi
AM_CONDITIONAL(HAVE_SODIUM, test "x$have_sodium_library" != "xno")
# build using pgm
@ -569,6 +579,7 @@ AC_CONFIG_FILES([ \
Makefile \
src/libzmq.pc \
doc/Makefile \
builds/Makefile \
builds/msvc/Makefile \
builds/redhat/zeromq.spec])

View File

@ -6,6 +6,7 @@ MAN3 = zmq_bind.3 zmq_unbind.3 zmq_connect.3 zmq_disconnect.3 zmq_close.3 \
zmq_msg_init.3 zmq_msg_init_data.3 zmq_msg_init_size.3 \
zmq_msg_move.3 zmq_msg_copy.3 zmq_msg_size.3 zmq_msg_data.3 zmq_msg_close.3 \
zmq_msg_send.3 zmq_msg_recv.3 \
zmq_msg_routing_id.3 zmq_msg_set_routing_id.3 \
zmq_send.3 zmq_recv.3 zmq_send_const.3 \
zmq_msg_get.3 zmq_msg_set.3 zmq_msg_more.3 zmq_msg_gets.3 \
zmq_getsockopt.3 zmq_setsockopt.3 \

View File

@ -33,7 +33,7 @@ are given a linger timeout of zero. You must still close all sockets before
calling zmq_term.
[horizontal]
Default value:: false (old behavior)
Default value:: true (old behavior)
ZMQ_IO_THREADS: Set number of I/O threads

View File

@ -63,6 +63,20 @@ Default value:: 100
Applicable socket types:: all, only for connection-oriented transports
ZMQ_CONNECT_TIMEOUT: Retrieve connect() timeout
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Retrieves how long to wait before timing-out a connect() system call.
The connect() system call normally takes a long time before it returns a
time out error. Setting this option allows the library to time out the call
at an earlier interval.
[horizontal]
Option value type:: int
Option value unit:: milliseconds
Default value:: 0 (disabled)
Applicable socket types:: all, when using TCP transports.
ZMQ_CURVE_PUBLICKEY: Retrieve current CURVE public key
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@ -633,9 +647,9 @@ Default value:: -1 (leave to OS default)
Applicable socket types:: all, when using TCP transports.
ZMQ_TCP_KEEPALIVE_IDLE: Override TCP_KEEPCNT(or TCP_KEEPALIVE on some OS)
ZMQ_TCP_KEEPALIVE_IDLE: Override TCP_KEEPIDLE (or TCP_KEEPALIVE on some OS)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Override 'TCP_KEEPCNT'(or 'TCP_KEEPALIVE' on some OS) socket option (where
Override 'TCP_KEEPIDLE'(or 'TCP_KEEPALIVE' on some OS) socket option (where
supported by OS). The default value of `-1` means to skip any overrides and
leave it to OS default.
@ -658,6 +672,33 @@ Default value:: -1 (leave to OS default)
Applicable socket types:: all, when using TCP transports.
ZMQ_TCP_RETRANSMIT_TIMEOUT: Retrieve TCP Retransmit Timeout
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
On OSes where it is supported, retrieves how long before an unacknowledged TCP
retransmit times out.
The system normally attempts many TCP retransmits following an exponential
backoff strategy. This means that after a network outage, it may take a long
time before the session can be re-established. Setting this option allows
the timeout to happen at a shorter interval.
[horizontal]
Option value type:: int
Option value unit:: milliseconds
Default value:: 0 (leave to OS default)
Applicable socket types:: all, when using TCP transports.
ZMQ_THREADSAFE: Retrieve socket thread safety
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_THREADSAFE' option shall retrieve a boolean value indicating whether
or not the socket is threadsafe. Currently 'ZMQ_CLIENT' and 'ZMQ_SERVER' sockets
are threadsafe.
[horizontal]
Option value type:: boolean
Applicable socket types:: all
ZMQ_TOS: Retrieve the Type-of-Service socket override status
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Retrieve the IP_TOS option for the socket.

View File

@ -41,9 +41,11 @@ Connecting a socket
~~~~~~~~~~~~~~~~~~~
When connecting a 'socket' to a peer address using _zmq_connect()_ with the
'inproc' transport, the 'endpoint' shall be interpreted as an arbitrary string
identifying the 'name' to connect to. The 'name' must have been previously
created by assigning it to at least one 'socket' within the same 0MQ 'context'
as the 'socket' being connected.
identifying the 'name' to connect to. Before version 4.0 he 'name' must have
been previously created by assigning it to at least one 'socket' within the
same 0MQ 'context' as the 'socket' being connected. Since version 4.0 the
order of _zmq_bind()_ and _zmq_connect()_ does not matter just like for the tcp
transport type.
EXAMPLES

View File

@ -0,0 +1,61 @@
zmq_msg_routing_id(3)
=====================
NAME
----
zmq_msg_routing_id - return routing ID for message, if any
SYNOPSIS
--------
*uint32_t zmq_msg_routing_id (zmq_msg_t '*message');*
DESCRIPTION
-----------
The _zmq_msg_routing_id()_ function returns the routing ID for the message,
if any. The routing ID is set on all messages received from a 'ZMQ_SERVER'
socket. To send a message to a 'ZMQ_SERVER' socket you must set the routing
ID of a connected 'ZMQ_CLIENT' peer. Routing IDs are transient.
RETURN VALUE
------------
The _zmq_msg_routing_id()_ function shall return zero if there is no routing
ID, otherwise it shall return an unsigned 32-bit integer greater than zero.
EXAMPLE
-------
.Receiving a client message and routing ID
----
void *ctx = zmq_ctx_new ();
assert (ctx);
void *server = zmq_socket (ctx, ZMQ_SERVER);
assert (server);
int rc = zmq_bind (server, "tcp://127.0.0.1:8080");
assert (rc == 0);
zmq_msg_t message;
rc = zmq_msg_init (&message);
assert (rc == 0);
// Receive a message from socket
rc = zmq_msg_recv (server, &message, 0);
assert (rc != -1);
uint32_t routing_id = zmq_msg_routing_id (&message);
assert (routing_id);
----
SEE ALSO
--------
linkzmq:zmq_msg_set_routing_id[3]
AUTHORS
-------
This page was written by the 0MQ community. To make a change please
read the 0MQ Contribution Policy at <http://www.zeromq.org/docs:contributing>.

View File

@ -35,7 +35,7 @@ below for a detailed description.
The _zmq_msg_t_ structure passed to _zmq_msg_send()_ is nullified during the
call. If you want to send the same message to multiple sockets you have to copy
it using (e.g. using _zmq_msg_copy()_).
it (e.g. using _zmq_msg_copy()_).
NOTE: A successful invocation of _zmq_msg_send()_ does not indicate that the
message has been transmitted to the network, only that it has been queued on

View File

@ -0,0 +1,46 @@
zmq_msg_set_routing_id(3)
=========================
NAME
----
zmq_msg_set_routing_id - set routing ID property on message
SYNOPSIS
--------
*int zmq_msg_set_routing_id (zmq_msg_t '*message', uint32_t 'routing_id');*
DESCRIPTION
-----------
The _zmq_msg_set_routing_id()_ function sets the 'routing_id' specified, on the
the message pointed to by the 'message' argument. The 'routing_id' must be
greater than zero. To get a valid routing ID, you must receive a message
from a 'ZMQ_SERVER' socket, and use the libzmq:zmq_msg_routing_id method.
Routing IDs are transient.
RETURN VALUE
------------
The _zmq_msg_set_routing_id()_ function shall return zero if successful. Otherwise it
shall return `-1` and set 'errno' to one of the values defined below.
ERRORS
------
*EINVAL*::
The provided 'routing_id' is zero.
SEE ALSO
--------
linkzmq:zmq_msg_routing_id[3]
linkzmq:zmq[7]
AUTHORS
-------
This page was written by the 0MQ community. To make a change please
read the 0MQ Contribution Policy at <http://www.zeromq.org/docs:contributing>.

View File

@ -31,7 +31,7 @@ below for a detailed description.
The _zmq_msg_t_ structure passed to _zmq_sendmsg()_ is nullified during the
call. If you want to send the same message to multiple sockets you have to copy
it using (e.g. using _zmq_msg_copy()_).
it (e.g. using _zmq_msg_copy()_).
NOTE: A successful invocation of _zmq_sendmsg()_ does not indicate that the
message has been transmitted to the network, only that it has been queued on

View File

@ -109,6 +109,20 @@ Default value:: 0 (false)
Applicable socket types:: ZMQ_PULL, ZMQ_PUSH, ZMQ_SUB, ZMQ_PUB, ZMQ_DEALER
ZMQ_CONNECT_TIMEOUT: Set connect() timeout
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets how long to wait before timing-out a connect() system call.
The connect() system call normally takes a long time before it returns a
time out error. Setting this option allows the library to time out the call
at an earlier interval.
[horizontal]
Option value type:: int
Option value unit:: milliseconds
Default value:: 0 (disabled)
Applicable socket types:: all, when using TCP transports.
ZMQ_CURVE_PUBLICKEY: Set CURVE public key
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the socket's long term public key. You must set this on CURVE client
@ -819,6 +833,22 @@ Default value:: -1 (leave to OS default)
Applicable socket types:: all, when using TCP transports.
ZMQ_TCP_RETRANSMIT_TIMEOUT: Set TCP Retransmit Timeout
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
On OSes where it is supported, sets how long before an unacknowledged TCP
retransmit times out.
The system normally attempts many TCP retransmits following an exponential
backoff strategy. This means that after a network outage, it may take a long
time before the session can be re-established. Setting this option allows
the timeout to happen at a shorter interval.
[horizontal]
Option value type:: int
Option value unit:: milliseconds
Default value:: 0 (leave to OS default)
Applicable socket types:: all, when using TCP transports.
ZMQ_TOS: Set the Type-of-Service on socket
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the ToS fields (Differentiated services (DS) and Explicit Congestion

View File

@ -48,7 +48,7 @@ _zmq_bind()_, thus allowing many-to-many relationships.
.Thread safety
0MQ 'sockets' are _not_ thread safe. Applications MUST NOT use a socket
from multiple threads except after migrating a socket from one thread to
from multiple threads except after migrating a socket from one thread to
another with a "full fence" memory barrier.
.Socket types
@ -56,73 +56,51 @@ The following sections present the socket types defined by 0MQ, grouped by the
general _messaging pattern_ which is built from related socket types.
Request-reply pattern
Client-server pattern
~~~~~~~~~~~~~~~~~~~~~
The request-reply pattern is used for sending requests from a ZMQ_REQ _client_
to one or more ZMQ_REP _services_, and receiving subsequent replies to each
request sent.
The request-reply pattern is formally defined by http://rfc.zeromq.org/spec:28.
The client-server pattern is used to allow a single 'ZMQ_SERVER' _server_ talk
to one or more 'ZMQ_CLIENT' _clients_. The client always starts the conversation,
after which either peer can send messages asynchronously, to the other.
ZMQ_REQ
^^^^^^^
A socket of type 'ZMQ_REQ' is used by a _client_ to send requests to and
receive replies from a _service_. This socket type allows only an alternating
sequence of _zmq_send(request)_ and subsequent _zmq_recv(reply)_ calls. Each
request sent is round-robined among all _services_, and each reply received is
matched with the last issued request.
The client-server pattern is formally defined by http://rfc.zeromq.org/spec:41.
If no services are available, then any send operation on the socket shall
block until at least one _service_ becomes available. The REQ socket shall
not discard messages.
Note: this pattern is meant to eventually deprecate the use of 'ZMQ_DEALER' and
'ZMQ_ROUTER' to build client-server architectures, as well as 'ZMQ_REP' and
'ZMQ_REQ' for request-reply.
[horizontal]
.Summary of ZMQ_REQ characteristics
Compatible peer sockets:: 'ZMQ_REP', 'ZMQ_ROUTER'
Direction:: Bidirectional
Send/receive pattern:: Send, Receive, Send, Receive, ...
Outgoing routing strategy:: Round-robin
Incoming routing strategy:: Last peer
Action in mute state:: Block
ZMQ_REP
^^^^^^^
A socket of type 'ZMQ_REP' is used by a _service_ to receive requests from and
send replies to a _client_. This socket type allows only an alternating
sequence of _zmq_recv(request)_ and subsequent _zmq_send(reply)_ calls. Each
request received is fair-queued from among all _clients_, and each reply sent
is routed to the _client_ that issued the last request. If the original
requester does not exist any more the reply is silently discarded.
[horizontal]
.Summary of ZMQ_REP characteristics
Compatible peer sockets:: 'ZMQ_REQ', 'ZMQ_DEALER'
Direction:: Bidirectional
Send/receive pattern:: Receive, Send, Receive, Send, ...
Incoming routing strategy:: Fair-queued
Outgoing routing strategy:: Last peer
ZMQ_DEALER
ZMQ_CLIENT
^^^^^^^^^^
A socket of type 'ZMQ_DEALER' is an advanced pattern used for extending
request/reply sockets. Each message sent is round-robined among all connected
peers, and each message received is fair-queued from all connected peers.
A 'ZMQ_CLIENT' socket talks to a 'ZMQ_SERVER' socket. Either peer can connect,
though the usual and recommended model is to bind the 'ZMQ_SERVER' and connect
the 'ZMQ_CLIENT'.
When a 'ZMQ_DEALER' socket enters the 'mute' state due to having reached the
high water mark for all peers, or if there are no peers at all, then any
linkzmq:zmq_send[3] operations on the socket shall block until the mute
state ends or at least one peer becomes available for sending; messages are not
discarded.
If the 'ZMQ_CLIENT' socket has established a connection, linkzmq:zmq_send[3]
will accept messages, queue them, and send them as rapidly as the network
allows. The outgoing buffer limit is defined by the high water mark for the
socket. If the outgoing buffer is full, or if there is no connected peer,
linkzmq:zmq_send[3] will block, by default. The 'ZMQ_CLIENT' socket will not
drop messages.
When a 'ZMQ_DEALER' socket is connected to a 'ZMQ_REP' socket each message sent
must consist of an empty message part, the _delimiter_, followed by one or more
_body parts_.
When a 'ZMQ_CLIENT' socket is connected to multiple 'ZMQ_SERVER' sockets,
outgoing messages are distributed between connected peers on a round-robin
basis. Likewise, the 'ZMQ_CLIENT' socket receives messages fairly from each
connected peer. This usage is sensible only for stateless protocols.
'ZMQ_CLIENT' sockets are threadsafe and can be used from multiple threads
at the same time. Note that replies from a 'ZMQ_SERVER' socket will go to
the first client thread that calls libzmq:zmq_msg_recv. If you need to get
replies back to the originating thread, use one 'ZMQ_CLIENT' socket per
thread.
NOTE: 'ZMQ_CLIENT' sockets are threadsafe. They do not accept the ZMQ_SNDMORE
option on sends not ZMQ_RCVMORE on receives. This limits them to single part
data. The intention is to extend the API to allow scatter/gather of multi-part
data.
[horizontal]
.Summary of ZMQ_DEALER characteristics
Compatible peer sockets:: 'ZMQ_ROUTER', 'ZMQ_REP', 'ZMQ_DEALER'
.Summary of ZMQ_CLIENT characteristics
Compatible peer sockets:: 'ZMQ_SERVER'
Direction:: Bidirectional
Send/receive pattern:: Unrestricted
Outgoing routing strategy:: Round-robin
@ -130,38 +108,35 @@ Incoming routing strategy:: Fair-queued
Action in mute state:: Block
ZMQ_ROUTER
ZMQ_SERVER
^^^^^^^^^^
A socket of type 'ZMQ_ROUTER' is an advanced socket type used for extending
request/reply sockets. When receiving messages a 'ZMQ_ROUTER' socket shall
prepend a message part containing the _identity_ of the originating peer to the
message before passing it to the application. Messages received are fair-queued
from among all connected peers. When sending messages a 'ZMQ_ROUTER' socket shall
remove the first part of the message and use it to determine the _identity_ of
the peer the message shall be routed to. If the peer does not exist anymore
the message shall be silently discarded by default, unless 'ZMQ_ROUTER_MANDATORY'
socket option is set to '1'.
A 'ZMQ_SERVER' socket talks to a set of 'ZMQ_CLIENT' sockets. A 'ZMQ_SERVER'
socket can only reply to an incoming message: the 'ZMQ_CLIENT' peer must
always initiate a conversation.
When a 'ZMQ_ROUTER' socket enters the 'mute' state due to having reached the
high water mark for all peers, then any messages sent to the socket shall be dropped
until the mute state ends. Likewise, any messages routed to a peer for which
the individual high water mark has been reached shall also be dropped.
Each received message has a 'routing_id' that is a 32-bit unsigned integer.
The application can fetch this with linkzmq:zmq_msg_routing_id[3]. To send
a message to a given 'ZMQ_CLIENT' peer the application must set the peer's
'routing_id' on the message, using linkzmq:zmq_msg_set_routing_id[3].
When a 'ZMQ_REQ' socket is connected to a 'ZMQ_ROUTER' socket, in addition to the
_identity_ of the originating peer each message received shall contain an empty
_delimiter_ message part. Hence, the entire structure of each received message
as seen by the application becomes: one or more _identity_ parts, _delimiter_
part, one or more _body parts_. When sending replies to a 'ZMQ_REQ' socket the
application must include the _delimiter_ part.
If the 'routing_id' is not specified, or does not refer to a connected client
peer, the send call will fail with EHOSTUNREACH. If the outgoing buffer for
the client peer is full, the send call will fail with EAGAIN. The 'ZMQ_SERVER'
socket shall not drop messages, nor shall it block.
NOTE: 'ZMQ_SERVER' sockets are threadsafe. They do not accept the ZMQ_SNDMORE
option on sends not ZMQ_RCVMORE on receives. This limits them to single part
data. The intention is to extend the API to allow scatter/gather of multi-part
data.
[horizontal]
.Summary of ZMQ_ROUTER characteristics
Compatible peer sockets:: 'ZMQ_DEALER', 'ZMQ_REQ', 'ZMQ_ROUTER'
.Summary of ZMQ_SERVER characteristics
Compatible peer sockets:: 'ZMQ_CLIENT'
Direction:: Bidirectional
Send/receive pattern:: Unrestricted
Outgoing routing strategy:: See text
Incoming routing strategy:: Fair-queued
Action in mute state:: Drop
Action in mute state:: Return EAGAIN
Publish-subscribe pattern
@ -328,26 +303,26 @@ Action in mute state:: Block
Native Pattern
~~~~~~~~~~~~~~
The native pattern is used for communicating with TCP peers and allows
The native pattern is used for communicating with TCP peers and allows
asynchronous requests and replies in either direction.
ZMQ_STREAM
^^^^^^^^^^
A socket of type 'ZMQ_STREAM' is used to send and receive TCP data from a
non-0MQ peer, when using the tcp:// transport. A 'ZMQ_STREAM' socket can
A socket of type 'ZMQ_STREAM' is used to send and receive TCP data from a
non-0MQ peer, when using the tcp:// transport. A 'ZMQ_STREAM' socket can
act as client and/or server, sending and/or receiving TCP data asynchronously.
When receiving TCP data, a 'ZMQ_STREAM' socket shall prepend a message part
containing the _identity_ of the originating peer to the message before passing
it to the application. Messages received are fair-queued from among all
connected peers.
containing the _identity_ of the originating peer to the message before passing
it to the application. Messages received are fair-queued from among all
connected peers.
When sending TCP data, a 'ZMQ_STREAM' socket shall remove the first part of the
message and use it to determine the _identity_ of the peer the message shall be
When sending TCP data, a 'ZMQ_STREAM' socket shall remove the first part of the
message and use it to determine the _identity_ of the peer the message shall be
routed to, and unroutable messages shall cause an EHOSTUNREACH or EAGAIN error.
To open a connection to a server, use the zmq_connect call, and then fetch the
To open a connection to a server, use the zmq_connect call, and then fetch the
socket identity using the ZMQ_IDENTITY zmq_getsockopt call.
To close a specific connection, send the identity frame followed by a
@ -373,6 +348,116 @@ Incoming routing strategy:: Fair-queued
Action in mute state:: EAGAIN
Request-reply pattern
~~~~~~~~~~~~~~~~~~~~~
The request-reply pattern is used for sending requests from a ZMQ_REQ _client_
to one or more ZMQ_REP _services_, and receiving subsequent replies to each
request sent.
The request-reply pattern is formally defined by http://rfc.zeromq.org/spec:28.
Note: this pattern will be deprecated in favor of the client-server pattern.
ZMQ_REQ
^^^^^^^
A socket of type 'ZMQ_REQ' is used by a _client_ to send requests to and
receive replies from a _service_. This socket type allows only an alternating
sequence of _zmq_send(request)_ and subsequent _zmq_recv(reply)_ calls. Each
request sent is round-robined among all _services_, and each reply received is
matched with the last issued request.
If no services are available, then any send operation on the socket shall
block until at least one _service_ becomes available. The REQ socket shall
not discard messages.
[horizontal]
.Summary of ZMQ_REQ characteristics
Compatible peer sockets:: 'ZMQ_REP', 'ZMQ_ROUTER'
Direction:: Bidirectional
Send/receive pattern:: Send, Receive, Send, Receive, ...
Outgoing routing strategy:: Round-robin
Incoming routing strategy:: Last peer
Action in mute state:: Block
ZMQ_REP
^^^^^^^
A socket of type 'ZMQ_REP' is used by a _service_ to receive requests from and
send replies to a _client_. This socket type allows only an alternating
sequence of _zmq_recv(request)_ and subsequent _zmq_send(reply)_ calls. Each
request received is fair-queued from among all _clients_, and each reply sent
is routed to the _client_ that issued the last request. If the original
requester does not exist any more the reply is silently discarded.
[horizontal]
.Summary of ZMQ_REP characteristics
Compatible peer sockets:: 'ZMQ_REQ', 'ZMQ_DEALER'
Direction:: Bidirectional
Send/receive pattern:: Receive, Send, Receive, Send, ...
Incoming routing strategy:: Fair-queued
Outgoing routing strategy:: Last peer
ZMQ_DEALER
^^^^^^^^^^
A socket of type 'ZMQ_DEALER' is an advanced pattern used for extending
request/reply sockets. Each message sent is round-robined among all connected
peers, and each message received is fair-queued from all connected peers.
When a 'ZMQ_DEALER' socket enters the 'mute' state due to having reached the
high water mark for all peers, or if there are no peers at all, then any
linkzmq:zmq_send[3] operations on the socket shall block until the mute
state ends or at least one peer becomes available for sending; messages are not
discarded.
When a 'ZMQ_DEALER' socket is connected to a 'ZMQ_REP' socket each message sent
must consist of an empty message part, the _delimiter_, followed by one or more
_body parts_.
[horizontal]
.Summary of ZMQ_DEALER characteristics
Compatible peer sockets:: 'ZMQ_ROUTER', 'ZMQ_REP', 'ZMQ_DEALER'
Direction:: Bidirectional
Send/receive pattern:: Unrestricted
Outgoing routing strategy:: Round-robin
Incoming routing strategy:: Fair-queued
Action in mute state:: Block
ZMQ_ROUTER
^^^^^^^^^^
A socket of type 'ZMQ_ROUTER' is an advanced socket type used for extending
request/reply sockets. When receiving messages a 'ZMQ_ROUTER' socket shall
prepend a message part containing the _identity_ of the originating peer to the
message before passing it to the application. Messages received are fair-queued
from among all connected peers. When sending messages a 'ZMQ_ROUTER' socket shall
remove the first part of the message and use it to determine the _identity_ of
the peer the message shall be routed to. If the peer does not exist anymore
the message shall be silently discarded by default, unless 'ZMQ_ROUTER_MANDATORY'
socket option is set to '1'.
When a 'ZMQ_ROUTER' socket enters the 'mute' state due to having reached the
high water mark for all peers, then any messages sent to the socket shall be dropped
until the mute state ends. Likewise, any messages routed to a peer for which
the individual high water mark has been reached shall also be dropped.
When a 'ZMQ_REQ' socket is connected to a 'ZMQ_ROUTER' socket, in addition to the
_identity_ of the originating peer each message received shall contain an empty
_delimiter_ message part. Hence, the entire structure of each received message
as seen by the application becomes: one or more _identity_ parts, _delimiter_
part, one or more _body parts_. When sending replies to a 'ZMQ_REQ' socket the
application must include the _delimiter_ part.
[horizontal]
.Summary of ZMQ_ROUTER characteristics
Compatible peer sockets:: 'ZMQ_DEALER', 'ZMQ_REQ', 'ZMQ_ROUTER'
Direction:: Bidirectional
Send/receive pattern:: Unrestricted
Outgoing routing strategy:: See text
Incoming routing strategy:: Fair-queued
Action in mute state:: Drop
RETURN VALUE
------------
The _zmq_socket()_ function shall return an opaque handle to the newly created

View File

@ -150,7 +150,7 @@ get_monitor_event (void *monitor, int *value, char **address)
size_t size = zmq_msg_size (&msg);
*address = (char *) malloc (size + 1);
memcpy (*address, data, size);
*address [size] = 0;
(*address)[size] = 0;
}
return event;
}

View File

@ -227,8 +227,8 @@ ZMQ_EXPORT int zmq_msg_more (zmq_msg_t *msg);
ZMQ_EXPORT int zmq_msg_get (zmq_msg_t *msg, int property);
ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int property, int optval);
ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property);
ZMQ_EXPORT int zmq_msg_set_routing_id(zmq_msg_t *msg, uint32_t routing_id);
ZMQ_EXPORT uint32_t zmq_msg_get_routing_id(zmq_msg_t *msg);
ZMQ_EXPORT int zmq_msg_set_routing_id (zmq_msg_t *msg, uint32_t routing_id);
ZMQ_EXPORT uint32_t zmq_msg_routing_id (zmq_msg_t *msg);
/******************************************************************************/
@ -320,6 +320,9 @@ ZMQ_EXPORT uint32_t zmq_msg_get_routing_id(zmq_msg_t *msg);
#define ZMQ_HEARTBEAT_TTL 76
#define ZMQ_HEARTBEAT_TIMEOUT 77
#define ZMQ_XPUB_VERBOSE_UNSUBSCRIBE 78
#define ZMQ_CONNECT_TIMEOUT 79
#define ZMQ_TCP_RETRANSMIT_TIMEOUT 80
#define ZMQ_THREAD_SAFE 81
/* Message options */
#define ZMQ_MORE 1
@ -380,7 +383,8 @@ ZMQ_EXPORT int zmq_send (void *s, const void *buf, size_t len, int flags);
ZMQ_EXPORT int zmq_send_const (void *s, const void *buf, size_t len, int flags);
ZMQ_EXPORT int zmq_recv (void *s, void *buf, size_t len, int flags);
ZMQ_EXPORT int zmq_socket_monitor (void *s, const char *addr, int events);
ZMQ_EXPORT int zmq_add_pollfd (void *s, void *p);
ZMQ_EXPORT int zmq_remove_pollfd (void *s, void *p);
/******************************************************************************/
/* I/O multiplexing. */
@ -405,7 +409,52 @@ typedef struct zmq_pollitem_t
#define ZMQ_POLLITEMS_DFLT 16
ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
/******************************************************************************/
/* Pollfd polling on thread safe socket */
/******************************************************************************/
ZMQ_EXPORT void *zmq_pollfd_new ();
ZMQ_EXPORT int zmq_pollfd_close (void *p);
ZMQ_EXPORT void zmq_pollfd_recv (void *p);
ZMQ_EXPORT int zmq_pollfd_wait (void *p, int timeout_);
ZMQ_EXPORT int zmq_pollfd_poll (void *p, zmq_pollitem_t *items, int nitems, long timeout);
#if defined _WIN32
ZMQ_EXPORT SOCKET zmq_pollfd_fd (void *p);
#else
ZMQ_EXPORT int zmq_pollfd_fd (void *p);
#endif
/******************************************************************************/
/* Poller polling on sockets,fd and threaf safe sockets */
/******************************************************************************/
typedef struct zmq_poller_event_t
{
void *socket;
#if defined _WIN32
SOCKET fd;
#else
int fd;
#endif
void *user_data;
} zmq_poller_event_t;
ZMQ_EXPORT void *zmq_poller_new ();
ZMQ_EXPORT int zmq_poller_close (void *poller);
ZMQ_EXPORT int zmq_poller_add_socket (void *poller, void *socket, void *user_data);
ZMQ_EXPORT int zmq_poller_remove_socket (void *poller, void *socket);
ZMQ_EXPORT int zmq_poller_wait (void *poller, zmq_poller_event_t *event, long timeout);
#if defined _WIN32
ZMQ_EXPORT int zmq_poller_add_fd (void *poller, SOCKET fd, void *user_data);
ZMQ_EXPORT int zmq_poller_remove_fd (void *poller, SOCKET fd);
#else
ZMQ_EXPORT int zmq_poller_add_fd (void *poller, int fd, void *user_data);
ZMQ_EXPORT int zmq_poller_remove_fd (void *poller, int fd);
#endif
/******************************************************************************/
/* Message proxying */

View File

@ -27,6 +27,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "macros.hpp"
#include "platform.hpp"
#include "address.hpp"
#include "err.hpp"
@ -49,16 +50,14 @@ zmq::address_t::~address_t ()
{
if (protocol == "tcp") {
if (resolved.tcp_addr) {
delete resolved.tcp_addr;
resolved.tcp_addr = 0;
LIBZMQ_DELETE(resolved.tcp_addr);
}
}
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
else
if (protocol == "ipc") {
if (resolved.ipc_addr) {
delete resolved.ipc_addr;
resolved.ipc_addr = 0;
LIBZMQ_DELETE(resolved.ipc_addr);
}
}
#endif
@ -66,8 +65,7 @@ zmq::address_t::~address_t ()
else
if (protocol == "tipc") {
if (resolved.tipc_addr) {
delete resolved.tipc_addr;
resolved.tipc_addr = 0;
LIBZMQ_DELETE(resolved.tipc_addr);
}
}
#endif

View File

@ -45,10 +45,10 @@ namespace std
{
typedef unsigned char char_type;
// Unsigned as wint_t in unsigned.
typedef unsigned long int_type;
typedef streampos pos_type;
typedef streamoff off_type;
typedef mbstate_t state_type;
typedef unsigned long int_type;
typedef streampos pos_type;
typedef streamoff off_type;
typedef mbstate_t state_type;
static void
assign(char_type& __c1, const char_type& __c2)

View File

@ -27,6 +27,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "macros.hpp"
#include "client.hpp"
#include "err.hpp"
#include "msg.hpp"
@ -43,10 +44,9 @@ zmq::client_t::~client_t ()
void zmq::client_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{
// subscribe_to_all_ is unused
(void) subscribe_to_all_;
LIBZMQ_UNUSED (subscribe_to_all_);
zmq_assert (pipe_);
zmq_assert (pipe_);
fq.attach (pipe_);
lb.attach (pipe_);
@ -54,28 +54,26 @@ void zmq::client_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
int zmq::client_t::xsend (msg_t *msg_)
{
zmq_assert(!(msg_->flags () & msg_t::more));
return lb.sendpipe (msg_, NULL);
}
int zmq::client_t::xrecv (msg_t *msg_)
{
{
int rc = fq.recvpipe (msg_, NULL);
// Drop any messages with more flag
while (rc == 0 && msg_->flags () & msg_t::more) {
// drop all frames of the current multi-frame message
rc = fq.recvpipe (msg_, NULL);
rc = fq.recvpipe (msg_, NULL);
while (rc == 0 && msg_->flags () & msg_t::more)
rc = fq.recvpipe (msg_, NULL);
rc = fq.recvpipe (msg_, NULL);
// get the new message
if (rc == 0)
if (rc == 0)
rc = fq.recvpipe (msg_, NULL);
}
}
return rc;
}

View File

@ -149,16 +149,16 @@ uint64_t zmq::clock_t::now_us ()
// Use POSIX clock_gettime function to get precise monotonic time.
struct timespec tv;
int rc = clock_gettime (CLOCK_MONOTONIC, &tv);
// Fix case where system has clock_gettime but CLOCK_MONOTONIC is not supported.
// This should be a configuration check, but I looked into it and writing an
// AC_FUNC_CLOCK_MONOTONIC seems beyond my powers.
if( rc != 0) {
// Use POSIX gettimeofday function to get precise time.
struct timeval tv;
int rc = gettimeofday (&tv, NULL);
errno_assert (rc == 0);
return (tv.tv_sec * (uint64_t) 1000000 + tv.tv_usec);
}
// Fix case where system has clock_gettime but CLOCK_MONOTONIC is not supported.
// This should be a configuration check, but I looked into it and writing an
// AC_FUNC_CLOCK_MONOTONIC seems beyond my powers.
if( rc != 0) {
// Use POSIX gettimeofday function to get precise time.
struct timeval tv;
int rc = gettimeofday (&tv, NULL);
errno_assert (rc == 0);
return (tv.tv_sec * (uint64_t) 1000000 + tv.tv_usec);
}
return (tv.tv_sec * (uint64_t) 1000000 + tv.tv_nsec / 1000);
#elif defined HAVE_GETHRTIME

View File

@ -47,36 +47,36 @@
namespace zmq
{
class condition_variable_t
{
public:
inline condition_variable_t ()
{
zmq_assert(false);
}
class condition_variable_t
{
public:
inline condition_variable_t ()
{
zmq_assert(false);
}
inline ~condition_variable_t ()
{
inline ~condition_variable_t ()
{
}
}
inline int wait (mutex_t* mutex_, int timeout_ )
{
zmq_assert(false);
return -1;
}
inline int wait (mutex_t* mutex_, int timeout_ )
{
zmq_assert(false);
return -1;
}
inline void broadcast ()
{
zmq_assert(false);
}
inline void broadcast ()
{
zmq_assert(false);
}
private:
private:
// Disable copy construction and assignment.
condition_variable_t (const condition_variable_t&);
void operator = (const condition_variable_t&);
};
// Disable copy construction and assignment.
condition_variable_t (const condition_variable_t&);
void operator = (const condition_variable_t&);
};
}
@ -95,7 +95,7 @@ namespace zmq
inline ~condition_variable_t ()
{
}
inline int wait (mutex_t* mutex_, int timeout_ )
@ -110,7 +110,7 @@ namespace zmq
if (rc != ERROR_TIMEOUT)
win_assert(rc);
errno = EAGAIN;
errno = EAGAIN;
return -1;
}
@ -161,9 +161,9 @@ namespace zmq
if (timeout_ != -1) {
struct timespec timeout;
clock_gettime(CLOCK_REALTIME, &timeout);
timeout.tv_sec += timeout_ / 1000;
timeout.tv_nsec += (timeout_ % 1000) * 1000000;
timeout.tv_nsec += (timeout_ % 1000) * 1000000;
rc = pthread_cond_timedwait (&cond, mutex_->get_mutex (), &timeout);
}
else

View File

@ -89,7 +89,11 @@ namespace zmq
// On some OSes the signaler has to be emulated using a TCP
// connection. In such cases following port is used.
signaler_port = 5905
// If 0, it lets the OS choose a free port without requiring use of a
// global mutex. The original implementation of a Windows signaler
// socket used port 5905 instead of letting the OS choose a free port.
// https://github.com/zeromq/libzmq/issues/1542
signaler_port = 0
};
}

View File

@ -27,6 +27,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "macros.hpp"
#include "platform.hpp"
#ifdef ZMQ_HAVE_WINDOWS
#include "windows.hpp"
@ -97,15 +98,17 @@ zmq::ctx_t::~ctx_t ()
// Ask I/O threads to terminate. If stop signal wasn't sent to I/O
// thread subsequent invocation of destructor would hang-up.
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
io_threads [i]->stop ();
}
// Wait till I/O threads actually terminate.
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
delete io_threads [i];
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
LIBZMQ_DELETE(io_threads [i]);
}
// Deallocate the reaper thread object.
delete reaper;
LIBZMQ_DELETE(reaper);
// Deallocate the array of mailboxes. No special work is
// needed as mailboxes themselves were deallocated with their
@ -124,15 +127,20 @@ zmq::ctx_t::~ctx_t ()
int zmq::ctx_t::terminate ()
{
// Connect up any pending inproc connections, otherwise we will hang
slot_sync.lock();
bool saveTerminating = terminating;
terminating = false;
// Connect up any pending inproc connections, otherwise we will hang
pending_connections_t copy = pending_connections;
for (pending_connections_t::iterator p = copy.begin (); p != copy.end (); ++p) {
zmq::socket_base_t *s = create_socket (ZMQ_PAIR);
s->bind (p->first.c_str ());
s->close ();
}
terminating = saveTerminating;
slot_sync.lock ();
if (!starting) {
#ifdef HAVE_FORK

View File

@ -86,7 +86,7 @@ namespace zmq
// (except zmq_close).
// This function is non-blocking.
// terminate must still be called afterwards.
// This function is optional, terminate will unblock any current
// This function is optional, terminate will unblock any current
// operations as well.
int shutdown();
@ -98,7 +98,7 @@ namespace zmq
zmq::socket_base_t *create_socket (int type_);
void destroy_socket (zmq::socket_base_t *socket_);
// Start a new thread with proper scheduling parameters.
// Start a new thread with proper scheduling parameters.
void start_thread (thread_t &thread_, thread_fn *tfn_, void *arg_) const;
// Send command to the destination thread.
@ -203,7 +203,7 @@ namespace zmq
// Is IPv6 enabled on this context?
bool ipv6;
// Thread scheduling parameters.
// Thread scheduling parameters.
int thread_priority;
int thread_sched_policy;

View File

@ -107,7 +107,7 @@ namespace zmq
// Cookie received from server
uint8_t cn_cookie [16 + 80];
// Intermediary buffer used to seepd up boxing and unboxing.
// Intermediary buffer used to speed up boxing and unboxing.
uint8_t cn_precom [crypto_box_BEFORENMBYTES];
// Nonce

View File

@ -27,6 +27,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "macros.hpp"
#include "dealer.hpp"
#include "err.hpp"
#include "msg.hpp"
@ -44,8 +45,7 @@ zmq::dealer_t::~dealer_t ()
void zmq::dealer_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{
// subscribe_to_all_ is unused
(void) subscribe_to_all_;
LIBZMQ_UNUSED (subscribe_to_all_);
zmq_assert (pipe_);

View File

@ -53,7 +53,7 @@ namespace zmq
// This class implements the state machine that parses the incoming buffer.
// Derived class should implement individual state machine actions.
//
// Buffer managment is done by an allocator policy.
// Buffer management is done by an allocator policy.
template <typename T, typename A = c_single_allocator>
class decoder_base_t : public i_decoder
{
@ -99,11 +99,11 @@ namespace zmq
}
// Processes the data in the buffer previously allocated using
// get_buffer function. size_ argument specifies nemuber of bytes
// get_buffer function. size_ argument specifies number of bytes
// actually filled into the buffer. Function returns 1 when the
// whole message was decoded or 0 when more data is required.
// On error, -1 is returned and errno set accordingly.
// Number of bytes processed is returned in byts_used_.
// Number of bytes processed is returned in bytes_used_.
int decode (const unsigned char *data_, std::size_t size_,
std::size_t &bytes_used_)
{

View File

@ -96,10 +96,11 @@ unsigned char* zmq::shared_message_memory_allocator::allocate ()
void zmq::shared_message_memory_allocator::deallocate ()
{
std::free (buf);
buf = NULL;
bufsize = 0;
msg_refcnt = NULL;
zmq::atomic_counter_t* c = reinterpret_cast<zmq::atomic_counter_t* >(buf);
if (buf && !c->sub(1)) {
std::free(buf);
}
release();
}
unsigned char* zmq::shared_message_memory_allocator::release ()

View File

@ -80,7 +80,7 @@ namespace zmq
c_single_allocator& operator = (c_single_allocator const&);
};
// This allocater allocates a reference counted buffer which is used by v2_decoder_t
// This allocator allocates a reference counted buffer which is used by v2_decoder_t
// to use zero-copy msg::init_data to create messages with memory from this buffer as
// data storage.
//
@ -102,7 +102,7 @@ namespace zmq
// Allocate a new buffer
//
// This releases the current buffer to be bound to the lifetime of the messages
// created on this bufer.
// created on this buffer.
unsigned char* allocate ();
// force deallocation of buffer.

View File

@ -148,7 +148,7 @@ int zmq::dist_t::send_to_matching (msg_t *msg_)
// Push the message to matching pipes.
distribute (msg_);
// If mutlipart message is fully sent, activate all the eligible pipes.
// If multipart message is fully sent, activate all the eligible pipes.
if (!msg_more)
active = eligible;

View File

@ -37,6 +37,7 @@
#include <algorithm>
#include <new>
#include "macros.hpp"
#include "epoll.hpp"
#include "err.hpp"
#include "config.hpp"
@ -56,8 +57,9 @@ zmq::epoll_t::~epoll_t ()
worker.stop ();
close (epoll_fd);
for (retired_t::iterator it = retired.begin (); it != retired.end (); ++it)
delete *it;
for (retired_t::iterator it = retired.begin (); it != retired.end (); ++it) {
LIBZMQ_DELETE(*it);
}
}
zmq::epoll_t::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_)
@ -177,9 +179,9 @@ void zmq::epoll_t::loop ()
}
// Destroy retired event sources.
for (retired_t::iterator it = retired.begin (); it != retired.end ();
++it)
delete *it;
for (retired_t::iterator it = retired.begin (); it != retired.end (); ++it) {
LIBZMQ_DELETE(*it);
}
retired.clear ();
}
}

View File

@ -89,12 +89,12 @@ void zmq::zmq_abort(const char *errmsg_)
const char *zmq::wsa_error()
{
int no = WSAGetLastError ();
const int last_error = WSAGetLastError();
// TODO: This is not a generic way to handle this...
if (no == WSAEWOULDBLOCK)
if (last_error == WSAEWOULDBLOCK)
return NULL;
return wsa_error_no (no);
return wsa_error_no (last_error);
}
const char *zmq::wsa_error_no (int no_)

View File

@ -80,7 +80,7 @@ namespace zmq
// there are following parts still waiting in the current pipe.
bool more;
// Holds credential after the last_acive_pipe has terminated.
// Holds credential after the last_active_pipe has terminated.
blob_t saved_credential;
fq_t (const fq_t&);

View File

@ -178,7 +178,7 @@ int zmq::gssapi_mechanism_base_t::decode_message (msg_t *msg_)
const uint8_t flags = static_cast <char *> (plaintext.value)[0];
if (flags & 0x01)
msg_->set_flags (msg_t::more);
msg_->set_flags (msg_t::more);
if (flags & 0x02)
msg_->set_flags (msg_t::command);

View File

@ -34,7 +34,7 @@
#ifdef HAVE_LIBGSSAPI_KRB5
#ifndef ZMQ_HAVE_FREEBSD
#if !defined(ZMQ_HAVE_FREEBSD) && !defined(ZMQ_HAVE_DRAGONFLY)
#include <gssapi/gssapi_generic.h>
#endif
#include <gssapi/gssapi_krb5.h>

View File

@ -123,7 +123,7 @@ int zmq::gssapi_server_t::process_handshake_command (msg_t *msg_)
}
if (security_context_established) {
// Use ZAP protocol (RFC 27) to authenticate the user.
// Use ZAP protocol (RFC 27) to authenticate the user.
bool expecting_zap_reply = false;
int rc = session->zap_connect ();
if (rc == 0) {

View File

@ -85,8 +85,8 @@ namespace zmq
void accept_context ();
int produce_next_token (msg_t *msg_);
int process_next_token (msg_t *msg_);
void send_zap_request ();
int receive_and_process_zap_reply();
void send_zap_request ();
int receive_and_process_zap_reply();
};
}

View File

@ -49,7 +49,7 @@ namespace zmq
virtual void resize_buffer(size_t) = 0;
// Decodes data pointed to by data_.
// When a message is decoded, 1 is returned.
// When the decoder needs more data, 0 is returnd.
// When the decoder needs more data, 0 is returned.
// On error, -1 is returned and errno is set accordingly.
virtual int decode (const unsigned char *data_, size_t size_,
size_t &processed) = 0;

View File

@ -29,6 +29,7 @@
#include <new>
#include "macros.hpp"
#include "io_thread.hpp"
#include "platform.hpp"
#include "err.hpp"
@ -46,7 +47,7 @@ zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) :
zmq::io_thread_t::~io_thread_t ()
{
delete poller;
LIBZMQ_DELETE(poller);
}
void zmq::io_thread_t::start ()

View File

@ -52,7 +52,7 @@ namespace zmq
io_thread_t (zmq::ctx_t *ctx_, uint32_t tid_);
// Clean-up. If the thread was started, it's neccessary to call 'stop'
// Clean-up. If the thread was started, it's necessary to call 'stop'
// before invoking destructor. Otherwise the destructor would hang up.
~io_thread_t ();
@ -70,7 +70,7 @@ namespace zmq
void out_event ();
void timer_event (int id_);
// Used by io_objects to retrieve the assciated poller object.
// Used by io_objects to retrieve the associated poller object.
poller_t *get_poller ();
// Command handlers.

View File

@ -132,10 +132,11 @@ int zmq::get_peer_ip_address (fd_t sockfd_, std::string &ip_addr_)
rc = getpeername (sockfd_, (struct sockaddr*) &ss, &addrlen);
#ifdef ZMQ_HAVE_WINDOWS
if (rc == SOCKET_ERROR) {
wsa_assert (WSAGetLastError () != WSANOTINITIALISED &&
WSAGetLastError () != WSAEFAULT &&
WSAGetLastError () != WSAEINPROGRESS &&
WSAGetLastError () != WSAENOTSOCK);
const int last_error = WSAGetLastError();
wsa_assert (last_error != WSANOTINITIALISED &&
last_error != WSAEFAULT &&
last_error != WSAEINPROGRESS &&
last_error != WSAENOTSOCK);
return 0;
}
#else

View File

@ -103,7 +103,7 @@ void zmq::ipc_connecter_t::process_term (int linger_)
void zmq::ipc_connecter_t::in_event ()
{
// We are not polling for incomming data, so we are actually called
// We are not polling for incoming data, so we are actually called
// because of error here. However, we can get error on out event as well
// on some platforms, so we'll simply handle both events in the same way.
out_event ();
@ -216,7 +216,7 @@ int zmq::ipc_connecter_t::open ()
s, addr->resolved.ipc_addr->addr (),
addr->resolved.ipc_addr->addrlen ());
// Connect was successfull immediately.
// Connect was successful immediately.
if (rc == 0)
return 0;

View File

@ -83,7 +83,7 @@ namespace zmq
int get_new_reconnect_ivl ();
// Open IPC connecting socket. Returns -1 in case of error,
// 0 if connect was successfull immediately. Returns -1 with
// 0 if connect was successful immediately. Returns -1 with
// EAGAIN errno if async connect was launched.
int open ();
@ -91,7 +91,7 @@ namespace zmq
int close ();
// Get the file descriptor of newly created connection. Returns
// retired_fd if the connection was unsuccessfull.
// retired_fd if the connection was unsuccessful.
fd_t connect ();
// Address to connect to. Owned by session_base_t.

View File

@ -84,7 +84,7 @@ namespace zmq
// if the connection was dropped while waiting in the listen backlog.
fd_t accept ();
// True, if the undelying file for UNIX domain socket exists.
// True, if the underlying file for UNIX domain socket exists.
bool has_file;
// Name of the file associated with the UNIX domain address.
@ -96,7 +96,7 @@ namespace zmq
// Handle corresponding to the listening socket.
handle_t handle;
// Socket the listerner belongs to.
// Socket the listener belongs to.
zmq::socket_base_t *socket;
// String representation of endpoint to bind to

View File

@ -38,6 +38,7 @@
#include <algorithm>
#include <new>
#include "macros.hpp"
#include "kqueue.hpp"
#include "err.hpp"
#include "config.hpp"
@ -210,9 +211,9 @@ void zmq::kqueue_t::loop ()
}
// Destroy retired event sources.
for (retired_t::iterator it = retired.begin (); it != retired.end ();
++it)
delete *it;
for (retired_t::iterator it = retired.begin (); it != retired.end (); ++it) {
LIBZMQ_DELETE(*it);
}
retired.clear ();
}
}

View File

@ -107,7 +107,17 @@ int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_)
break;
}
zmq_assert (!more);
// If send fails for multi-part msg rollback other
// parts sent earlier and return EAGAIN.
// Application should handle this as suitable
if (more)
{
pipes [current]->rollback ();
more = 0;
errno = EAGAIN;
return -1;
}
active--;
if (current < active)
pipes.swap (current, active);

12
src/macros.hpp Normal file
View File

@ -0,0 +1,12 @@
/******************************************************************************/
/* 0MQ Internal Use */
/******************************************************************************/
#define LIBZMQ_UNUSED(object) (void)object
#define LIBZMQ_DELETE(p_object) {\
delete p_object; \
p_object = 0; \
}
/******************************************************************************/

View File

@ -77,14 +77,18 @@ int zmq::mailbox_t::recv (command_t *cmd_, int timeout_)
}
// Wait for signal from the command sender.
const int rc = signaler.wait (timeout_);
int rc = signaler.wait (timeout_);
if (rc == -1) {
errno_assert (errno == EAGAIN || errno == EINTR);
return -1;
}
// Receive the signal.
signaler.recv ();
rc = signaler.recv_failable ();
if (rc == -1) {
errno_assert (errno == EAGAIN);
return -1;
}
// Switch into active state.
active = true;

View File

@ -74,7 +74,8 @@ const char *zmq::mechanism_t::socket_type_string (int socket_type) const
{
static const char *names [] = {"PAIR", "PUB", "SUB", "REQ", "REP",
"DEALER", "ROUTER", "PULL", "PUSH",
"XPUB", "XSUB", "STREAM", "SERVER", "CLIENT"};
"XPUB", "XSUB", "STREAM",
"SERVER", "CLIENT"};
zmq_assert (socket_type >= 0 && socket_type <= 13);
return names [socket_type];
}
@ -190,7 +191,7 @@ bool zmq::mechanism_t::check_socket_type (const std::string& type_) const
case ZMQ_SERVER:
return type_ == "CLIENT";
case ZMQ_CLIENT:
return type_ == "CLIENT" || type_ == "SERVER";
return type_ == "SERVER";
default:
break;
}

View File

@ -39,7 +39,7 @@ namespace zmq
{
// Abstract class representing security mechanism.
// Different mechanism extedns this class.
// Different mechanism extends this class.
class msg_t;

View File

@ -27,6 +27,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "macros.hpp"
#include "msg.hpp"
#include "../include/zmq.h"
@ -235,9 +236,12 @@ int zmq::msg_t::close ()
}
}
if (u.base.metadata != NULL)
if (u.base.metadata->drop_ref ())
delete u.base.metadata;
if (u.base.metadata != NULL) {
if (u.base.metadata->drop_ref ()) {
LIBZMQ_DELETE(u.base.metadata);
}
u.base.metadata = NULL;
}
// Make the message invalid.
u.base.type = 0;
@ -391,8 +395,9 @@ void zmq::msg_t::set_metadata (zmq::metadata_t *metadata_)
void zmq::msg_t::reset_metadata ()
{
if (u.base.metadata) {
if (u.base.metadata->drop_ref ())
delete u.base.metadata;
if (u.base.metadata->drop_ref ()) {
LIBZMQ_DELETE(u.base.metadata);
}
u.base.metadata = NULL;
}
}
@ -492,18 +497,22 @@ bool zmq::msg_t::rm_refs (int refs_)
return true;
}
uint32_t zmq::msg_t::get_routing_id()
uint32_t zmq::msg_t::get_routing_id ()
{
return u.base.routing_id;
}
int zmq::msg_t::set_routing_id(uint32_t routing_id_)
int zmq::msg_t::set_routing_id (uint32_t routing_id_)
{
u.base.routing_id = routing_id_;
return 0;
if (routing_id_) {
u.base.routing_id = routing_id_;
return 0;
}
errno = EINVAL;
return -1;
}
zmq::atomic_counter_t* zmq::msg_t::refcnt()
zmq::atomic_counter_t *zmq::msg_t::refcnt()
{
switch(u.base.type)
{

View File

@ -97,8 +97,8 @@ namespace zmq
bool is_vsm () const;
bool is_cmsg () const;
bool is_zcmsg() const;
uint32_t get_routing_id();
int set_routing_id(uint32_t routing_id_);
uint32_t get_routing_id ();
int set_routing_id (uint32_t routing_id_);
// After calling this function you can copy the message in POD-style
// refs_ times. No need to call copy.

View File

@ -32,6 +32,7 @@
#include <new>
#include <algorithm>
#include "macros.hpp"
#include "platform.hpp"
#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
@ -52,19 +53,17 @@ zmq::mtrie_t::mtrie_t () :
zmq::mtrie_t::~mtrie_t ()
{
if (pipes) {
delete pipes;
pipes = 0;
LIBZMQ_DELETE(pipes);
}
if (count == 1) {
zmq_assert (next.node);
delete next.node;
next.node = 0;
LIBZMQ_DELETE(next.node);
}
else
if (count > 1) {
for (unsigned short i = 0; i != count; ++i)
delete next.table [i];
else if (count > 1) {
for (unsigned short i = 0; i != count; ++i) {
LIBZMQ_DELETE(next.table[i]);
}
free (next.table);
}
}
@ -92,13 +91,13 @@ bool zmq::mtrie_t::add_helper (unsigned char *prefix_, size_t size_,
if (c < min || c >= min + count) {
// The character is out of range of currently handled
// charcters. We have to extend the table.
// characters. We have to extend the table.
if (!count) {
min = c;
count = 1;
next.node = NULL;
}
else
else
if (count == 1) {
unsigned char oldc = min;
mtrie_t *oldp = next.node;
@ -111,7 +110,7 @@ bool zmq::mtrie_t::add_helper (unsigned char *prefix_, size_t size_,
min = std::min (min, c);
next.table [oldc - min] = oldp;
}
else
else
if (min < c) {
// The new character is above the current character range.
unsigned short old_count = count;
@ -178,8 +177,7 @@ void zmq::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_,
}
if (pipes->empty ()) {
delete pipes;
pipes = 0;
LIBZMQ_DELETE(pipes);
}
}
@ -203,8 +201,7 @@ void zmq::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_,
// Prune the node if it was made redundant by the removal
if (next.node->is_redundant ()) {
delete next.node;
next.node = 0;
LIBZMQ_DELETE(next.node);
count = 0;
--live_nodes;
zmq_assert (live_nodes == 0);
@ -226,8 +223,7 @@ void zmq::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_,
// Prune redundant nodes from the mtrie
if (next.table [c]->is_redundant ()) {
delete next.table [c];
next.table [c] = 0;
LIBZMQ_DELETE(next.table[c]);
zmq_assert (live_nodes > 0);
--live_nodes;
@ -257,7 +253,7 @@ void zmq::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_,
count = 0;
}
// Compact the node table if possible
else
else
if (live_nodes == 1) {
// If there's only one live node in the table we can
// switch to using the more compact single-node
@ -306,8 +302,7 @@ bool zmq::mtrie_t::rm_helper (unsigned char *prefix_, size_t size_,
pipes_t::size_type erased = pipes->erase (pipe_);
zmq_assert (erased == 1);
if (pipes->empty ()) {
delete pipes;
pipes = 0;
LIBZMQ_DELETE(pipes);
}
}
return !pipes;
@ -326,7 +321,7 @@ bool zmq::mtrie_t::rm_helper (unsigned char *prefix_, size_t size_,
bool ret = next_node->rm_helper (prefix_ + 1, size_ - 1, pipe_);
if (next_node->is_redundant ()) {
delete next_node;
LIBZMQ_DELETE(next_node);
zmq_assert (count > 0);
if (count == 1) {
@ -418,16 +413,16 @@ void zmq::mtrie_t::match (unsigned char *data_, size_t size_,
break;
// If there's one subnode (optimisation).
if (current->count == 1) {
if (current->count == 1) {
if (data_ [0] != current->min)
break;
current = current->next.node;
data_++;
size_--;
continue;
}
continue;
}
// If there are multiple subnodes.
// If there are multiple subnodes.
if (data_ [0] < current->min || data_ [0] >=
current->min + current->count)
break;

View File

@ -167,7 +167,7 @@ namespace zmq
bool tx_more_bit;
bool zmq_output_ready; // zmq has msg(s) to send
bool norm_tx_ready; // norm has tx queue vacancy
// tbd - maybe don't need buffer if can access zmq message buffer directly?
// TBD - maybe don't need buffer if can access zmq message buffer directly?
char tx_buffer[BUFFER_SIZE];
unsigned int tx_index;
unsigned int tx_len;

View File

@ -110,7 +110,7 @@ namespace zmq
void send_reaped ();
void send_done ();
// These handlers can be overrided by the derived objects. They are
// These handlers can be overridden by the derived objects. They are
// called when command arrives from another thread.
virtual void process_stop ();
virtual void process_plug ();

View File

@ -46,6 +46,8 @@ zmq::options_t::options_t () :
tos (0),
type (-1),
linger (-1),
connect_timeout (0),
tcp_retransmit_timeout (0),
reconnect_ivl (100),
reconnect_ivl_max (0),
backlog (100),
@ -158,6 +160,20 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
}
break;
case ZMQ_CONNECT_TIMEOUT:
if (is_int && value >= 0) {
connect_timeout = value;
return 0;
}
break;
case ZMQ_TCP_RETRANSMIT_TIMEOUT:
if (is_int && value >= 0) {
tcp_retransmit_timeout = value;
return 0;
}
break;
case ZMQ_RECONNECT_IVL:
if (is_int && value >= -1) {
reconnect_ivl = value;
@ -653,6 +669,20 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
}
break;
case ZMQ_CONNECT_TIMEOUT:
if (is_int) {
*value = connect_timeout;
return 0;
}
break;
case ZMQ_TCP_RETRANSMIT_TIMEOUT:
if (is_int) {
*value = tcp_retransmit_timeout;
return 0;
}
break;
case ZMQ_RECONNECT_IVL:
if (is_int) {
*value = reconnect_ivl;

View File

@ -92,6 +92,16 @@ namespace zmq
// Linger time, in milliseconds.
int linger;
// Maximum interval in milliseconds beyond which userspace will
// timeout connect().
// Default 0 (unused)
int connect_timeout;
// Maximum interval in milliseconds beyond which TCP will timeout
// retransmitted packets.
// Default 0 (unused)
int tcp_retransmit_timeout;
// Minimum interval between attempts to reconnect, in milliseconds.
// Default 100ms
int reconnect_ivl;
@ -132,7 +142,7 @@ namespace zmq
bool raw_socket;
bool raw_notify; // Provide connect notifications
// Addres of SOCKS proxy
// Address of SOCKS proxy
std::string socks_proxy_address;
// TCP keep-alive settings.

View File

@ -138,7 +138,7 @@ void zmq::own_t::terminate ()
if (terminating)
return;
// As for the root of the ownership tree, there's noone to terminate it,
// As for the root of the ownership tree, there's no one to terminate it,
// so it has to terminate itself.
if (!owner) {
process_term (options.linger);

View File

@ -96,12 +96,12 @@ namespace zmq
// specific type of the owned object correctly.
virtual ~own_t ();
// Term handler is protocted rather than private so that it can
// Term handler is protected rather than private so that it can
// be intercepted by the derived class. This is useful to add custom
// steps to the beginning of the termination process.
void process_term (int linger_);
// A place to hook in when phyicallal destruction of the object
// A place to hook in when physical destruction of the object
// is to be delayed.
virtual void process_destroy ();
@ -119,7 +119,7 @@ namespace zmq
void process_term_ack ();
void process_seqnum ();
// Check whether all the peding term acks were delivered.
// Check whether all the pending term acks were delivered.
// If so, deallocate this object.
void check_term_acks ();

View File

@ -27,6 +27,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "macros.hpp"
#include "pair.hpp"
#include "err.hpp"
#include "pipe.hpp"
@ -47,8 +48,7 @@ zmq::pair_t::~pair_t ()
void zmq::pair_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{
// subscribe_to_all_ is unused
(void)subscribe_to_all_;
LIBZMQ_UNUSED (subscribe_to_all_);
zmq_assert (pipe_ != NULL);

View File

@ -27,6 +27,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "macros.hpp"
#include "platform.hpp"
#if defined ZMQ_HAVE_OPENPGM
@ -89,8 +90,9 @@ void zmq::pgm_receiver_t::unplug ()
{
// Delete decoders.
for (peers_t::iterator it = peers.begin (); it != peers.end (); ++it) {
if (it->second.decoder != NULL)
delete it->second.decoder;
if (it->second.decoder != NULL) {
LIBZMQ_DELETE(it->second.decoder);
}
}
peers.clear ();
active_tsi = NULL;
@ -141,8 +143,7 @@ void zmq::pgm_receiver_t::restart_input ()
// Data error. Delete message decoder, mark the
// peer as not joined and drop remaining data.
it->second.joined = false;
delete it->second.decoder;
it->second.decoder = NULL;
LIBZMQ_DELETE(it->second.decoder);
insize = 0;
}
}
@ -194,8 +195,7 @@ void zmq::pgm_receiver_t::in_event ()
if (it != peers.end ()) {
it->second.joined = false;
if (it->second.decoder != NULL) {
delete it->second.decoder;
it->second.decoder = NULL;
LIBZMQ_DELETE(it->second.decoder);
}
}
break;
@ -226,7 +226,7 @@ void zmq::pgm_receiver_t::in_event ()
zmq_assert (offset <= insize);
zmq_assert (it->second.decoder == NULL);
// We have to move data to the begining of the first message.
// We have to move data to the beginning of the first message.
inpos += offset;
insize -= offset;
@ -252,8 +252,7 @@ void zmq::pgm_receiver_t::in_event ()
}
it->second.joined = false;
delete it->second.decoder;
it->second.decoder = NULL;
LIBZMQ_DELETE(it->second.decoder);
insize = 0;
}
}

View File

@ -44,7 +44,7 @@
#include "wire.hpp"
#include "stdint.hpp"
zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,
zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,
const options_t &options_) :
io_object_t (parent_),
has_tx_timer (false),
@ -77,7 +77,7 @@ int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)
void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_)
{
// Alocate 2 fds for PGM socket.
// Allocate 2 fds for PGM socket.
fd_t downlink_socket_fd = retired_fd;
fd_t uplink_socket_fd = retired_fd;
fd_t rdata_notify_fd = retired_fd;
@ -91,11 +91,11 @@ void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_)
handle = add_fd (downlink_socket_fd);
uplink_handle = add_fd (uplink_socket_fd);
rdata_notify_handle = add_fd (rdata_notify_fd);
rdata_notify_handle = add_fd (rdata_notify_fd);
pending_notify_handle = add_fd (pending_notify_fd);
// Set POLLIN. We wont never want to stop polling for uplink = we never
// want to stop porocess NAKs.
// want to stop processing NAKs.
set_pollin (uplink_handle);
set_pollin (rdata_notify_handle);
set_pollin (pending_notify_handle);
@ -169,11 +169,11 @@ void zmq::pgm_sender_t::in_event ()
void zmq::pgm_sender_t::out_event ()
{
// POLLOUT event from send socket. If write buffer is empty,
// POLLOUT event from send socket. If write buffer is empty,
// try to read new data from the encoder.
if (write_size == 0) {
// First two bytes (sizeof uint16_t) are used to store message
// First two bytes (sizeof uint16_t) are used to store message
// offset in following steps. Note that by passing our buffer to
// the get data function we prevent it from returning its own buffer.
unsigned char *bf = out_buffer + sizeof (uint16_t);

View File

@ -82,7 +82,7 @@ int zmq::pgm_socket_t::init_address (const char *network_,
}
*port_number = atoi (port_delim + 1);
char network [256];
if (port_delim - network_ >= (int) sizeof (network) - 1) {
errno = EINVAL;
@ -195,24 +195,24 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
}
{
const int rcvbuf = (int) options.rcvbuf;
if (rcvbuf >= 0) {
if (!pgm_setsockopt (sock, SOL_SOCKET, SO_RCVBUF, &rcvbuf,
sizeof (rcvbuf)))
goto err_abort;
}
const int rcvbuf = (int) options.rcvbuf;
if (rcvbuf >= 0) {
if (!pgm_setsockopt (sock, SOL_SOCKET, SO_RCVBUF, &rcvbuf,
sizeof (rcvbuf)))
goto err_abort;
}
const int sndbuf = (int) options.sndbuf;
if (sndbuf >= 0) {
if (!pgm_setsockopt (sock, SOL_SOCKET, SO_SNDBUF, &sndbuf,
sizeof (sndbuf)))
goto err_abort;
}
const int sndbuf = (int) options.sndbuf;
if (sndbuf >= 0) {
if (!pgm_setsockopt (sock, SOL_SOCKET, SO_SNDBUF, &sndbuf,
sizeof (sndbuf)))
goto err_abort;
}
const int max_tpdu = (int) pgm_max_tpdu;
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MTU, &max_tpdu,
sizeof (max_tpdu)))
goto err_abort;
const int max_tpdu = (int) pgm_max_tpdu;
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MTU, &max_tpdu,
sizeof (max_tpdu)))
goto err_abort;
}
if (receiver) {
@ -334,28 +334,28 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
// Set IP level parameters.
{
// Multicast loopback disabled by default
const int multicast_loop = 0;
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_LOOP,
&multicast_loop, sizeof (multicast_loop)))
goto err_abort;
// Multicast loopback disabled by default
const int multicast_loop = 0;
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_LOOP,
&multicast_loop, sizeof (multicast_loop)))
goto err_abort;
const int multicast_hops = options.multicast_hops;
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_HOPS,
&multicast_hops, sizeof (multicast_hops)))
goto err_abort;
const int multicast_hops = options.multicast_hops;
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_HOPS,
&multicast_hops, sizeof (multicast_hops)))
goto err_abort;
// Expedited Forwarding PHB for network elements, no ECN.
// Ignore return value due to varied runtime support.
const int dscp = 0x2e << 2;
if (AF_INET6 != sa_family)
pgm_setsockopt (sock, IPPROTO_PGM, PGM_TOS,
&dscp, sizeof (dscp));
// Expedited Forwarding PHB for network elements, no ECN.
// Ignore return value due to varied runtime support.
const int dscp = 0x2e << 2;
if (AF_INET6 != sa_family)
pgm_setsockopt (sock, IPPROTO_PGM, PGM_TOS,
&dscp, sizeof (dscp));
const int nonblocking = 1;
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_NOBLOCK,
&nonblocking, sizeof (nonblocking)))
goto err_abort;
const int nonblocking = 1;
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_NOBLOCK,
&nonblocking, sizeof (nonblocking)))
goto err_abort;
}
// Connect PGM transport to start state machine.
@ -402,13 +402,13 @@ zmq::pgm_socket_t::~pgm_socket_t ()
{
if (pgm_msgv)
free (pgm_msgv);
if (sock)
if (sock)
pgm_close (sock, TRUE);
}
// Get receiver fds. receive_fd_ is signaled for incoming packets,
// waiting_pipe_fd_ is signaled for state driven events and data.
void zmq::pgm_socket_t::get_receiver_fds (fd_t *receive_fd_,
void zmq::pgm_socket_t::get_receiver_fds (fd_t *receive_fd_,
fd_t *waiting_pipe_fd_)
{
socklen_t socklen;
@ -430,12 +430,12 @@ void zmq::pgm_socket_t::get_receiver_fds (fd_t *receive_fd_,
zmq_assert (socklen == sizeof (*waiting_pipe_fd_));
}
// Get fds and store them into user allocated memory.
// Get fds and store them into user allocated memory.
// send_fd is for non-blocking send wire notifications.
// receive_fd_ is for incoming back-channel protocol packets.
// rdata_notify_fd_ is raised for waiting repair transmissions.
// pending_notify_fd_ is for state driven events.
void zmq::pgm_socket_t::get_sender_fds (fd_t *send_fd_, fd_t *receive_fd_,
void zmq::pgm_socket_t::get_sender_fds (fd_t *send_fd_, fd_t *receive_fd_,
fd_t *rdata_notify_fd_, fd_t *pending_notify_fd_)
{
socklen_t socklen;
@ -475,7 +475,7 @@ void zmq::pgm_socket_t::get_sender_fds (fd_t *send_fd_, fd_t *receive_fd_,
size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)
{
size_t nbytes = 0;
const int status = pgm_send (sock, data_, data_len_, &nbytes);
// We have to write all data as one packet.
@ -551,7 +551,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
{
size_t raw_data_len = 0;
// We just sent all data from pgm_transport_recvmsgv up
// We just sent all data from pgm_transport_recvmsgv up
// and have to return 0 that another engine in this thread is scheduled.
if (nbytes_rec == nbytes_processed && nbytes_rec > 0) {
@ -572,7 +572,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
zmq_assert (nbytes_processed == 0);
zmq_assert (nbytes_rec == 0);
// Receive a vector of Application Protocol Domain Unit's (APDUs)
// Receive a vector of Application Protocol Domain Unit's (APDUs)
// from the transport.
pgm_error_t *pgm_error = NULL;
@ -590,7 +590,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
zmq_assert (nbytes_rec == 0);
// In case if no RDATA/ODATA caused POLLIN 0 is
// In case if no RDATA/ODATA caused POLLIN 0 is
// returned.
nbytes_rec = 0;
errno = EBUSY;
@ -646,8 +646,8 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
// Only one APDU per pgm_msgv_t structure is allowed.
zmq_assert (pgm_msgv [pgm_msgv_processed].msgv_len == 1);
struct pgm_sk_buff_t* skb =
struct pgm_sk_buff_t* skb =
pgm_msgv [pgm_msgv_processed].msgv_skb [0];
// Take pointers from pgm_msgv_t structure.
@ -679,7 +679,7 @@ void zmq::pgm_socket_t::process_upstream ()
zmq_assert (status != PGM_IO_STATUS_ERROR);
// No data should be returned.
zmq_assert (dummy_bytes == 0 && (status == PGM_IO_STATUS_TIMER_PENDING ||
zmq_assert (dummy_bytes == 0 && (status == PGM_IO_STATUS_TIMER_PENDING ||
status == PGM_IO_STATUS_RATE_LIMITED ||
status == PGM_IO_STATUS_WOULD_BLOCK));
@ -698,7 +698,7 @@ int zmq::pgm_socket_t::compute_sqns (int tpdu_)
{
// Convert rate into B/ms.
uint64_t rate = uint64_t (options.rate) / 8;
// Compute the size of the buffer in bytes.
uint64_t size = uint64_t (options.recovery_ivl) * rate;

View File

@ -30,6 +30,7 @@
#include <new>
#include <stddef.h>
#include "macros.hpp"
#include "pipe.hpp"
#include "err.hpp"
@ -205,7 +206,7 @@ bool zmq::pipe_t::check_write ()
if (unlikely (!out_active || state != active))
return false;
bool full = hwm > 0 && msgs_written - peers_msgs_read == uint64_t (hwm);
bool full = !check_hwm();
if (unlikely (full)) {
out_active = false;
@ -262,7 +263,7 @@ void zmq::pipe_t::process_activate_read ()
void zmq::pipe_t::process_activate_write (uint64_t msgs_read_)
{
// Remember the peers's message sequence number.
// Remember the peer's message sequence number.
peers_msgs_read = msgs_read_;
if (!out_active && state == active) {
@ -284,7 +285,7 @@ void zmq::pipe_t::process_hiccup (void *pipe_)
int rc = msg.close ();
errno_assert (rc == 0);
}
delete outpipe;
LIBZMQ_DELETE(outpipe);
// Plug in the new outpipe.
zmq_assert (pipe_);
@ -368,7 +369,7 @@ void zmq::pipe_t::process_pipe_term_ack ()
}
}
delete inpipe;
LIBZMQ_DELETE(inpipe);
// Deallocate the pipe object
delete this;
@ -384,50 +385,42 @@ void zmq::pipe_t::terminate (bool delay_)
// Overload the value specified at pipe creation.
delay = delay_;
// If terminate was already called, we can ignore the duplicit invocation.
if (state == term_req_sent1 || state == term_req_sent2)
// If terminate was already called, we can ignore the duplicate invocation.
if (state == term_req_sent1 || state == term_req_sent2) {
return;
}
// If the pipe is in the final phase of async termination, it's going to
// closed anyway. No need to do anything special here.
else
if (state == term_ack_sent)
else if (state == term_ack_sent) {
return;
}
// The simple sync termination case. Ask the peer to terminate and wait
// for the ack.
else
if (state == active) {
else if (state == active) {
send_pipe_term (peer);
state = term_req_sent1;
}
// There are still pending messages available, but the user calls
// 'terminate'. We can act as if all the pending messages were read.
else
if (state == waiting_for_delimiter && !delay) {
else if (state == waiting_for_delimiter && !delay) {
outpipe = NULL;
send_pipe_term_ack (peer);
state = term_ack_sent;
}
// If there are pending messages still available, do nothing.
else
if (state == waiting_for_delimiter) {
else if (state == waiting_for_delimiter) {
}
// We've already got delimiter, but not term command yet. We can ignore
// the delimiter and ack synchronously terminate as if we were in
// active state.
else
if (state == delimiter_received) {
else if (state == delimiter_received) {
send_pipe_term (peer);
state = term_req_sent1;
}
// There are no other states.
else
else {
zmq_assert (false);
}
// Stop outbound flow of messages.
out_active = false;
@ -505,11 +498,9 @@ void zmq::pipe_t::hiccup ()
// Create new inpipe.
if (conflate)
inpipe = new (std::nothrow)
ypipe_conflate_t <msg_t> ();
inpipe = new (std::nothrow)ypipe_conflate_t <msg_t>();
else
inpipe = new (std::nothrow)
ypipe_t <msg_t, message_pipe_granularity> ();
inpipe = new (std::nothrow)ypipe_t <msg_t, message_pipe_granularity>();
alloc_assert (inpipe);
in_active = true;
@ -525,13 +516,13 @@ void zmq::pipe_t::set_hwms (int inhwm_, int outhwm_)
// if either send or recv side has hwm <= 0 it means infinite so we should set hwms infinite
if (inhwm_ <= 0 || inhwmboost <= 0)
in = 0;
if (outhwm_ <= 0 || outhwmboost <= 0)
out = 0;
in = 0;
lwm = compute_lwm(in);
hwm = out;
if (outhwm_ <= 0 || outhwmboost <= 0)
out = 0;
lwm = compute_lwm(in);
hwm = out;
}
void zmq::pipe_t::set_hwms_boost(int inhwmboost_, int outhwmboost_)
@ -542,6 +533,6 @@ void zmq::pipe_t::set_hwms_boost(int inhwmboost_, int outhwmboost_)
bool zmq::pipe_t::check_hwm () const
{
bool full = hwm > 0 && msgs_written - peers_msgs_read >= uint64_t (hwm - 1);
bool full = hwm > 0 && msgs_written - peers_msgs_read >= uint64_t (hwm);
return( !full );
}

View File

@ -85,8 +85,8 @@ namespace zmq
void set_event_sink (i_pipe_events *sink_);
// Pipe endpoint can store an routing ID to be used by its clients.
void set_routing_id(uint32_t routing_id_);
uint32_t get_routing_id();
void set_routing_id (uint32_t routing_id_);
uint32_t get_routing_id ();
// Pipe endpoint can store an opaque ID to be used by its clients.
void set_identity (const blob_t &identity_);
@ -121,7 +121,7 @@ namespace zmq
// in the peer.
void hiccup ();
// Ensure the pipe wont block on receiving pipe_term.
// Ensure the pipe won't block on receiving pipe_term.
void set_nodelay ();
// Ask pipe to terminate. The termination will happen asynchronously
@ -130,13 +130,13 @@ namespace zmq
// before actual shutdown.
void terminate (bool delay_);
// set the high water marks.
// Set the high water marks.
void set_hwms (int inhwm_, int outhwm_);
// set the boost to high water marks, used by inproc sockets so total hwm are sum of connect and bind sockets watermarks
// Set the boost to high water marks, used by inproc sockets so total hwm are sum of connect and bind sockets watermarks
void set_hwms_boost(int inhwmboost_, int outhwmboost_);
// check HWM
// Returns true if HWM is not reached
bool check_hwm () const;
private:

View File

@ -27,6 +27,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "macros.hpp"
#include "pull.hpp"
#include "err.hpp"
#include "msg.hpp"
@ -44,8 +45,7 @@ zmq::pull_t::~pull_t ()
void zmq::pull_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{
// subscribe_to_all_ is unused
(void)subscribe_to_all_;
LIBZMQ_UNUSED (subscribe_to_all_);
zmq_assert (pipe_);
fq.attach (pipe_);

View File

@ -27,6 +27,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "macros.hpp"
#include "push.hpp"
#include "pipe.hpp"
#include "err.hpp"
@ -44,8 +45,8 @@ zmq::push_t::~push_t ()
void zmq::push_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{
// subscribe_to_all_ is unused
(void)subscribe_to_all_;
LIBZMQ_UNUSED (subscribe_to_all_);
// Don't delay pipe termination as there is no one
// to receive the delimiter.
pipe_->set_nodelay ();

View File

@ -27,6 +27,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "macros.hpp"
#include "reaper.hpp"
#include "socket_base.hpp"
#include "err.hpp"
@ -39,8 +40,10 @@ zmq::reaper_t::reaper_t (class ctx_t *ctx_, uint32_t tid_) :
poller = new (std::nothrow) poller_t (*ctx_);
alloc_assert (poller);
mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
poller->set_pollin (mailbox_handle);
if (mailbox.get_fd () != retired_fd) {
mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
poller->set_pollin (mailbox_handle);
}
#ifdef HAVE_FORK
pid = getpid();
@ -49,7 +52,7 @@ zmq::reaper_t::reaper_t (class ctx_t *ctx_, uint32_t tid_) :
zmq::reaper_t::~reaper_t ()
{
delete poller;
LIBZMQ_DELETE(poller);
}
zmq::mailbox_t *zmq::reaper_t::get_mailbox ()

View File

@ -95,7 +95,7 @@ int zmq::req_t::xsend (msg_t *msg_)
message_begins = false;
// Eat all currently avaliable messages before the request is fully
// Eat all currently available messages before the request is fully
// sent. This is done to avoid:
// REQ sends request to A, A replies, B replies too.
// A's reply was first and matches, that is used.

View File

@ -65,7 +65,7 @@ namespace zmq
private:
// If true, request was already sent and reply wasn't received yet or
// was raceived partially.
// was received partially.
bool receiving_reply;
// If true, we are starting to send/recv a message. The first part

View File

@ -27,6 +27,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "macros.hpp"
#include "router.hpp"
#include "pipe.hpp"
#include "wire.hpp"
@ -44,7 +45,7 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
next_rid (generate_random ()),
mandatory (false),
// raw_socket functionality in ROUTER is deprecated
raw_socket (false),
raw_socket (false),
probe_router (false),
handover (false)
{
@ -66,8 +67,7 @@ zmq::router_t::~router_t ()
void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{
// subscribe_to_all_ is unused
(void)subscribe_to_all_;
LIBZMQ_UNUSED (subscribe_to_all_);
zmq_assert (pipe_);
@ -104,6 +104,7 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_,
return 0;
}
break;
case ZMQ_ROUTER_RAW:
if (is_int && value >= 0) {
raw_socket = (value != 0);
@ -128,8 +129,8 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_,
return 0;
}
break;
case ZMQ_ROUTER_HANDOVER:
case ZMQ_ROUTER_HANDOVER:
if (is_int && value >= 0) {
handover = (value != 0);
return 0;
@ -327,6 +328,8 @@ int zmq::router_t::xrecv (msg_t *msg_)
errno_assert (rc == 0);
memcpy (msg_->data (), identity.data (), identity.size ());
msg_->set_flags (msg_t::more);
if (prefetched_msg.metadata())
msg_->set_metadata(prefetched_msg.metadata());
identity_sent = true;
}
@ -386,7 +389,7 @@ bool zmq::router_t::xhas_in ()
bool zmq::router_t::xhas_out ()
{
// In theory, ROUTER socket is always ready for writing. Whether actual
// attempt to write succeeds depends on whitch pipe the message is going
// attempt to write succeeds depends on which pipe the message is going
// to be routed to.
return true;
}
@ -407,10 +410,10 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
connect_rid.length());
connect_rid.clear ();
outpipes_t::iterator it = outpipes.find (identity);
if (it != outpipes.end ())
if (it != outpipes.end ())
zmq_assert(false); // Not allowed to duplicate an existing rid
}
else
else
if (options.raw_socket) { // Always assign identity for raw-socket
unsigned char buf [5];
buf [0] = 0;
@ -418,7 +421,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
identity = blob_t (buf, sizeof buf);
}
else
if (!options.raw_socket) {
if (!options.raw_socket) {
// Pick up handshake cases and also case where next identity is set
msg.init ();
ok = pipe_->read (&msg);
@ -444,7 +447,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
return false;
else {
// We will allow the new connection to take over this
// identity. Temporarily assign a new identity to the
// identity. Temporarily assign a new identity to the
// existing pipe so we can terminate it asynchronously.
unsigned char buf [5];
buf [0] = 0;
@ -452,13 +455,13 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
blob_t new_identity = blob_t (buf, sizeof buf);
it->second.pipe->set_identity (new_identity);
outpipe_t existing_outpipe =
outpipe_t existing_outpipe =
{it->second.pipe, it->second.active};
ok = outpipes.insert (outpipes_t::value_type (
new_identity, existing_outpipe)).second;
zmq_assert (ok);
// Remove the existing identity entry to allow the new
// connection to take the identity.
outpipes.erase (it);

View File

@ -102,7 +102,7 @@ namespace zmq
// Checks if an fd_entry_t is retired.
static bool is_retired_fd (const fd_entry_t &entry);
// Set of file descriptors that are used to retreive
// Set of file descriptors that are used to retrieve
// information for fd_set.
typedef std::vector <fd_entry_t> fd_set_t;
fd_set_t fds;

View File

@ -27,6 +27,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "macros.hpp"
#include "server.hpp"
#include "pipe.hpp"
#include "wire.hpp"
@ -38,29 +39,31 @@ zmq::server_t::server_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_, true),
next_rid (generate_random ())
{
options.type = ZMQ_SERVER;
options.type = ZMQ_SERVER;
}
zmq::server_t::~server_t ()
{
{
zmq_assert (outpipes.empty ());
}
void zmq::server_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{
// subscribe_to_all_ is unused
(void)subscribe_to_all_;
LIBZMQ_UNUSED (subscribe_to_all_);
zmq_assert (pipe_);
zmq_assert (pipe_);
uint32_t routing_id = next_rid++;
if (!routing_id)
routing_id = next_rid++; // Never use RID zero
pipe_->set_routing_id (routing_id);
// Add the record into output pipes lookup table
outpipe_t outpipe = {pipe_, true};
bool ok = outpipes.insert (outpipes_t::value_type (routing_id, outpipe)).second;
zmq_assert (ok);
fq.attach (pipe_);
fq.attach (pipe_);
}
void zmq::server_t::xpipe_terminated (pipe_t *pipe_)
@ -68,12 +71,12 @@ void zmq::server_t::xpipe_terminated (pipe_t *pipe_)
outpipes_t::iterator it = outpipes.find (pipe_->get_routing_id ());
zmq_assert (it != outpipes.end ());
outpipes.erase (it);
fq.pipe_terminated (pipe_);
fq.pipe_terminated (pipe_);
}
void zmq::server_t::xread_activated (pipe_t *pipe_)
{
fq.activated (pipe_);
{
fq.activated (pipe_);
}
void zmq::server_t::xwrite_activated (pipe_t *pipe_)
@ -90,20 +93,18 @@ void zmq::server_t::xwrite_activated (pipe_t *pipe_)
int zmq::server_t::xsend (msg_t *msg_)
{
zmq_assert(!(msg_->flags () & msg_t::more));
// Find the pipe associated with the routing stored in the message.
uint32_t routing_id = msg_->get_routing_id();
// Find the pipe associated with the routing stored in the message.
uint32_t routing_id = msg_->get_routing_id ();
outpipes_t::iterator it = outpipes.find (routing_id);
if (it != outpipes.end ()) {
if (it != outpipes.end ()) {
if (!it->second.pipe->check_write ()) {
it->second.active = false;
it->second.active = false;
errno = EAGAIN;
return -1;
return -1;
}
}
else {
else {
errno = EHOSTUNREACH;
return -1;
}
@ -113,10 +114,11 @@ int zmq::server_t::xsend (msg_t *msg_)
// Message failed to send - we must close it ourselves.
int rc = msg_->close ();
errno_assert (rc == 0);
} else {
it->second.pipe->flush ();
}
else
it->second.pipe->flush ();
// Detach the message from the data buffer.
int rc = msg_->init ();
errno_assert (rc == 0);
@ -125,7 +127,7 @@ int zmq::server_t::xsend (msg_t *msg_)
}
int zmq::server_t::xrecv (msg_t *msg_)
{
{
pipe_t *pipe = NULL;
int rc = fq.recvpipe (msg_, &pipe);
@ -134,22 +136,22 @@ int zmq::server_t::xrecv (msg_t *msg_)
// drop all frames of the current multi-frame message
rc = fq.recvpipe (msg_, NULL);
while (rc == 0 && msg_->flags () & msg_t::more)
rc = fq.recvpipe (msg_, NULL);
rc = fq.recvpipe (msg_, NULL);
// get the new message
if (rc == 0)
if (rc == 0)
rc = fq.recvpipe (msg_, &pipe);
}
}
if (rc != 0)
return rc;
return rc;
zmq_assert (pipe != NULL);
uint32_t routing_id = pipe->get_routing_id();
msg_->set_routing_id(routing_id);
uint32_t routing_id = pipe->get_routing_id ();
msg_->set_routing_id (routing_id);
return 0;
}
@ -162,7 +164,7 @@ bool zmq::server_t::xhas_in ()
bool zmq::server_t::xhas_out ()
{
// In theory, SERVER socket is always ready for writing. Whether actual
// attempt to write succeeds depends on whitch pipe the message is going
// attempt to write succeeds depends on which pipe the message is going
// to be routed to.
return true;
}

View File

@ -27,6 +27,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "macros.hpp"
#include "session_base.hpp"
#include "i_engine.hpp"
#include "err.hpp"
@ -111,7 +112,7 @@ zmq::session_base_t::~session_base_t ()
if (engine)
engine->terminate ();
delete addr;
LIBZMQ_DELETE(addr);
}
void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
@ -460,7 +461,8 @@ void zmq::session_base_t::process_term (int linger_)
// TODO: Should this go into pipe_t::terminate ?
// In case there's no engine and there's only delimiter in the
// pipe it wouldn't be ever read. Thus we check for it explicitly.
pipe->check_read ();
if (!engine)
pipe->check_read ();
}
if (zap_pipe != NULL)

View File

@ -213,7 +213,7 @@ int zmq::signaler_t::wait (int timeout_)
{
#ifdef HAVE_FORK
if (unlikely (pid != getpid ())) {
// we have forked and the file descriptor is closed. Emulate an interupt
// we have forked and the file descriptor is closed. Emulate an interrupt
// response.
//printf("Child process %d signaler_t::wait returning simulating interrupt #1\n", getpid());
errno = EINTR;
@ -238,7 +238,7 @@ int zmq::signaler_t::wait (int timeout_)
#ifdef HAVE_FORK
else
if (unlikely (pid != getpid ())) {
// we have forked and the file descriptor is closed. Emulate an interupt
// we have forked and the file descriptor is closed. Emulate an interrupt
// response.
//printf("Child process %d signaler_t::wait returning simulating interrupt #2\n", getpid());
errno = EINTR;
@ -315,6 +315,58 @@ void zmq::signaler_t::recv ()
#endif
}
int zmq::signaler_t::recv_failable ()
{
// Attempt to read a signal.
#if defined ZMQ_HAVE_EVENTFD
uint64_t dummy;
ssize_t sz = read (r, &dummy, sizeof (dummy));
if (sz == -1) {
errno_assert (errno == EAGAIN);
return -1;
}
else {
errno_assert (sz == sizeof (dummy));
// If we accidentally grabbed the next signal(s) along with the current
// one, return it back to the eventfd object.
if (unlikely (dummy > 1)) {
const uint64_t inc = dummy - 1;
ssize_t sz2 = write (w, &inc, sizeof (inc));
errno_assert (sz2 == sizeof (inc));
return 0;
}
zmq_assert (dummy == 1);
}
#else
unsigned char dummy;
#if defined ZMQ_HAVE_WINDOWS
int nbytes = ::recv (r, (char*) &dummy, sizeof (dummy), 0);
if (nbytes == SOCKET_ERROR) {
const int last_error = WSAGetLastError();
if (last_error == WSAEWOULDBLOCK) {
errno = EAGAIN;
return -1;
}
wsa_assert (last_error == WSAEWOULDBLOCK);
}
#else
ssize_t nbytes = ::recv (r, &dummy, sizeof (dummy), 0);
if (nbytes == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
errno = EAGAIN;
return -1;
}
errno_assert (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR);
}
#endif
zmq_assert (nbytes == sizeof (dummy));
zmq_assert (dummy == 0);
#endif
return 0;
}
#ifdef HAVE_FORK
void zmq::signaler_t::forked ()
{

View File

@ -55,6 +55,7 @@ namespace zmq
void send ();
int wait (int timeout_);
void recv ();
int recv_failable ();
#ifdef HAVE_FORK
// close the file descriptors in a forked child process so that they
@ -64,7 +65,7 @@ namespace zmq
private:
// Creates a pair of filedescriptors that will be used
// Creates a pair of file descriptors that will be used
// to pass the signals.
static int make_fdpair (fd_t *r_, fd_t *w_);

View File

@ -31,6 +31,7 @@
#include <string>
#include <algorithm>
#include "macros.hpp"
#include "platform.hpp"
#if defined ZMQ_HAVE_WINDOWS
@ -156,7 +157,7 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
if (mailbox != NULL && mailbox->get_fd () == retired_fd) {
s->destroyed = true;
delete s;
LIBZMQ_DELETE(s);
return NULL;
}
@ -189,10 +190,11 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_, bool
zmq::socket_base_t::~socket_base_t ()
{
delete mailbox;
LIBZMQ_DELETE(mailbox);
if (reaper_signaler)
delete reaper_signaler;
if (reaper_signaler) {
LIBZMQ_DELETE(reaper_signaler);
}
stop_monitor ();
zmq_assert (destroyed);
@ -235,7 +237,7 @@ int zmq::socket_base_t::parse_uri (const char *uri_,
int zmq::socket_base_t::check_protocol (const std::string &protocol_)
{
// First check out whether the protcol is something we are aware of.
// First check out whether the protocol is something we are aware of.
if (protocol_ != "inproc"
&& protocol_ != "ipc"
&& protocol_ != "tcp"
@ -247,7 +249,7 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
return -1;
}
// If 0MQ is not compiled with OpenPGM, pgm and epgm transports
// are not avaialble.
// are not available.
#if !defined ZMQ_HAVE_OPENPGM
if (protocol_ == "pgm" || protocol_ == "epgm") {
errno = EPROTONOSUPPORT;
@ -298,7 +300,7 @@ void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
// First, register the pipe so that we can terminate it later on.
pipe_->set_event_sink (this);
pipes.push_back (pipe_);
// Let the derived socket type know about new pipe.
xattach_pipe (pipe_, subscribe_to_all_);
@ -315,12 +317,11 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
{
ENTER_MUTEX();
if (!options.is_valid(option_)) {
errno = EINVAL;
EXIT_MUTEX();
return -1;
}
if (!options.is_valid(option_)) {
errno = EINVAL;
EXIT_MUTEX();
return -1;
}
if (unlikely (ctx_terminated)) {
errno = ETERM;
@ -338,7 +339,7 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
// If the socket type doesn't support the option, pass it to
// the generic option parser.
rc = options.setsockopt (option_, optval_, optvallen_);
update_pipe_options(option_);
update_pipe_options(option_);
EXIT_MUTEX();
return rc;
@ -361,6 +362,7 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
EXIT_MUTEX();
return -1;
}
memset(optval_, 0, *optvallen_);
*((int*) optval_) = rcvmore ? 1 : 0;
*optvallen_ = sizeof (int);
EXIT_MUTEX();
@ -380,10 +382,10 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
EXIT_MUTEX();
return -1;
}
*((fd_t*)optval_) = ((mailbox_t*)mailbox)->get_fd();
*optvallen_ = sizeof(fd_t);
*optvallen_ = sizeof(fd_t);
EXIT_MUTEX();
return 0;
}
@ -422,11 +424,56 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
return 0;
}
if (option_ == ZMQ_THREAD_SAFE) {
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
EXIT_MUTEX();
return -1;
}
memset(optval_, 0, *optvallen_);
*((int*) optval_) = thread_safe ? 1 : 0;
*optvallen_ = sizeof (int);
EXIT_MUTEX();
return 0;
}
int rc = options.getsockopt (option_, optval_, optvallen_);
EXIT_MUTEX();
return rc;
}
int zmq::socket_base_t::add_signaler(signaler_t *s_)
{
ENTER_MUTEX();
if (!thread_safe) {
errno = EINVAL;
EXIT_MUTEX();
return -1;
}
((mailbox_safe_t*)mailbox)->add_signaler(s_);
EXIT_MUTEX();
return 0;
}
int zmq::socket_base_t::remove_signaler(signaler_t *s_)
{
ENTER_MUTEX();
if (!thread_safe) {
errno = EINVAL;
EXIT_MUTEX();
return -1;
}
((mailbox_safe_t*)mailbox)->remove_signaler(s_);
EXIT_MUTEX();
return 0;
}
int zmq::socket_base_t::bind (const char *addr_)
{
ENTER_MUTEX();
@ -465,7 +512,7 @@ int zmq::socket_base_t::bind (const char *addr_)
}
if (protocol == "pgm" || protocol == "epgm" || protocol == "norm") {
// For convenience's sake, bind can be used interchageable with
// For convenience's sake, bind can be used interchangeable with
// connect for PGM, EPGM and NORM transports.
EXIT_MUTEX();
rc = connect (addr_);
@ -474,7 +521,7 @@ int zmq::socket_base_t::bind (const char *addr_)
return rc;
}
// Remaining trasnports require to be run in an I/O thread, so at this
// Remaining transports require to be run in an I/O thread, so at this
// point we'll choose one.
io_thread_t *io_thread = choose_io_thread (options.affinity);
if (!io_thread) {
@ -489,7 +536,7 @@ int zmq::socket_base_t::bind (const char *addr_)
alloc_assert (listener);
int rc = listener->set_address (address.c_str ());
if (rc != 0) {
delete listener;
LIBZMQ_DELETE(listener);
event_bind_failed (address, zmq_errno());
EXIT_MUTEX();
return -1;
@ -511,7 +558,7 @@ int zmq::socket_base_t::bind (const char *addr_)
alloc_assert (listener);
int rc = listener->set_address (address.c_str ());
if (rc != 0) {
delete listener;
LIBZMQ_DELETE(listener);
event_bind_failed (address, zmq_errno());
EXIT_MUTEX();
return -1;
@ -533,7 +580,7 @@ int zmq::socket_base_t::bind (const char *addr_)
alloc_assert (listener);
int rc = listener->set_address (address.c_str ());
if (rc != 0) {
delete listener;
LIBZMQ_DELETE(listener);
event_bind_failed (address, zmq_errno());
EXIT_MUTEX();
return -1;
@ -742,7 +789,7 @@ int zmq::socket_base_t::connect (const char *addr_)
}
if (rc == -1) {
errno = EINVAL;
delete paddr;
LIBZMQ_DELETE(paddr);
EXIT_MUTEX();
return -1;
}
@ -756,15 +803,15 @@ int zmq::socket_base_t::connect (const char *addr_)
alloc_assert (paddr->resolved.ipc_addr);
int rc = paddr->resolved.ipc_addr->resolve (address.c_str ());
if (rc != 0) {
delete paddr;
LIBZMQ_DELETE(paddr);
EXIT_MUTEX();
return -1;
}
}
#endif
// TBD - Should we check address for ZMQ_HAVE_NORM???
#ifdef ZMQ_HAVE_OPENPGM
if (protocol == "pgm" || protocol == "epgm") {
struct pgm_addrinfo_t *res = NULL;
@ -785,7 +832,7 @@ int zmq::socket_base_t::connect (const char *addr_)
alloc_assert (paddr->resolved.tipc_addr);
int rc = paddr->resolved.tipc_addr->resolve (address.c_str());
if (rc != 0) {
delete paddr;
LIBZMQ_DELETE(paddr);
EXIT_MUTEX();
return -1;
}
@ -980,7 +1027,7 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
if (unlikely (process_commands (timeout, false) != 0)) {
EXIT_MUTEX();
return -1;
}
}
rc = xsend (msg_);
if (rc == 0)
break;
@ -1120,7 +1167,7 @@ int zmq::socket_base_t::close ()
{
// Mark the socket as dead
tag = 0xdeadbeef;
// Transfer the ownership of the socket from this application thread
// to the reaper thread which will take care of the rest of shutdown
// process.
@ -1148,13 +1195,13 @@ void zmq::socket_base_t::start_reaping (poller_t *poller_)
if (!thread_safe)
fd = ((mailbox_t*)mailbox)->get_fd();
else {
else {
ENTER_MUTEX();
reaper_signaler = new signaler_t();
// Add signaler to the safe mailbox
fd = reaper_signaler->get_fd();
fd = reaper_signaler->get_fd();
((mailbox_safe_t*)mailbox)->add_signaler(reaper_signaler);
// Send a signal to make sure reaper handle existing commands
@ -1261,13 +1308,13 @@ void zmq::socket_base_t::process_term (int linger_)
void zmq::socket_base_t::update_pipe_options(int option_)
{
if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM)
{
for (pipes_t::size_type i = 0; i != pipes.size(); ++i)
{
pipes[i]->set_hwms(options.rcvhwm, options.sndhwm);
}
}
if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM)
{
for (pipes_t::size_type i = 0; i != pipes.size(); ++i)
{
pipes[i]->set_hwms(options.rcvhwm, options.sndhwm);
}
}
}
@ -1331,11 +1378,11 @@ void zmq::socket_base_t::in_event ()
// be destroyed.
ENTER_MUTEX();
// If the socket is thread safe we need to unsignal the reaper signaler
if (thread_safe)
reaper_signaler->recv();
process_commands (0, false);
EXIT_MUTEX();
check_destroy ();
@ -1450,12 +1497,12 @@ int zmq::socket_base_t::monitor (const char *addr_, int events_)
int linger = 0;
int rc = zmq_setsockopt (monitor_socket, ZMQ_LINGER, &linger, sizeof (linger));
if (rc == -1)
stop_monitor ();
stop_monitor (false);
// Spawn the monitor socket endpoint
rc = zmq_bind (monitor_socket, addr_);
if (rc == -1)
stop_monitor ();
stop_monitor (false);
return rc;
}
@ -1536,23 +1583,12 @@ void zmq::socket_base_t::monitor_event (int event_, int value_, const std::strin
// Send event in first frame
zmq_msg_t msg;
zmq_msg_init_size (&msg, 6);
#ifdef ZMQ_HAVE_HPUX
// avoid SIGBUS
union {
uint8_t data[6];
struct {
uint16_t event;
uint32_t value;
} v;
} u;
u.v.event = event_;
u.v.value = value_;
memcpy(zmq_msg_data (&msg), u.data, 6);
#else
uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
*(uint16_t *) (data + 0) = (uint16_t) event_;
*(uint32_t *) (data + 2) = (uint32_t) value_;
#endif
// Avoid dereferencing uint32_t on unaligned address
uint16_t event = (uint16_t) event_;
uint32_t value = (uint32_t) value_;
memcpy (data + 0, &event, sizeof(event));
memcpy (data + 2, &value, sizeof(value));
zmq_sendmsg (monitor_socket, &msg, ZMQ_SNDMORE);
// Send address in second frame
@ -1562,10 +1598,10 @@ void zmq::socket_base_t::monitor_event (int event_, int value_, const std::strin
}
}
void zmq::socket_base_t::stop_monitor (void)
void zmq::socket_base_t::stop_monitor (bool send_monitor_stopped_event_)
{
if (monitor_socket) {
if (monitor_events & ZMQ_EVENT_MONITOR_STOPPED)
if ((monitor_events & ZMQ_EVENT_MONITOR_STOPPED) && send_monitor_stopped_event_)
monitor_event (ZMQ_EVENT_MONITOR_STOPPED, 0, "");
zmq_close (monitor_socket);
monitor_socket = NULL;

View File

@ -90,6 +90,8 @@ namespace zmq
int term_endpoint (const char *addr_);
int send (zmq::msg_t *msg_, int flags_);
int recv (zmq::msg_t *msg_, int flags_);
int add_signaler (signaler_t *s);
int remove_signaler (signaler_t *s);
int close ();
// These functions are used by the polling mechanism to determine
@ -97,7 +99,7 @@ namespace zmq
bool has_in ();
bool has_out ();
// Using this function reaper thread ask the socket to regiter with
// Using this function reaper thread ask the socket to register with
// its poller.
void start_reaping (poller_t *poller_);
@ -169,11 +171,11 @@ namespace zmq
// Delay actual destruction of the socket.
void process_destroy ();
// Socket event data dispath
// Socket event data dispatch
void monitor_event (int event_, int value_, const std::string& addr_);
// Monitor socket cleanup
void stop_monitor ();
void stop_monitor (bool send_monitor_stopped_event_ = true);
// Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types
std::string connect_rid;

234
src/socket_poller.cpp Normal file
View File

@ -0,0 +1,234 @@
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "socket_poller.hpp"
#include "err.hpp"
zmq::socket_poller_t::socket_poller_t () :
tag (0xCAFEBABE),
poll_set (NULL),
poll_events (NULL)
{
pollfd = zmq_pollfd_new ();
}
zmq::socket_poller_t::~socket_poller_t ()
{
// Mark the socket_poller as dead
tag = 0xdeadbeef;
for (events_t::iterator it = events.begin(); it != events.end(); ++it) {
if (it->socket) {
int thread_safe;
size_t thread_safe_size = sizeof(int);
if (zmq_getsockopt(it->socket, ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == 0 && thread_safe)
zmq_remove_pollfd(it->socket, pollfd);
}
}
zmq_pollfd_close (pollfd);
if (poll_set) {
free (poll_set);
poll_set = NULL;
}
if (poll_events) {
free (poll_events);
poll_events = NULL;
}
}
bool zmq::socket_poller_t::check_tag ()
{
return tag == 0xCAFEBABE;
}
int zmq::socket_poller_t::add_socket (void *socket_, void* user_data_)
{
for (events_t::iterator it = events.begin (); it != events.end (); ++it) {
if (it->socket == socket_) {
errno = EINVAL;
return -1;
}
}
int thread_safe;
size_t thread_safe_size = sizeof(int);
if (zmq_getsockopt (socket_, ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
return -1;
if (thread_safe) {
if (zmq_add_pollfd (socket_, pollfd) == -1)
return -1;
}
event_t event = {socket_, 0, user_data_};
events.push_back (event);
need_rebuild = true;
return 0;
}
#if defined _WIN32
int zmq::socket_poller_t::add_fd (SOCKET fd_, void *user_data_)
#else
int zmq::socket_poller_t::add_fd (int fd_, void *user_data_)
#endif
{
for (events_t::iterator it = events.begin (); it != events.end (); ++it) {
if (!it->socket && it->fd == fd_) {
errno = EINVAL;
return -1;
}
}
event_t event = {NULL, fd_, user_data_};
events.push_back (event);
need_rebuild = true;
return 0;
}
int zmq::socket_poller_t::remove_socket (void* socket_)
{
events_t::iterator it;
for (it = events.begin (); it != events.end (); ++it) {
if (it->socket == socket_)
break;
}
if (it == events.end()) {
errno = EINVAL;
return -1;
}
int thread_safe;
size_t thread_safe_size = sizeof(int);
if (zmq_getsockopt (socket_, ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
return -1;
if (thread_safe) {
if (zmq_remove_pollfd (socket_, pollfd) == -1)
return -1;
}
events.erase (it);
need_rebuild = true;
return 0;
}
#if defined _WIN32
int zmq::socket_poller_t::remove_fd (SOCKET fd_)
#else
int zmq::socket_poller_t::remove_fd (int fd_)
#endif
{
events_t::iterator it;
for (it = events.begin (); it != events.end (); ++it) {
if (!it->socket && it->fd == fd_)
break;
}
if (it == events.end()) {
errno = EINVAL;
return -1;
}
events.erase (it);
need_rebuild = true;
return 0;
}
int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long timeout_)
{
if (need_rebuild)
rebuild ();
int rc = zmq_pollfd_poll (pollfd, poll_set, poll_size, timeout_);
if (rc == -1) {
return rc;
}
if (rc == 0) {
errno = EAGAIN;
return -1;
}
for (int i = 0; i < poll_size; i++) {
if (poll_set [i].revents & ZMQ_POLLIN) {
*event_ = poll_events[i];
break;
}
}
return 0;
}
void zmq::socket_poller_t::rebuild ()
{
if (poll_set) {
free (poll_set);
poll_set = NULL;
}
if (poll_events) {
free (poll_events);
poll_events = NULL;
}
poll_size = events.size ();
poll_set = (zmq_pollitem_t*) malloc (poll_size * sizeof (zmq_pollitem_t));
alloc_assert (poll_set);
poll_events = (event_t*) malloc (poll_size * sizeof (event_t));
int event_nbr = 0;
for (events_t::iterator it = events.begin (); it != events.end (); ++it, event_nbr++) {
poll_set [event_nbr].socket = it->socket;
if (!it->socket)
poll_set [event_nbr].fd = it->fd;
poll_set [event_nbr].events = ZMQ_POLLIN;
poll_events [event_nbr] = *it;
}
need_rebuild = false;
}

104
src/socket_poller.hpp Normal file
View File

@ -0,0 +1,104 @@
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_SOCKET_POLLER_HPP_INCLUDED__
#define __ZMQ_SOCKET_POLLER_HPP_INCLUDED__
#include <vector>
#include <algorithm>
#include "../include/zmq.h"
namespace zmq
{
class socket_poller_t
{
public:
socket_poller_t ();
~socket_poller_t ();
typedef struct event_t
{
void *socket;
#if defined _WIN32
SOCKET fd;
#else
int fd;
#endif
void *user_data;
} event_t;
int add_socket (void *socket, void *user_data);
int remove_socket (void *socket);
#if defined _WIN32
int add_fd (SOCKET fd, void *user_data);
int remove_fd (SOCKET fd);
#else
int add_fd (int fd, void *user_data);
int remove_fd (int fd);
#endif
int wait (event_t *event, long timeout);
// Return false if object is not a socket.
bool check_tag ();
private:
void rebuild ();
// Used to check whether the object is a socket_poller.
uint32_t tag;
// Pollfd used for thread safe sockets polling
void *pollfd;
// List of sockets
typedef std::vector <event_t> events_t;
events_t events;
// Current zmq_poll set
zmq_pollitem_t *poll_set;
// Matching set to events
event_t *poll_events;
// Size of the pollset
int poll_size;
// Does the pollset needs rebuilding?
bool need_rebuild;
socket_poller_t (const socket_poller_t&);
const socket_poller_t &operator = (const socket_poller_t&);
};
}
#endif

View File

@ -30,6 +30,7 @@
#include <new>
#include <string>
#include "macros.hpp"
#include "socks_connecter.hpp"
#include "stream_engine.hpp"
#include "platform.hpp"
@ -72,7 +73,7 @@ zmq::socks_connecter_t::socks_connecter_t (class io_thread_t *io_thread_,
zmq::socks_connecter_t::~socks_connecter_t ()
{
zmq_assert (s == retired_fd);
delete proxy_addr;
LIBZMQ_DELETE(proxy_addr);
}
void zmq::socks_connecter_t::process_plug ()
@ -303,15 +304,14 @@ int zmq::socks_connecter_t::connect_to_proxy ()
zmq_assert (s == retired_fd);
// Resolve the address
delete proxy_addr->resolved.tcp_addr;
LIBZMQ_DELETE(proxy_addr->resolved.tcp_addr);
proxy_addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t ();
alloc_assert (proxy_addr->resolved.tcp_addr);
int rc = proxy_addr->resolved.tcp_addr->resolve (
proxy_addr->address.c_str (), false, options.ipv6);
if (rc != 0) {
delete proxy_addr->resolved.tcp_addr;
proxy_addr->resolved.tcp_addr = NULL;
LIBZMQ_DELETE(proxy_addr->resolved.tcp_addr);
return -1;
}
zmq_assert (proxy_addr->resolved.tcp_addr != NULL);
@ -361,18 +361,18 @@ int zmq::socks_connecter_t::connect_to_proxy ()
// Connect to the remote peer.
rc = ::connect (s, tcp_addr->addr (), tcp_addr->addrlen ());
// Connect was successfull immediately.
// Connect was successful immediately.
if (rc == 0)
return 0;
// Translate error codes indicating asynchronous connect has been
// launched to a uniform EINPROGRESS.
#ifdef ZMQ_HAVE_WINDOWS
const int error_code = WSAGetLastError ();
if (error_code == WSAEINPROGRESS || error_code == WSAEWOULDBLOCK)
const int last_error = WSAGetLastError();
if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
errno = EINPROGRESS;
else {
errno = wsa_error_to_errno (error_code);
errno = wsa_error_to_errno (last_error);
close ();
}
#else

View File

@ -103,7 +103,7 @@ namespace zmq
int get_new_reconnect_ivl ();
// Open TCP connecting socket. Returns -1 in case of error,
// 0 if connect was successfull immediately. Returns -1 with
// 0 if connect was successful immediately. Returns -1 with
// EAGAIN errno if async connect was launched.
int open ();
@ -111,7 +111,7 @@ namespace zmq
void close ();
// Get the file descriptor of newly created connection. Returns
// retired_fd if the connection was unsuccessfull.
// retired_fd if the connection was unsuccessful.
zmq::fd_t check_proxy_connection ();
socks_greeting_encoder_t greeting_encoder;

View File

@ -27,6 +27,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "macros.hpp"
#include "stream.hpp"
#include "pipe.hpp"
#include "wire.hpp"
@ -58,8 +59,7 @@ zmq::stream_t::~stream_t ()
void zmq::stream_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{
// subscribe_to_all_ is unused
(void)subscribe_to_all_;
LIBZMQ_UNUSED(subscribe_to_all_);
zmq_assert (pipe_);
@ -226,7 +226,7 @@ int zmq::stream_t::xrecv (msg_t *msg_)
zmq_assert ((prefetched_msg.flags () & msg_t::more) == 0);
// We have received a frame with TCP data.
// Rather than sendig this frame, we keep it in prefetched
// Rather than sending this frame, we keep it in prefetched
// buffer and send a frame with peer's ID.
blob_t identity = pipe->get_identity ();
rc = msg_->close();

View File

@ -27,6 +27,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "macros.hpp"
#include "platform.hpp"
#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
@ -172,13 +173,15 @@ zmq::stream_engine_t::~stream_engine_t ()
// Drop reference to metadata and destroy it if we are
// the only user.
if (metadata != NULL)
if (metadata->drop_ref ())
delete metadata;
if (metadata != NULL) {
if (metadata->drop_ref ()) {
LIBZMQ_DELETE(metadata);
}
}
delete encoder;
delete decoder;
delete mechanism;
LIBZMQ_DELETE(encoder);
LIBZMQ_DELETE(decoder);
LIBZMQ_DELETE(mechanism);
}
void zmq::stream_engine_t::plug (io_thread_t *io_thread_,

View File

@ -27,6 +27,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "macros.hpp"
#include "ip.hpp"
#include "tcp.hpp"
#include "err.hpp"
@ -94,13 +95,13 @@ void zmq::set_tcp_receive_buffer (fd_t sockfd_, int bufsize_)
void zmq::tune_tcp_keepalives (fd_t s_, int keepalive_, int keepalive_cnt_, int keepalive_idle_, int keepalive_intvl_)
{
// These options are used only under certain #ifdefs below.
(void)keepalive_;
(void)keepalive_cnt_;
(void)keepalive_idle_;
(void)keepalive_intvl_;
LIBZMQ_UNUSED (keepalive_);
LIBZMQ_UNUSED (keepalive_cnt_);
LIBZMQ_UNUSED (keepalive_idle_);
LIBZMQ_UNUSED (keepalive_intvl_);
// If none of the #ifdefs apply, then s_ is unused.
(void)s_;
LIBZMQ_UNUSED (s_);
// Tuning TCP keep-alives if platform allows it
// All values = -1 means skip and leave it for OS
@ -152,7 +153,25 @@ void zmq::tune_tcp_keepalives (fd_t s_, int keepalive_, int keepalive_cnt_, int
#endif // ZMQ_HAVE_WINDOWS
}
int zmq::tcp_write (fd_t s_, const void *data_, size_t size_)
void zmq::tune_tcp_retransmit_timeout (fd_t sockfd_, int timeout_)
{
if (timeout_ <= 0)
return;
#if defined (ZMQ_HAVE_WINDOWS) && defined (TCP_MAXRT)
// msdn says it's supported in >= Vista, >= Windows Server 2003
timeout_ /= 1000; // in seconds
int rc = setsockopt (sockfd_, IPPROTO_TCP, TCP_MAXRT, (char*) &timeout_,
sizeof(timeout_));
wsa_assert (rc != SOCKET_ERROR);
#elif defined (TCP_USER_TIMEOUT) // FIXME: should be ZMQ_HAVE_TCP_USER_TIMEOUT
int rc = setsockopt (sockfd_, IPPROTO_TCP, TCP_USER_TIMEOUT, &timeout_,
sizeof(timeout_));
errno_assert (rc == 0);
#endif
}
int zmq::tcp_write (fd_t s_, const void *data_, size_t size_)
{
#ifdef ZMQ_HAVE_WINDOWS
@ -160,22 +179,24 @@ int zmq::tcp_write (fd_t s_, const void *data_, size_t size_)
// If not a single byte can be written to the socket in non-blocking mode
// we'll get an error (this may happen during the speculative write).
if (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK)
const int last_error = WSAGetLastError();
if (nbytes == SOCKET_ERROR && last_error == WSAEWOULDBLOCK)
return 0;
// Signalise peer failure.
if (nbytes == SOCKET_ERROR && (
WSAGetLastError () == WSAENETDOWN ||
WSAGetLastError () == WSAENETRESET ||
WSAGetLastError () == WSAEHOSTUNREACH ||
WSAGetLastError () == WSAECONNABORTED ||
WSAGetLastError () == WSAETIMEDOUT ||
WSAGetLastError () == WSAECONNRESET))
last_error == WSAENETDOWN ||
last_error == WSAENETRESET ||
last_error == WSAEHOSTUNREACH ||
last_error == WSAECONNABORTED ||
last_error == WSAETIMEDOUT ||
last_error == WSAECONNRESET
))
return -1;
// Circumvent a Windows bug; see https://support.microsoft.com/en-us/kb/201213
// and https://zeromq.jira.com/browse/LIBZMQ-195
if (nbytes == SOCKET_ERROR && WSAGetLastError() == WSAENOBUFS)
if (nbytes == SOCKET_ERROR && last_error == WSAENOBUFS)
return 0;
wsa_assert (nbytes != SOCKET_ERROR);
@ -219,22 +240,24 @@ int zmq::tcp_read (fd_t s_, void *data_, size_t size_)
// If not a single byte can be read from the socket in non-blocking mode
// we'll get an error (this may happen during the speculative read).
if (rc == SOCKET_ERROR) {
if (WSAGetLastError () == WSAEWOULDBLOCK)
errno = EAGAIN;
else {
wsa_assert (WSAGetLastError () == WSAENETDOWN
|| WSAGetLastError () == WSAENETRESET
|| WSAGetLastError () == WSAECONNABORTED
|| WSAGetLastError () == WSAETIMEDOUT
|| WSAGetLastError () == WSAECONNRESET
|| WSAGetLastError () == WSAECONNREFUSED
|| WSAGetLastError () == WSAENOTCONN);
errno = wsa_error_to_errno (WSAGetLastError ());
}
if (rc == SOCKET_ERROR) {
const int last_error = WSAGetLastError();
if (last_error == WSAEWOULDBLOCK) {
errno = EAGAIN;
}
else {
wsa_assert (last_error == WSAENETDOWN ||
last_error == WSAENETRESET ||
last_error == WSAECONNABORTED ||
last_error == WSAETIMEDOUT ||
last_error == WSAECONNRESET ||
last_error == WSAECONNREFUSED ||
last_error == WSAENOTCONN);
errno = wsa_error_to_errno (last_error);
}
}
return rc == SOCKET_ERROR? -1: rc;
return rc == SOCKET_ERROR ? -1 : rc;
#else

View File

@ -47,6 +47,9 @@ namespace zmq
// Tunes TCP keep-alives
void tune_tcp_keepalives (fd_t s_, int keepalive_, int keepalive_cnt_, int keepalive_idle_, int keepalive_intvl_);
// Tunes TCP retransmit timeout
void tune_tcp_retransmit_timeout (fd_t sockfd_, int timeout_);
// Writes data to the socket. Returns the number of bytes actually
// written (even zero is to be considered to be a success). In case
// of error or orderly shutdown by the other peer -1 is returned.

View File

@ -30,6 +30,7 @@
#include <string>
#include <sstream>
#include "macros.hpp"
#include "tcp_address.hpp"
#include "platform.hpp"
#include "stdint.hpp"
@ -56,7 +57,7 @@
int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_, bool is_src_)
{
// TODO: Unused parameter, IPv6 support not implemented for Solaris.
(void) ipv6_;
LIBZMQ_UNUSED (ipv6_);
// Create a socket.
const int fd = open_socket (AF_INET, SOCK_DGRAM, 0);
@ -123,7 +124,7 @@ int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_, bool is_
int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_, bool is_src_)
{
// TODO: Unused parameter, IPv6 support not implemented for AIX or HP/UX.
(void) ipv6_;
LIBZMQ_UNUSED (ipv6_);
// Create a socket.
const int sd = open_socket (AF_INET, SOCK_DGRAM, 0);
@ -156,7 +157,8 @@ int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_, bool is_
#elif ((defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENBSD ||\
defined ZMQ_HAVE_QNXNTO || defined ZMQ_HAVE_NETBSD)\
defined ZMQ_HAVE_QNXNTO || defined ZMQ_HAVE_NETBSD ||\
defined ZMQ_HAVE_DRAGONFLY)\
&& defined ZMQ_HAVE_IFADDRS)
#include <ifaddrs.h>
@ -209,9 +211,8 @@ int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_, bool is_
// This is true especially of Windows.
int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_, bool is_src_)
{
// All unused parameters.
(void) nic_;
(void) ipv6_;
LIBZMQ_UNUSED (nic_);
LIBZMQ_UNUSED (ipv6_);
errno = ENODEV;
return -1;
@ -280,7 +281,7 @@ int zmq::tcp_address_t::resolve_interface (const char *interface_, bool ipv6_, b
// service-name irregularity due to indeterminate socktype.
req.ai_flags = AI_PASSIVE | AI_NUMERICHOST;
#if defined AI_V4MAPPED && !defined ZMQ_HAVE_FREEBSD
#if defined AI_V4MAPPED && !defined ZMQ_HAVE_FREEBSD && !defined ZMQ_HAVE_DRAGONFLY
// In this API we only require IPv4-mapped addresses when
// no native IPv6 interfaces are available (~AI_ALL).
// This saves an additional DNS roundtrip for IPv4 addresses.
@ -330,7 +331,7 @@ int zmq::tcp_address_t::resolve_hostname (const char *hostname_, bool ipv6_, boo
// doesn't really matter, since it's not included in the addr-output.
req.ai_socktype = SOCK_STREAM;
#if defined AI_V4MAPPED && !defined ZMQ_HAVE_FREEBSD
#if defined AI_V4MAPPED && !defined ZMQ_HAVE_FREEBSD && !defined ZMQ_HAVE_DRAGONFLY
// In this API we only require IPv4-mapped addresses when
// no native IPv6 interfaces are available.
// This saves an additional DNS roundtrip for IPv4 addresses.
@ -476,7 +477,7 @@ int zmq::tcp_address_t::to_string (std::string &addr_)
return -1;
}
// Not using service resolv because of
// Not using service resolving because of
// https://github.com/zeromq/libzmq/commit/1824574f9b5a8ce786853320e3ea09fe1f822bc4
char hbuf [NI_MAXHOST];
int rc = getnameinfo (addr (), addrlen (), hbuf, sizeof hbuf, NULL, 0, NI_NUMERICHOST);

View File

@ -51,7 +51,7 @@ namespace zmq
virtual ~tcp_address_t ();
// This function translates textual TCP address into an address
// strcuture. If 'local' is true, names are resolved as local interface
// structure. If 'local' is true, names are resolved as local interface
// names. If it is false, names are resolved as remote hostnames.
// If 'ipv6' is true, the name may resolve to IPv6 address.
int resolve (const char *name_, bool local_, bool ipv6_, bool is_src_ = false);

View File

@ -30,6 +30,7 @@
#include <new>
#include <string>
#include "macros.hpp"
#include "tcp_connecter.hpp"
#include "stream_engine.hpp"
#include "io_thread.hpp"
@ -67,7 +68,8 @@ zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
s (retired_fd),
handle_valid (false),
delayed_start (delayed_start_),
timer_started (false),
connect_timer_started (false),
reconnect_timer_started (false),
session (session_),
current_reconnect_ivl (options.reconnect_ivl)
{
@ -79,7 +81,8 @@ zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
zmq::tcp_connecter_t::~tcp_connecter_t ()
{
zmq_assert (!timer_started);
zmq_assert (!connect_timer_started);
zmq_assert (!reconnect_timer_started);
zmq_assert (!handle_valid);
zmq_assert (s == retired_fd);
}
@ -94,9 +97,14 @@ void zmq::tcp_connecter_t::process_plug ()
void zmq::tcp_connecter_t::process_term (int linger_)
{
if (timer_started) {
if (connect_timer_started) {
cancel_timer (connect_timer_id);
connect_timer_started = false;
}
if (reconnect_timer_started) {
cancel_timer (reconnect_timer_id);
timer_started = false;
reconnect_timer_started = false;
}
if (handle_valid) {
@ -120,6 +128,11 @@ void zmq::tcp_connecter_t::in_event ()
void zmq::tcp_connecter_t::out_event ()
{
if (connect_timer_started) {
cancel_timer (connect_timer_id);
connect_timer_started = false;
}
rm_fd (handle);
handle_valid = false;
@ -133,6 +146,7 @@ void zmq::tcp_connecter_t::out_event ()
tune_tcp_socket (fd);
tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
tune_tcp_retransmit_timeout (fd, options.tcp_retransmit_timeout);
// remember our fd for ZMQ_SRCFD in messages
socket->set_fd (fd);
@ -153,9 +167,20 @@ void zmq::tcp_connecter_t::out_event ()
void zmq::tcp_connecter_t::timer_event (int id_)
{
zmq_assert (id_ == reconnect_timer_id);
timer_started = false;
start_connecting ();
zmq_assert (id_ == reconnect_timer_id || id_ == connect_timer_id);
if (id_ == connect_timer_id) {
connect_timer_started = false;
rm_fd (handle);
handle_valid = false;
close ();
add_reconnect_timer ();
}
else if (id_ == reconnect_timer_id) {
reconnect_timer_started = false;
start_connecting ();
}
}
void zmq::tcp_connecter_t::start_connecting ()
@ -177,6 +202,9 @@ void zmq::tcp_connecter_t::start_connecting ()
handle_valid = true;
set_pollout (handle);
socket->event_connect_delayed (endpoint, zmq_errno());
// add userspace connect timeout
add_connect_timer ();
}
// Handle any other error condition by eventual reconnect.
@ -187,12 +215,20 @@ void zmq::tcp_connecter_t::start_connecting ()
}
}
void zmq::tcp_connecter_t::add_connect_timer ()
{
if (options.connect_timeout > 0) {
add_timer (options.connect_timeout, connect_timer_id);
connect_timer_started = true;
}
}
void zmq::tcp_connecter_t::add_reconnect_timer ()
{
const int interval = get_new_reconnect_ivl ();
add_timer (interval, reconnect_timer_id);
socket->event_connect_retried (endpoint, interval);
timer_started = true;
reconnect_timer_started = true;
}
int zmq::tcp_connecter_t::get_new_reconnect_ivl ()
@ -217,8 +253,7 @@ int zmq::tcp_connecter_t::open ()
// Resolve the address
if (addr->resolved.tcp_addr != NULL) {
delete addr->resolved.tcp_addr;
addr->resolved.tcp_addr = NULL;
LIBZMQ_DELETE(addr->resolved.tcp_addr);
}
addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t ();
@ -226,8 +261,7 @@ int zmq::tcp_connecter_t::open ()
int rc = addr->resolved.tcp_addr->resolve (
addr->address.c_str (), false, options.ipv6);
if (rc != 0) {
delete addr->resolved.tcp_addr;
addr->resolved.tcp_addr = NULL;
LIBZMQ_DELETE(addr->resolved.tcp_addr);
return -1;
}
zmq_assert (addr->resolved.tcp_addr != NULL);
@ -277,18 +311,18 @@ int zmq::tcp_connecter_t::open ()
// Connect to the remote peer.
rc = ::connect (s, tcp_addr->addr (), tcp_addr->addrlen ());
// Connect was successfull immediately.
// Connect was successful immediately.
if (rc == 0)
return 0;
// Translate error codes indicating asynchronous connect has been
// launched to a uniform EINPROGRESS.
#ifdef ZMQ_HAVE_WINDOWS
const int error_code = WSAGetLastError ();
if (error_code == WSAEINPROGRESS || error_code == WSAEWOULDBLOCK)
const int last_error = WSAGetLastError();
if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
errno = EINPROGRESS;
else
errno = wsa_error_to_errno (error_code);
errno = wsa_error_to_errno (last_error);
#else
if (errno == EINTR)
errno = EINPROGRESS;
@ -321,7 +355,8 @@ zmq::fd_t zmq::tcp_connecter_t::connect ()
&& err != WSAENETDOWN
&& err != WSAEACCES
&& err != WSAEINVAL
&& err != WSAEADDRINUSE)
&& err != WSAEADDRINUSE
&& err != WSAEADDRNOTAVAIL)
{
wsa_assert_no (err);
}
@ -341,7 +376,8 @@ zmq::fd_t zmq::tcp_connecter_t::connect ()
errno == EHOSTUNREACH ||
errno == ENETUNREACH ||
errno == ENETDOWN ||
errno == EINVAL);
errno == EINVAL ||
errno == EADDRNOTAVAIL);
return retired_fd;
}
#endif

View File

@ -57,7 +57,7 @@ namespace zmq
private:
// ID of the timer used to delay the reconnection.
enum {reconnect_timer_id = 1};
enum {reconnect_timer_id = 1, connect_timer_id};
// Handlers for incoming commands.
void process_plug ();
@ -71,6 +71,9 @@ namespace zmq
// Internal function to start the actual connection establishment.
void start_connecting ();
// Internal function to add a connect timer
void add_connect_timer();
// Internal function to add a reconnect timer
void add_reconnect_timer();
@ -80,7 +83,7 @@ namespace zmq
int get_new_reconnect_ivl ();
// Open TCP connecting socket. Returns -1 in case of error,
// 0 if connect was successfull immediately. Returns -1 with
// 0 if connect was successful immediately. Returns -1 with
// EAGAIN errno if async connect was launched.
int open ();
@ -88,7 +91,7 @@ namespace zmq
void close ();
// Get the file descriptor of newly created connection. Returns
// retired_fd if the connection was unsuccessfull.
// retired_fd if the connection was unsuccessful.
fd_t connect ();
// Address to connect to. Owned by session_base_t.
@ -108,7 +111,8 @@ namespace zmq
const bool delayed_start;
// True iff a timer has been started.
bool timer_started;
bool connect_timer_started;
bool reconnect_timer_started;
// Reference to the session we belong to.
zmq::session_base_t *session;

View File

@ -100,6 +100,7 @@ void zmq::tcp_listener_t::in_event ()
tune_tcp_socket (fd);
tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
tune_tcp_retransmit_timeout (fd, options.tcp_retransmit_timeout);
// remember our fd for ZMQ_SRCFD in messages
socket->set_fd(fd);
@ -277,10 +278,11 @@ zmq::fd_t zmq::tcp_listener_t::accept ()
#ifdef ZMQ_HAVE_WINDOWS
if (sock == INVALID_SOCKET) {
wsa_assert (WSAGetLastError () == WSAEWOULDBLOCK ||
WSAGetLastError () == WSAECONNRESET ||
WSAGetLastError () == WSAEMFILE ||
WSAGetLastError () == WSAENOBUFS);
const int last_error = WSAGetLastError();
wsa_assert (last_error == WSAEWOULDBLOCK ||
last_error == WSAECONNRESET ||
last_error == WSAEMFILE ||
last_error == WSAENOBUFS);
return retired_fd;
}
#if !defined _WIN32_WCE

View File

@ -84,7 +84,7 @@ namespace zmq
// Handle corresponding to the listening socket.
handle_t handle;
// Socket the listerner belongs to.
// Socket the listener belongs to.
zmq::socket_base_t *socket;
// String representation of endpoint to bind to

View File

@ -36,7 +36,7 @@
extern "C"
{
#if defined _WIN32_WCE
static DWORD thread_routine (LPVOID arg_)
static DWORD thread_routine (LPVOID arg_)
#else
static unsigned int __stdcall thread_routine (void *arg_)
#endif
@ -58,7 +58,7 @@ void zmq::thread_t::start (thread_fn *tfn_, void *arg_)
descriptor = (HANDLE) _beginthreadex (NULL, 0,
&::thread_routine, this, 0 , NULL);
#endif
win_assert (descriptor != NULL);
win_assert (descriptor != NULL);
}
void zmq::thread_t::stop ()
@ -92,7 +92,7 @@ extern "C"
posix_assert (rc);
#endif
zmq::thread_t *self = (zmq::thread_t*) arg_;
zmq::thread_t *self = (zmq::thread_t*) arg_;
self->tfn (self->arg);
return NULL;
}

View File

@ -102,7 +102,7 @@ void zmq::tipc_connecter_t::process_term (int linger_)
void zmq::tipc_connecter_t::in_event ()
{
// We are not polling for incomming data, so we are actually called
// We are not polling for incoming data, so we are actually called
// because of error here. However, we can get error on out event as well
// on some platforms, so we'll simply handle both events in the same way.
out_event ();
@ -213,7 +213,7 @@ int zmq::tipc_connecter_t::open ()
s, addr->resolved.tipc_addr->addr (),
addr->resolved.tipc_addr->addrlen ());
// Connect was successfull immediately.
// Connect was successful immediately.
if (rc == 0)
return 0;

Some files were not shown because too many files have changed in this diff Show More