Merge branch 'master' of github.com:zeromq/libzmq

# Conflicts:
#	CMakeLists.txt
This commit is contained in:
Mário Kašuba 2015-11-01 20:59:03 +01:00
commit 9a8c822aa2
25 changed files with 961 additions and 923 deletions

View File

@ -3,20 +3,29 @@
cmake_minimum_required(VERSION 2.8)
project(ZeroMQ)
list(INSERT CMAKE_MODULE_PATH 0 "${CMAKE_SOURCE_DIR}")
option(WITH_OPENPGM "Build with support for OpenPGM" OFF)
if(APPLE)
option(ZMQ_BUILD_FRAMEWORK "Build as OS X framework" ON)
endif()
if(WIN32)
option(WITH_TWEETNACL "Build with tweetnacl" OFF)
else()
option(WITH_TWEETNACL "Build with tweetnacl" ON)
option(WITH_SODIUM "Build with libsodium" ON)
option(WITH_TWEETNACL "Build with tweetnacl" ON)
if(WITH_SODIUM)
find_package(Sodium)
if(SODIUM_FOUND)
add_definitions(-DHAVE_LIBSODIUM)
include_directories(${SODIUM_INCLUDE_DIRS})
endif()
endif()
if(WITH_TWEETNACL)
add_definitions(-DHAVE_TWEETNACL -DHAVE_LIBSODIUM)
message(STATUS "Building with TweetNaCL")
set(USE_TWEETNACL ON)
add_definitions(-DHAVE_TWEETNACL)
include_directories(
tweetnacl/contrib/randombytes
tweetnacl/src
@ -25,16 +34,14 @@ if(WITH_TWEETNACL)
set(TWEETNACL_SOURCES
tweetnacl/src/tweetnacl.c
)
if(WIN32)
list(APPEND TWEETNACL_SOURCES tweetnacl/contrib/randombytes/winrandom.c)
else()
list(APPEND TWEETNACL_SOURCES tweetnacl/contrib/randombytes/devurandom.c)
endif()
else()
find_library(SODIUM_FOUND sodium HINTS "${CMAKE_PREFIX_PATH}/libsodium/bin")
endif()
set(POLLER "" CACHE STRING "Choose polling system. valid values are
kqueue, epoll, devpoll, poll or select [default=autodetect]")
@ -502,9 +509,11 @@ foreach(source ${cxx-sources})
list(APPEND sources ${CMAKE_CURRENT_SOURCE_DIR}/src/${source})
endforeach()
foreach(source ${TWEETNACL_SOURCES})
list(APPEND sources ${CMAKE_CURRENT_SOURCE_DIR}/${source})
endforeach()
if(USE_TWEETNACL)
foreach(source ${TWEETNACL_SOURCES})
list(APPEND sources ${CMAKE_CURRENT_SOURCE_DIR}/${source})
endforeach()
endif()
foreach(source ${rc-sources})
list(APPEND sources ${CMAKE_CURRENT_BINARY_DIR}/${source})
@ -602,7 +611,8 @@ else()
PUBLIC_HEADER "${public_headers}"
VERSION ${ZMQ_VERSION}
SOVERSION "${ZMQ_VERSION_MAJOR}.${ZMQ_VERSION_MINOR}.0"
OUTPUT_NAME "libzmq")
OUTPUT_NAME "libzmq"
PREFIX "")
if(ZMQ_BUILD_FRAMEWORK)
set_target_properties(libzmq PROPERTIES
FRAMEWORK TRUE
@ -620,10 +630,15 @@ else()
set_target_properties(libzmq-static PROPERTIES
PUBLIC_HEADER "${public_headers}"
COMPILE_DEFINITIONS "ZMQ_STATIC"
OUTPUT_NAME "libzmq-static")
OUTPUT_NAME "libzmq-static"
PREFIX "")
endif()
target_link_libraries(libzmq ${SODIUM_LIBRARY} ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(libzmq ${CMAKE_THREAD_LIBS_INIT})
if(SODIUM_FOUND)
target_link_libraries(libzmq ${SODIUM_LIBRARIES})
endif()
if(HAVE_WS2_32)
target_link_libraries(libzmq ws2_32)
elseif(HAVE_WS2)

40
FindSodium.cmake Normal file
View File

@ -0,0 +1,40 @@
################################################################################
# THIS FILE IS 100% GENERATED BY ZPROJECT; DO NOT EDIT EXCEPT EXPERIMENTALLY #
# Please refer to the README for information about making permanent changes. #
################################################################################
if (NOT MSVC)
include(FindPkgConfig)
pkg_check_modules(PC_SODIUM "libsodium")
if (NOT PC_SODIUM_FOUND)
pkg_check_modules(PC_SODIUM "sodium")
endif (NOT PC_SODIUM_FOUND)
if (PC_SODIUM_FOUND)
set(SODIUM_INCLUDE_HINTS ${PC_SODIUM_INCLUDE_DIRS} ${PC_SODIUM_INCLUDE_DIRS}/*)
set(SODIUM_LIBRARY_HINTS ${PC_SODIUM_LIBRARY_DIRS} ${PC_SODIUM_LIBRARY_DIRS}/*)
endif()
endif (NOT MSVC)
# some libraries install the headers is a subdirectory of the include dir
# returned by pkg-config, so use a wildcard match to improve chances of finding
# headers and libraries.
find_path(
SODIUM_INCLUDE_DIRS
NAMES sodium.h
HINTS ${SODIUM_INCLUDE_HINTS}
)
find_library(
SODIUM_LIBRARIES
NAMES libsodium sodium
HINTS ${SODIUM_LIBRARY_HINTS}
)
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(SODIUM DEFAULT_MSG SODIUM_LIBRARIES SODIUM_INCLUDE_DIRS)
mark_as_advanced(SODIUM_FOUND SODIUM_LIBRARIES SODIUM_INCLUDE_DIRS)
################################################################################
# THIS FILE IS 100% GENERATED BY ZPROJECT; DO NOT EDIT EXCEPT EXPERIMENTALLY #
# Please refer to the README for information about making permanent changes. #
################################################################################

View File

@ -367,7 +367,6 @@ test_apps = \
tests/test_socketopt_hwm \
tests/test_heartbeats \
tests/test_stream_exceeds_buffer \
tests/test_thread_safe_polling \
tests/test_poller
tests_test_system_SOURCES = tests/test_system.cpp
@ -573,9 +572,6 @@ tests_test_heartbeats_LDADD = src/libzmq.la
tests_test_stream_exceeds_buffer_SOURCES = tests/test_stream_exceeds_buffer.cpp
tests_test_stream_exceeds_buffer_LDADD = src/libzmq.la
tests_test_thread_safe_polling_SOURCES = tests/test_thread_safe_polling.cpp
tests_test_thread_safe_polling_LDADD = src/libzmq.la
tests_test_poller_SOURCES = tests/test_poller.cpp
tests_test_poller_LDADD = src/libzmq.la

View File

@ -1,6 +1,7 @@
# ZeroMQ
[![Build Status](https://travis-ci.org/zeromq/libzmq.png?branch=master)](https://travis-ci.org/zeromq/libzmq)
[![Build status](https://ci.appveyor.com/api/projects/status/e2ks424yrs1un3wt?svg=true)](https://ci.appveyor.com/project/zeromq/libzmq)
## Welcome

85
appveyor.yml Normal file
View File

@ -0,0 +1,85 @@
version: build-{build}
clone_depth: 1
skip_tags: true
os: Visual Studio 2015
environment:
CMAKE_GENERATOR: "Visual Studio 14 2015"
MSVCVERSION: "v140"
MSVCYEAR: "vs2015"
matrix:
- platform: Win32
configuration: Release
WITH_SODIUM: ON
WITH_TWEETNACL: ON
- platform: Win32
configuration: Debug
WITH_SODIUM: ON
WITH_TWEETNACL: ON
- platform: x64
configuration: Release
WITH_SODIUM: ON
WITH_TWEETNACL: ON
- platform: x64
configuration: Debug
WITH_SODIUM: ON
WITH_TWEETNACL: ON
- platform: Win32
configuration: Release
WITH_SODIUM: OFF
WITH_TWEETNACL: OFF
- platform: Win32
configuration: Release
WITH_SODIUM: ON
WITH_TWEETNACL: OFF
- platform: Win32
configuration: Release
WITH_SODIUM: OFF
WITH_TWEETNACL: ON
matrix:
fast_finish: false
init:
#- ps: $blockRdp = $true; iex ((new-object net.webclient).DownloadString('https://raw.githubusercontent.com/appveyor/ci/master/scripts/enable-rdp.ps1'))
- cmake --version
- msbuild /version
- cmd: reg add "HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Control\Terminal Server\WinStations\RDP-Tcp" /v UserAuthentication /t REG_DWORD /d 0 /f
install:
- cmd: if "%Platform%"=="x64" set "CMAKE_GENERATOR=%CMAKE_GENERATOR% Win64"
- cmd: echo "Generator='%CMAKE_GENERATOR%'"
- cmd: echo "Platform='%Platform%'"
- cmd: set LIBSODIUMDIR=C:\projects\libsodium
- cmd: git clone --depth 1 --quiet "https://github.com/jedisct1/libsodium.git" %LIBSODIUMDIR%
- cmd: msbuild /v:minimal /maxcpucount:%NUMBER_OF_PROCESSORS% /p:Configuration=%Configuration%DLL %LIBSODIUMDIR%\builds\msvc\%MSVCYEAR%\libsodium\libsodium.vcxproj
- cmd: set SODIUM_LIBRARY_DIR="%LIBSODIUMDIR%\bin\%Platform%\%Configuration%\%MSVCVERSION%\dynamic"
- cmd: set SODIUM_INCLUDE_DIR="%LIBSODIUMDIR%\src\libsodium\include"
- cmd: move "%SODIUM_LIBRARY_DIR%\libsodium.lib" "%SODIUM_LIBRARY_DIR%\sodium.lib"
clone_folder: C:\projects\libzmq
before_build:
- cmd: set LIBZMQ_BUILDDIR=C:\projects\build_libzmq
- cmd: md "%LIBZMQ_BUILDDIR%"
- cd "%LIBZMQ_BUILDDIR%"
- cmd: cmake -D CMAKE_INCLUDE_PATH="%SODIUM_INCLUDE_DIR%" -D CMAKE_LIBRARY_PATH="%SODIUM_LIBRARY_DIR%" -D WITH_SODIUM="%WITH_SODIUM%" -D WITH_TWEETNACL="%WITH_TWEETNACL%" -D CMAKE_C_FLAGS_RELEASE="/MT" -D CMAKE_C_FLAGS_DEBUG="/MTd" -D WITH_SODIUM="%WITH_SODIUM%" -G "%CMAKE_GENERATOR%" "%APPVEYOR_BUILD_FOLDER%"
build:
parallel: true
project: C:\projects\build_libzmq\ZeroMQ.sln
verbosity: minimal
after_build:
- cmd: cd %LIBZMQ_BUILDDIR%\bin\%Configuration%"
- cmd: copy "%SODIUM_LIBRARY_DIR%\libsodium.dll" .
- cmd: 7z a -y -bd -mx=9 libzmq.zip *.exe *.dll
- ps: Push-AppveyorArtifact "libzmq.zip" -Filename "libzmq-${env:Platform}-${env:Configuration}.zip"
test_script:
- cmd: cd "%LIBZMQ_BUILDDIR%"
- cmd: ctest -C "%Configuration%" -V

View File

@ -151,6 +151,7 @@
<File RelativePath="..\..\..\..\src\session_base.cpp" />
<File RelativePath="..\..\..\..\src\signaler.cpp" />
<File RelativePath="..\..\..\..\src\socket_base.cpp" />
<File RelativePath="..\..\..\..\src\socket_poller.cpp" />
<File RelativePath="..\..\..\..\src\stream.cpp" />
<File RelativePath="..\..\..\..\src\stream_engine.cpp" />
<File RelativePath="..\..\..\..\src\sub.cpp" />

View File

@ -222,6 +222,7 @@
<ClCompile Include="..\..\..\..\src\session_base.cpp" />
<ClCompile Include="..\..\..\..\src\signaler.cpp" />
<ClCompile Include="..\..\..\..\src\socket_base.cpp" />
<ClCompile Include="..\..\..\..\src\socket_poller.cpp" />
<ClCompile Include="..\..\..\..\src\socks.cpp" />
<ClCompile Include="..\..\..\..\src\socks_connecter.cpp" />
<ClCompile Include="..\..\..\..\src\stream.cpp" />

View File

@ -237,6 +237,9 @@
</ClCompile>
<ClCompile Include="..\..\..\..\src\decoder_allocators.cpp">
<Filter>src</Filter>
</ClCompile>
<ClCompile Include="..\..\..\..\src\socket_poller.cpp">
<Filter>src</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>

View File

@ -222,6 +222,7 @@
<ClCompile Include="..\..\..\..\src\session_base.cpp" />
<ClCompile Include="..\..\..\..\src\signaler.cpp" />
<ClCompile Include="..\..\..\..\src\socket_base.cpp" />
<ClCompile Include="..\..\..\..\src\socket_poller.cpp" />
<ClCompile Include="..\..\..\..\src\socks.cpp" />
<ClCompile Include="..\..\..\..\src\socks_connecter.cpp" />
<ClCompile Include="..\..\..\..\src\stream.cpp" />

View File

@ -237,6 +237,9 @@
</ClCompile>
<ClCompile Include="..\..\..\..\src\decoder_allocators.cpp">
<Filter>src</Filter>
</ClCompile>
<ClCompile Include="..\..\..\..\src\socket_poller.cpp">
<Filter>src</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>

View File

@ -222,6 +222,7 @@
<ClCompile Include="..\..\..\..\src\session_base.cpp" />
<ClCompile Include="..\..\..\..\src\signaler.cpp" />
<ClCompile Include="..\..\..\..\src\socket_base.cpp" />
<ClCompile Include="..\..\..\..\src\socket_poller.cpp" />
<ClCompile Include="..\..\..\..\src\socks.cpp" />
<ClCompile Include="..\..\..\..\src\socks_connecter.cpp" />
<ClCompile Include="..\..\..\..\src\stream.cpp" />

View File

@ -238,6 +238,9 @@
<ClCompile Include="..\..\..\..\src\decoder_allocators.cpp">
<Filter>src</Filter>
</ClCompile>
<ClCompile Include="..\..\..\..\src\socket_poller.cpp">
<Filter>src</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\..\..\include\zmq_utils.h">

View File

@ -383,8 +383,6 @@ ZMQ_EXPORT int zmq_send (void *s, const void *buf, size_t len, int flags);
ZMQ_EXPORT int zmq_send_const (void *s, const void *buf, size_t len, int flags);
ZMQ_EXPORT int zmq_recv (void *s, void *buf, size_t len, int flags);
ZMQ_EXPORT int zmq_socket_monitor (void *s, const char *addr, int events);
ZMQ_EXPORT int zmq_add_pollfd (void *s, void *p);
ZMQ_EXPORT int zmq_remove_pollfd (void *s, void *p);
/******************************************************************************/
/* I/O multiplexing. */
@ -411,26 +409,12 @@ typedef struct zmq_pollitem_t
ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
/******************************************************************************/
/* Pollfd polling on thread safe socket */
/******************************************************************************/
ZMQ_EXPORT void *zmq_pollfd_new ();
ZMQ_EXPORT int zmq_pollfd_close (void *p);
ZMQ_EXPORT void zmq_pollfd_recv (void *p);
ZMQ_EXPORT int zmq_pollfd_wait (void *p, int timeout_);
ZMQ_EXPORT int zmq_pollfd_poll (void *p, zmq_pollitem_t *items, int nitems, long timeout);
#if defined _WIN32
ZMQ_EXPORT SOCKET zmq_pollfd_fd (void *p);
#else
ZMQ_EXPORT int zmq_pollfd_fd (void *p);
#endif
/******************************************************************************/
/* Poller polling on sockets,fd and threaf safe sockets */
/******************************************************************************/
#define ZMQ_HAVE_POLLER
typedef struct zmq_poller_event_t
{
void *socket;
@ -440,19 +424,23 @@ typedef struct zmq_poller_event_t
int fd;
#endif
void *user_data;
short events;
} zmq_poller_event_t;
ZMQ_EXPORT void *zmq_poller_new ();
ZMQ_EXPORT int zmq_poller_close (void *poller);
ZMQ_EXPORT int zmq_poller_add_socket (void *poller, void *socket, void *user_data);
ZMQ_EXPORT int zmq_poller_remove_socket (void *poller, void *socket);
ZMQ_EXPORT int zmq_poller_add (void *poller, void *socket, void *user_data, short events);
ZMQ_EXPORT int zmq_poller_modify (void *poller, void *socket, short events);
ZMQ_EXPORT int zmq_poller_remove (void *poller, void *socket);
ZMQ_EXPORT int zmq_poller_wait (void *poller, zmq_poller_event_t *event, long timeout);
#if defined _WIN32
ZMQ_EXPORT int zmq_poller_add_fd (void *poller, SOCKET fd, void *user_data);
ZMQ_EXPORT int zmq_poller_add_fd (void *poller, SOCKET fd, void *user_data, short events);
ZMQ_EXPORT int zmq_poller_modify_fd (void *poller, SOCKET fd, short events);
ZMQ_EXPORT int zmq_poller_remove_fd (void *poller, SOCKET fd);
#else
ZMQ_EXPORT int zmq_poller_add_fd (void *poller, int fd, void *user_data);
ZMQ_EXPORT int zmq_poller_add_fd (void *poller, int fd, void *user_data, short events);
ZMQ_EXPORT int zmq_poller_modify_fd (void *poller, int fd, short events);
ZMQ_EXPORT int zmq_poller_remove_fd (void *poller, int fd);
#endif

View File

@ -100,20 +100,15 @@ static int close_wait_ms (int fd_, unsigned int max_ms_ = 2000)
unsigned int step_ms = max_ms_ / 10;
if (step_ms < 1)
step_ms = 1;
if (step_ms > 100)
step_ms = 100;
int rc = 0; // do not sleep on first attempt
do
{
if (rc == -1 && errno == EAGAIN)
{
do {
if (rc == -1 && errno == EAGAIN) {
sleep_ms (step_ms);
ms_so_far += step_ms;
}
rc = close (fd_);
} while (ms_so_far < max_ms_ && rc == -1 && errno == EAGAIN);
@ -187,7 +182,7 @@ void zmq::signaler_t::send ()
errno_assert (sz == sizeof (inc));
#elif defined ZMQ_HAVE_WINDOWS
unsigned char dummy = 0;
int nbytes = ::send (w, (char*) &dummy, sizeof (dummy), 0);
int nbytes = ::send (w, (char *) &dummy, sizeof (dummy), 0);
wsa_assert (nbytes != SOCKET_ERROR);
zmq_assert (nbytes == sizeof (dummy));
#else
@ -304,7 +299,7 @@ void zmq::signaler_t::recv ()
#else
unsigned char dummy;
#if defined ZMQ_HAVE_WINDOWS
int nbytes = ::recv (r, (char*) &dummy, sizeof (dummy), 0);
int nbytes = ::recv (r, (char *) &dummy, sizeof (dummy), 0);
wsa_assert (nbytes != SOCKET_ERROR);
#else
ssize_t nbytes = ::recv (r, &dummy, sizeof (dummy), 0);
@ -342,7 +337,7 @@ int zmq::signaler_t::recv_failable ()
#else
unsigned char dummy;
#if defined ZMQ_HAVE_WINDOWS
int nbytes = ::recv (r, (char*) &dummy, sizeof (dummy), 0);
int nbytes = ::recv (r, (char *) &dummy, sizeof (dummy), 0);
if (nbytes == SOCKET_ERROR) {
const int last_error = WSAGetLastError();
if (last_error == WSAEWOULDBLOCK) {
@ -466,11 +461,11 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
// Set SO_REUSEADDR and TCP_NODELAY on listening socket.
BOOL so_reuseaddr = 1;
int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
(char *)&so_reuseaddr, sizeof so_reuseaddr);
(char *) &so_reuseaddr, sizeof so_reuseaddr);
wsa_assert (rc != SOCKET_ERROR);
BOOL tcp_nodelay = 1;
rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY,
(char *)&tcp_nodelay, sizeof tcp_nodelay);
(char *) &tcp_nodelay, sizeof tcp_nodelay);
wsa_assert (rc != SOCKET_ERROR);
// Init sockaddr to signaler port.
@ -496,12 +491,12 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
}
// Bind listening socket to signaler port.
rc = bind (listener, (const struct sockaddr*) &addr, sizeof addr);
rc = bind (listener, (const struct sockaddr *) &addr, sizeof addr);
if (rc != SOCKET_ERROR && signaler_port == 0) {
// Retrieve ephemeral port number
int addrlen = sizeof addr;
rc = getsockname (listener, (struct sockaddr*) &addr, &addrlen);
rc = getsockname (listener, (struct sockaddr *) &addr, &addrlen);
}
// Listen for incoming connections.
@ -510,12 +505,33 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
// Connect writer to the listener.
if (rc != SOCKET_ERROR)
rc = connect (*w_, (struct sockaddr*) &addr, sizeof addr);
rc = connect (*w_, (struct sockaddr *) &addr, sizeof addr);
// Accept connection from writer.
if (rc != SOCKET_ERROR)
*r_ = accept (listener, NULL, NULL);
// Send/receive large chunk to work around TCP slow start
// This code is a workaround for #1608
if (*r_ != INVALID_SOCKET) {
size_t dummy_size = 1024 * 1024; // 1M to overload default receive buffer
unsigned char *dummy = (unsigned char *) malloc (dummy_size);
int still_to_send = (int) dummy_size;
int still_to_recv = (int) dummy_size;
while (still_to_send || still_to_recv) {
int nbytes;
if (still_to_send > 0) {
nbytes = ::send (*w_, (char *) (dummy + dummy_size - still_to_send), still_to_send, 0);
wsa_assert (nbytes != SOCKET_ERROR);
still_to_send -= nbytes;
}
nbytes = ::recv (*r_, (char *) (dummy + dummy_size - still_to_recv), still_to_recv, 0);
wsa_assert (nbytes != SOCKET_ERROR);
still_to_recv -= nbytes;
}
free (dummy);
}
// Save errno if error occurred in bind/listen/connect/accept.
int saved_errno = 0;
if (*r_ == INVALID_SOCKET)
@ -582,12 +598,12 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
errno_assert (rc != -1);
rc = bind (listener, (struct sockaddr*) &lcladdr, sizeof lcladdr);
rc = bind (listener, (struct sockaddr *) &lcladdr, sizeof lcladdr);
errno_assert (rc != -1);
socklen_t lcladdr_len = sizeof lcladdr;
rc = getsockname (listener, (struct sockaddr*) &lcladdr, &lcladdr_len);
rc = getsockname (listener, (struct sockaddr *) &lcladdr, &lcladdr_len);
errno_assert (rc != -1);
rc = listen (listener, 1);
@ -602,7 +618,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
errno_assert (rc != -1);
rc = connect (*w_, (struct sockaddr*) &lcladdr, sizeof lcladdr);
rc = connect (*w_, (struct sockaddr *) &lcladdr, sizeof lcladdr);
errno_assert (rc != -1);
*r_ = accept (listener, NULL, NULL);

View File

@ -32,10 +32,13 @@
zmq::socket_poller_t::socket_poller_t () :
tag (0xCAFEBABE),
poll_set (NULL),
poll_events (NULL)
{
pollfd = zmq_pollfd_new ();
need_rebuild (true),
use_signaler (false)
#if defined ZMQ_POLL_BASED_ON_POLL
,
pollfds (NULL)
#endif
{
}
zmq::socket_poller_t::~socket_poller_t ()
@ -43,27 +46,22 @@ zmq::socket_poller_t::~socket_poller_t ()
// Mark the socket_poller as dead
tag = 0xdeadbeef;
for (events_t::iterator it = events.begin(); it != events.end(); ++it) {
for (items_t::iterator it = items.begin(); it != items.end(); ++it) {
if (it->socket) {
int thread_safe;
size_t thread_safe_size = sizeof(int);
if (zmq_getsockopt(it->socket, ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == 0 && thread_safe)
zmq_remove_pollfd(it->socket, pollfd);
if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == 0 && thread_safe)
it->socket->remove_signaler (&signaler);
}
}
zmq_pollfd_close (pollfd);
if (poll_set) {
free (poll_set);
poll_set = NULL;
}
if (poll_events) {
free (poll_events);
poll_events = NULL;
#if defined ZMQ_POLL_BASED_ON_POLL
if (pollfds) {
free (pollfds);
pollfds = NULL;
}
#endif
}
bool zmq::socket_poller_t::check_tag ()
@ -71,9 +69,9 @@ bool zmq::socket_poller_t::check_tag ()
return tag == 0xCAFEBABE;
}
int zmq::socket_poller_t::add_socket (void *socket_, void* user_data_)
int zmq::socket_poller_t::add (socket_base_t *socket_, void* user_data_, short events_)
{
for (events_t::iterator it = events.begin (); it != events.end (); ++it) {
for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
if (it->socket == socket_) {
errno = EINVAL;
return -1;
@ -82,153 +80,573 @@ int zmq::socket_poller_t::add_socket (void *socket_, void* user_data_)
int thread_safe;
size_t thread_safe_size = sizeof(int);
if (zmq_getsockopt (socket_, ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
if (socket_->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
return -1;
if (thread_safe) {
if (zmq_add_pollfd (socket_, pollfd) == -1)
if (socket_->add_signaler (&signaler) == -1)
return -1;
}
event_t event = {socket_, 0, user_data_};
events.push_back (event);
item_t item = {socket_, 0, user_data_, events_};
items.push_back (item);
need_rebuild = true;
return 0;
}
#if defined _WIN32
int zmq::socket_poller_t::add_fd (SOCKET fd_, void *user_data_)
#else
int zmq::socket_poller_t::add_fd (int fd_, void *user_data_)
#endif
int zmq::socket_poller_t::add_fd (fd_t fd_, void *user_data_, short events_)
{
for (events_t::iterator it = events.begin (); it != events.end (); ++it) {
for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
if (!it->socket && it->fd == fd_) {
errno = EINVAL;
return -1;
}
}
event_t event = {NULL, fd_, user_data_};
events.push_back (event);
item_t item = {NULL, fd_, user_data_, events_};
items.push_back (item);
need_rebuild = true;
return 0;
}
int zmq::socket_poller_t::remove_socket (void* socket_)
int zmq::socket_poller_t::modify (socket_base_t *socket_, short events_)
{
events_t::iterator it;
items_t::iterator it;
for (it = events.begin (); it != events.end (); ++it) {
for (it = items.begin (); it != items.end (); ++it) {
if (it->socket == socket_)
break;
}
if (it == events.end()) {
if (it == items.end()) {
errno = EINVAL;
return -1;
}
int thread_safe;
size_t thread_safe_size = sizeof(int);
if (zmq_getsockopt (socket_, ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
return -1;
it->events = events_;
need_rebuild = true;
if (thread_safe) {
if (zmq_remove_pollfd (socket_, pollfd) == -1)
return -1;
}
events.erase (it);
need_rebuild = true;
return 0;
}
#if defined _WIN32
int zmq::socket_poller_t::remove_fd (SOCKET fd_)
#else
int zmq::socket_poller_t::remove_fd (int fd_)
#endif
{
events_t::iterator it;
for (it = events.begin (); it != events.end (); ++it) {
int zmq::socket_poller_t::modify_fd (fd_t fd_, short events_)
{
items_t::iterator it;
for (it = items.begin (); it != items.end (); ++it) {
if (!it->socket && it->fd == fd_)
break;
}
if (it == events.end()) {
if (it == items.end()) {
errno = EINVAL;
return -1;
}
events.erase (it);
it->events = events_;
need_rebuild = true;
return 0;
}
int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long timeout_)
int zmq::socket_poller_t::remove (socket_base_t *socket_)
{
if (need_rebuild)
rebuild ();
items_t::iterator it;
int rc = zmq_pollfd_poll (pollfd, poll_set, poll_size, timeout_);
if (rc == -1) {
return rc;
for (it = items.begin (); it != items.end (); ++it) {
if (it->socket == socket_)
break;
}
if (rc == 0) {
errno = EAGAIN;
if (it == items.end()) {
errno = EINVAL;
return -1;
}
int thread_safe;
size_t thread_safe_size = sizeof(int);
for (int i = 0; i < poll_size; i++) {
if (poll_set [i].revents & ZMQ_POLLIN) {
*event_ = poll_events[i];
if (socket_->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
return -1;
break;
}
if (thread_safe) {
if (socket_->remove_signaler (&signaler) == -1)
return -1;
}
items.erase (it);
need_rebuild = true;
return 0;
}
void zmq::socket_poller_t::rebuild ()
int zmq::socket_poller_t::remove_fd (fd_t fd_)
{
if (poll_set) {
free (poll_set);
poll_set = NULL;
items_t::iterator it;
for (it = items.begin (); it != items.end (); ++it) {
if (!it->socket && it->fd == fd_)
break;
}
if (poll_events) {
free (poll_events);
poll_events = NULL;
if (it == items.end()) {
errno = EINVAL;
return -1;
}
items.erase (it);
need_rebuild = true;
return 0;
}
int zmq::socket_poller_t::rebuild ()
{
#if defined ZMQ_POLL_BASED_ON_POLL
if (pollfds) {
free (pollfds);
pollfds = NULL;
}
poll_size = events.size ();
poll_set = (zmq_pollitem_t*) malloc (poll_size * sizeof (zmq_pollitem_t));
alloc_assert (poll_set);
use_signaler = false;
poll_events = (event_t*) malloc (poll_size * sizeof (event_t));
poll_size = 0;
int event_nbr = 0;
for (events_t::iterator it = events.begin (); it != events.end (); ++it, event_nbr++) {
poll_set [event_nbr].socket = it->socket;
for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
if (it->events) {
if (it->socket) {
int thread_safe;
size_t thread_safe_size = sizeof(int);
if (!it->socket)
poll_set [event_nbr].fd = it->fd;
if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
return -1;
poll_set [event_nbr].events = ZMQ_POLLIN;
poll_events [event_nbr] = *it;
if (thread_safe) {
if (!use_signaler) {
use_signaler = true;
poll_size++;
}
}
else
poll_size++;
}
else
poll_size++;
}
}
if (poll_size == 0)
return 0;
pollfds = (pollfd*) malloc (poll_size * sizeof (pollfd));
alloc_assert (pollfds);
int item_nbr = 0;
if (use_signaler) {
item_nbr = 1;
pollfds[0].fd = signaler.get_fd();
pollfds[0].events = POLLIN;
}
for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
if (it->events) {
if (it->socket) {
int thread_safe;
size_t thread_safe_size = sizeof(int);
if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
return -1;
if (!thread_safe) {
size_t fd_size = sizeof (zmq::fd_t);
if (it->socket->getsockopt (ZMQ_FD, &pollfds [item_nbr].fd, &fd_size) == -1) {
return -1;
}
pollfds [item_nbr].events = POLLIN;
item_nbr++;
}
}
else {
pollfds [item_nbr].fd = it->fd;
pollfds [item_nbr].events =
(it->events & ZMQ_POLLIN ? POLLIN : 0) |
(it->events & ZMQ_POLLOUT ? POLLOUT : 0) |
(it->events & ZMQ_POLLPRI ? POLLPRI : 0);
it->pollfd_index = item_nbr;
item_nbr++;
}
}
}
#elif defined ZMQ_POLL_BASED_ON_SELECT
FD_ZERO (&pollset_in);
FD_ZERO (&pollset_out);
FD_ZERO (&pollset_err);
// Ensure we do not attempt to select () on more than FD_SETSIZE
// file descriptors.
zmq_assert (items.size () <= FD_SETSIZE);
poll_size = 0;
use_signaler = false;
for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
if (it->socket) {
int thread_safe;
size_t thread_safe_size = sizeof(int);
if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
return -1;
if (thread_safe && it->events) {
use_signaler = true;
FD_SET (signaler.get_fd (), &pollset_in);
poll_size = 1;
break;
}
}
}
maxfd = 0;
// Build the fd_sets for passing to select ().
for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
if (it->events) {
// If the poll item is a 0MQ socket we are interested in input on the
// notification file descriptor retrieved by the ZMQ_FD socket option.
if (it->socket) {
int thread_safe;
size_t thread_safe_size = sizeof(int);
if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
return -1;
if (!thread_safe) {
zmq::fd_t notify_fd;
size_t fd_size = sizeof (zmq::fd_t);
if (it->socket->getsockopt (ZMQ_FD, &notify_fd, &fd_size) == -1)
return -1;
FD_SET (notify_fd, &pollset_in);
if (maxfd < notify_fd)
maxfd = notify_fd;
poll_size++;
}
}
// Else, the poll item is a raw file descriptor. Convert the poll item
// events to the appropriate fd_sets.
else {
if (it->events & ZMQ_POLLIN)
FD_SET (it->fd, &pollset_in);
if (it->events & ZMQ_POLLOUT)
FD_SET (it->fd, &pollset_out);
if (it->events & ZMQ_POLLERR)
FD_SET (it->fd, &pollset_err);
if (maxfd < it->fd)
maxfd = it->fd;
poll_size++;
}
}
}
#endif
need_rebuild = false;
return 0;
}
int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long timeout_)
{
if (need_rebuild)
if (rebuild () == -1)
return -1;
#if defined ZMQ_POLL_BASED_ON_POLL
if (unlikely (poll_size == 0)) {
if (timeout_ == 0)
return 0;
#if defined ZMQ_HAVE_WINDOWS
Sleep (timeout_ > 0 ? timeout_ : INFINITE);
return 0;
#elif defined ZMQ_HAVE_ANDROID
usleep (timeout_ * 1000);
return 0;
#else
return usleep (timeout_ * 1000);
#endif
}
zmq::clock_t clock;
uint64_t now = 0;
uint64_t end = 0;
bool first_pass = true;
while (true) {
// Compute the timeout for the subsequent poll.
int timeout;
if (first_pass)
timeout = 0;
else
if (timeout_ < 0)
timeout = -1;
else
timeout = end - now;
// Wait for events.
while (true) {
int rc = poll (pollfds, poll_size, timeout);
if (rc == -1 && errno == EINTR) {
return -1;
}
errno_assert (rc >= 0);
break;
}
// Receive the signal from pollfd
if (use_signaler && pollfds[0].revents & POLLIN)
signaler.recv ();
// Check for the events.
for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
// The poll item is a 0MQ socket. Retrieve pending events
// using the ZMQ_EVENTS socket option.
if (it->socket) {
size_t events_size = sizeof (uint32_t);
uint32_t events;
if (it->socket->getsockopt (ZMQ_EVENTS, &events, &events_size) == -1) {
return -1;
}
if (it->events & events) {
event_->socket = it->socket;
event_->user_data = it->user_data;
event_->events = it->events & events;
// If there is event to return, we can exit immediately.
return 0;
}
}
// Else, the poll item is a raw file descriptor, simply convert
// the events to zmq_pollitem_t-style format.
else {
short revents = pollfds [it->pollfd_index].revents;
short events = 0;
if (revents & POLLIN)
events |= ZMQ_POLLIN;
if (revents & POLLOUT)
events |= ZMQ_POLLOUT;
if (revents & POLLPRI)
events |= ZMQ_POLLPRI;
if (revents & ~(POLLIN | POLLOUT | POLLPRI))
events |= ZMQ_POLLERR;
if (events) {
event_->socket = NULL;
event_->user_data = it->user_data;
event_->fd = it->fd;
event_->events = events;
// If there is event to return, we can exit immediately.
return 0;
}
}
}
// If timeout is zero, exit immediately whether there are events or not.
if (timeout_ == 0)
break;
// At this point we are meant to wait for events but there are none.
// If timeout is infinite we can just loop until we get some events.
if (timeout_ < 0) {
if (first_pass)
first_pass = false;
continue;
}
// The timeout is finite and there are no events. In the first pass
// we get a timestamp of when the polling have begun. (We assume that
// first pass have taken negligible time). We also compute the time
// when the polling should time out.
if (first_pass) {
now = clock.now_ms ();
end = now + timeout_;
if (now == end)
break;
first_pass = false;
continue;
}
// Find out whether timeout have expired.
now = clock.now_ms ();
if (now >= end)
break;
}
errno = ETIMEDOUT;
return -1;
#elif defined ZMQ_POLL_BASED_ON_SELECT
if (unlikely (poll_size == 0)) {
if (timeout_ == 0)
return 0;
#if defined ZMQ_HAVE_WINDOWS
Sleep (timeout_ > 0 ? timeout_ : INFINITE);
return 0;
#else
return usleep (timeout_ * 1000);
#endif
}
zmq::clock_t clock;
uint64_t now = 0;
uint64_t end = 0;
bool first_pass = true;
fd_set inset, outset, errset;
while (true) {
// Compute the timeout for the subsequent poll.
timeval timeout;
timeval *ptimeout;
if (first_pass) {
timeout.tv_sec = 0;
timeout.tv_usec = 0;
ptimeout = &timeout;
}
else
if (timeout_ < 0)
ptimeout = NULL;
else {
timeout.tv_sec = (long) ((end - now) / 1000);
timeout.tv_usec = (long) ((end - now) % 1000 * 1000);
ptimeout = &timeout;
}
// Wait for events. Ignore interrupts if there's infinite timeout.
while (true) {
memcpy (&inset, &pollset_in, sizeof (fd_set));
memcpy (&outset, &pollset_out, sizeof (fd_set));
memcpy (&errset, &pollset_err, sizeof (fd_set));
#if defined ZMQ_HAVE_WINDOWS
int rc = select (0, &inset, &outset, &errset, ptimeout);
if (unlikely (rc == SOCKET_ERROR)) {
errno = zmq::wsa_error_to_errno (WSAGetLastError ());
wsa_assert (errno == ENOTSOCK);
return -1;
}
#else
int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout);
if (unlikely (rc == -1)) {
errno_assert (errno == EINTR || errno == EBADF);
return -1;
}
#endif
break;
}
if (use_signaler && FD_ISSET (signaler.get_fd (), &inset))
signaler.recv ();
// Check for the events.
for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
// The poll item is a 0MQ socket. Retrieve pending events
// using the ZMQ_EVENTS socket option.
if (it->socket) {
size_t events_size = sizeof (uint32_t);
uint32_t events;
if (it->socket->getsockopt (ZMQ_EVENTS, &events, &events_size) == -1)
return -1;
if (it->events & events) {
event_->socket = it->socket;
event_->user_data = it->user_data;
event_->events = it->events & events;
// If there is event to return, we can exit immediately.
return 0;
}
}
// Else, the poll item is a raw file descriptor, simply convert
// the events to zmq_pollitem_t-style format.
else {
short events = 0;
if (FD_ISSET (it->fd, &inset))
events |= ZMQ_POLLIN;
if (FD_ISSET (it->fd, &outset))
events |= ZMQ_POLLOUT;
if (FD_ISSET (it->fd, &errset))
events |= ZMQ_POLLERR;
if (events) {
event_->socket = NULL;
event_->user_data = it->user_data;
event_->fd = it->fd;
event_->events = events;
// If there is event to return, we can exit immediately.
return 0;
}
}
}
// If timeout is zero, exit immediately whether there are events or not.
if (timeout_ == 0)
break;
// At this point we are meant to wait for events but there are none.
// If timeout is infinite we can just loop until we get some events.
if (timeout_ < 0) {
if (first_pass)
first_pass = false;
continue;
}
// The timeout is finite and there are no events. In the first pass
// we get a timestamp of when the polling have begun. (We assume that
// first pass have taken negligible time). We also compute the time
// when the polling should time out.
if (first_pass) {
now = clock.now_ms ();
end = now + timeout_;
if (now == end)
break;
first_pass = false;
continue;
}
// Find out whether timeout have expired.
now = clock.now_ms ();
if (now >= end)
break;
}
errno = ETIMEDOUT;
return -1;
#else
// Exotic platforms that support neither poll() nor select().
errno = ENOTSUP;
return -1;
#endif
}

View File

@ -30,10 +30,23 @@
#ifndef __ZMQ_SOCKET_POLLER_HPP_INCLUDED__
#define __ZMQ_SOCKET_POLLER_HPP_INCLUDED__
#include "poller.hpp"
#if defined ZMQ_POLL_BASED_ON_POLL
#include <poll.h>
#endif
#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#else
#include <unistd.h>
#endif
#include <vector>
#include <algorithm>
#include "../include/zmq.h"
#include "socket_base.hpp"
#include "signaler.hpp"
namespace zmq
{
@ -46,24 +59,19 @@ namespace zmq
typedef struct event_t
{
void *socket;
#if defined _WIN32
SOCKET fd;
#else
int fd;
#endif
socket_base_t *socket;
fd_t fd;
void *user_data;
short events;
} event_t;
int add_socket (void *socket, void *user_data);
int remove_socket (void *socket);
#if defined _WIN32
int add_fd (SOCKET fd, void *user_data);
int remove_fd (SOCKET fd);
#else
int add_fd (int fd, void *user_data);
int remove_fd (int fd);
#endif
int add (socket_base_t *socket, void *user_data, short events);
int modify (socket_base_t *socket, short events);
int remove (socket_base_t *socket);
int add_fd (fd_t fd, void *user_data, short events);
int modify_fd (fd_t fd, short events);
int remove_fd (fd_t fd);
int wait (event_t *event, long timeout);
@ -71,29 +79,45 @@ namespace zmq
bool check_tag ();
private:
void rebuild ();
int rebuild ();
// Used to check whether the object is a socket_poller.
uint32_t tag;
// Pollfd used for thread safe sockets polling
void *pollfd;
// Signaler used for thread safe sockets polling
signaler_t signaler;
typedef struct item_t {
socket_base_t *socket;
fd_t fd;
void *user_data;
short events;
#if defined ZMQ_POLL_BASED_ON_POLL
int pollfd_index;
#endif
} item_t;
// List of sockets
typedef std::vector <event_t> events_t;
events_t events;
// Current zmq_poll set
zmq_pollitem_t *poll_set;
// Matching set to events
event_t *poll_events;
// Size of the pollset
int poll_size;
typedef std::vector <item_t> items_t;
items_t items;
// Does the pollset needs rebuilding?
bool need_rebuild;
// Should the signaler be used for the thread safe polling?
bool use_signaler;
// Size of the pollset
int poll_size;
#if defined ZMQ_POLL_BASED_ON_POLL
pollfd *pollfds;
#elif defined ZMQ_POLL_BASED_ON_SELECT
fd_set pollset_in;
fd_set pollset_out;
fd_set pollset_err;
zmq::fd_t maxfd;
#endif
socket_poller_t (const socket_poller_t&);
const socket_poller_t &operator = (const socket_poller_t&);

View File

@ -218,7 +218,6 @@ void zmq::tune_tcp_retransmit_timeout (fd_t sockfd_, int timeout_)
&& errno != EBADF
&& errno != EDESTADDRREQ
&& errno != EFAULT
&& errno != EINVAL
&& errno != EISCONN
&& errno != EMSGSIZE
&& errno != ENOMEM
@ -240,21 +239,21 @@ int zmq::tcp_read (fd_t s_, void *data_, size_t size_)
// If not a single byte can be read from the socket in non-blocking mode
// we'll get an error (this may happen during the speculative read).
if (rc == SOCKET_ERROR) {
const int last_error = WSAGetLastError();
if (last_error == WSAEWOULDBLOCK) {
errno = EAGAIN;
}
else {
wsa_assert (last_error == WSAENETDOWN ||
last_error == WSAENETRESET ||
last_error == WSAECONNABORTED ||
last_error == WSAETIMEDOUT ||
last_error == WSAECONNRESET ||
last_error == WSAECONNREFUSED ||
last_error == WSAENOTCONN);
errno = wsa_error_to_errno (last_error);
}
if (rc == SOCKET_ERROR) {
const int last_error = WSAGetLastError();
if (last_error == WSAEWOULDBLOCK) {
errno = EAGAIN;
}
else {
wsa_assert (last_error == WSAENETDOWN ||
last_error == WSAENETRESET ||
last_error == WSAECONNABORTED ||
last_error == WSAETIMEDOUT ||
last_error == WSAECONNRESET ||
last_error == WSAECONNREFUSED ||
last_error == WSAENOTCONN);
errno = wsa_error_to_errno (last_error);
}
}
return rc == SOCKET_ERROR ? -1 : rc;
@ -269,7 +268,6 @@ int zmq::tcp_read (fd_t s_, void *data_, size_t size_)
if (rc == -1) {
errno_assert (errno != EBADF
&& errno != EFAULT
&& errno != EINVAL
&& errno != ENOMEM
&& errno != ENOTSOCK);
if (errno == EWOULDBLOCK || errno == EINTR)

View File

@ -445,7 +445,7 @@ int zmq::tcp_address_t::resolve (const char *name_, bool local_, bool ipv6_, boo
// Resolve the IP address.
int rc;
if (local_)
if (local_ || is_src_)
rc = resolve_interface (addr_str.c_str (), ipv6_, is_src_);
else
rc = resolve_hostname (addr_str.c_str (), ipv6_, is_src_);

View File

@ -565,34 +565,6 @@ int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_)
return nread;
}
// Add/remove pollfd from a socket
int zmq_add_pollfd (void *s_, void *p_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
zmq::signaler_t *p = (zmq::signaler_t *) p_;
return s->add_signaler(p);
}
int zmq_remove_pollfd (void *s_, void *p_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
zmq::signaler_t *p = (zmq::signaler_t *) p_;
return s->remove_signaler(p);
}
// Message manipulators.
int zmq_msg_init (zmq_msg_t *msg_)
@ -1070,495 +1042,6 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
#endif
}
// Create pollfd
void *zmq_pollfd_new ()
{
return new zmq::signaler_t ();
}
// Close pollfd
int zmq_pollfd_close (void* p_)
{
zmq::signaler_t *s = (zmq::signaler_t*)p_;
LIBZMQ_DELETE(s);
return 0;
}
// Recv signal from pollfd
void zmq_pollfd_recv(void *p_)
{
zmq::signaler_t *s = (zmq::signaler_t*)p_;
s->recv ();
}
// Wait until pollfd is signalled
int zmq_pollfd_wait(void *p_, int timeout_)
{
zmq::signaler_t *s = (zmq::signaler_t*)p_;
return s->wait (timeout_);
}
// Get pollfd fd
#if defined _WIN32
SOCKET zmq_pollfd_fd (void *p_)
#else
int zmq_pollfd_fd (void *p_)
#endif
{
zmq::signaler_t *s = (zmq::signaler_t*)p_;
return s->get_fd ();
}
// Polling thread safe sockets version
int zmq_pollfd_poll (void* p_, zmq_pollitem_t *items_, int nitems_, long timeout_)
{
#if defined ZMQ_POLL_BASED_ON_POLL
if (unlikely (nitems_ < 0)) {
errno = EINVAL;
return -1;
}
if (unlikely (nitems_ == 0)) {
if (timeout_ == 0)
return 0;
#if defined ZMQ_HAVE_WINDOWS
Sleep (timeout_ > 0 ? timeout_ : INFINITE);
return 0;
#elif defined ZMQ_HAVE_ANDROID
usleep (timeout_ * 1000);
return 0;
#else
return usleep (timeout_ * 1000);
#endif
}
if (!items_) {
errno = EFAULT;
return -1;
}
zmq::clock_t clock;
uint64_t now = 0;
uint64_t end = 0;
pollfd spollfds[ZMQ_POLLITEMS_DFLT];
pollfd *pollfds = spollfds;
int pollfds_size = 0;
int pollfds_index = 0;
bool use_pollfd = false;
for (int i = 0; i != nitems_; i++) {
if (items_ [i].socket) {
int thread_safe;
size_t thread_safe_size = sizeof(int);
if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe,
&thread_safe_size) == -1) {
return -1;
}
// All thread safe sockets share same fd
if (thread_safe) {
// if poll fd is not set yet and events are set for this socket
if (!use_pollfd && items_ [i].events) {
use_pollfd = true;
pollfds_size++;
}
}
else
pollfds_size++;
}
else
pollfds_size++;
}
if (pollfds_size > ZMQ_POLLITEMS_DFLT) {
pollfds = (pollfd*) malloc (pollfds_size * sizeof (pollfd));
alloc_assert (pollfds);
}
// If we have at least one thread safe socket we set pollfd first
if (use_pollfd) {
pollfds [0].fd = zmq_pollfd_fd (p_);
pollfds [0].events = POLLIN;
pollfds_index = 1;
}
// Build pollset for poll () system call.
for (int i = 0; i != nitems_; i++) {
// If the poll item is a 0MQ socket, we poll on the file descriptor
// retrieved by the ZMQ_FD socket option.
if (items_ [i].socket) {
int thread_safe;
size_t thread_safe_size = sizeof(int);
if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe,
&thread_safe_size) == -1) {
if (pollfds != spollfds)
free (pollfds);
return -1;
}
// We already handled the thread safe sockets
if (!thread_safe) {
size_t zmq_fd_size = sizeof (zmq::fd_t);
if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [pollfds_index].fd,
&zmq_fd_size) == -1) {
if (pollfds != spollfds)
free (pollfds);
return -1;
}
pollfds [pollfds_index].events = items_ [i].events ? POLLIN : 0;
pollfds_index++;
}
}
// Else, the poll item is a raw file descriptor. Just convert the
// events to normal POLLIN/POLLOUT for poll ().
else {
pollfds [pollfds_index].fd = items_ [i].fd;
pollfds [pollfds_index].events =
(items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) |
(items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0) |
(items_ [i].events & ZMQ_POLLPRI ? POLLPRI : 0);
pollfds_index++;
}
}
bool first_pass = true;
int nevents = 0;
while (true) {
// Compute the timeout for the subsequent poll.
int timeout;
if (first_pass)
timeout = 0;
else
if (timeout_ < 0)
timeout = -1;
else
timeout = end - now;
// Wait for events.
while (true) {
int rc = poll (pollfds, pollfds_size, timeout);
if (rc == -1 && errno == EINTR) {
if (pollfds != spollfds)
free (pollfds);
return -1;
}
errno_assert (rc >= 0);
break;
}
// Receive the signal from pollfd
if (use_pollfd && pollfds[0].revents & POLLIN)
zmq_pollfd_recv (p_);
// Check for the events.
for (int i = 0; i != nitems_; i++) {
items_ [i].revents = 0;
// The poll item is a 0MQ socket. Retrieve pending events
// using the ZMQ_EVENTS socket option.
if (items_ [i].socket) {
size_t zmq_events_size = sizeof (uint32_t);
uint32_t zmq_events;
if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
&zmq_events_size) == -1) {
if (pollfds != spollfds)
free (pollfds);
return -1;
}
if ((items_ [i].events & ZMQ_POLLOUT) &&
(zmq_events & ZMQ_POLLOUT))
items_ [i].revents |= ZMQ_POLLOUT;
if ((items_ [i].events & ZMQ_POLLIN) &&
(zmq_events & ZMQ_POLLIN))
items_ [i].revents |= ZMQ_POLLIN;
}
// Else, the poll item is a raw file descriptor, simply convert
// the events to zmq_pollitem_t-style format.
else {
if (pollfds [i].revents & POLLIN)
items_ [i].revents |= ZMQ_POLLIN;
if (pollfds [i].revents & POLLOUT)
items_ [i].revents |= ZMQ_POLLOUT;
if (pollfds [i].revents & POLLPRI)
items_ [i].revents |= ZMQ_POLLPRI;
if (pollfds [i].revents & ~(POLLIN | POLLOUT | POLLPRI))
items_ [i].revents |= ZMQ_POLLERR;
}
if (items_ [i].revents)
nevents++;
}
// If timeout is zero, exit immediately whether there are events or not.
if (timeout_ == 0)
break;
// If there are events to return, we can exit immediately.
if (nevents)
break;
// At this point we are meant to wait for events but there are none.
// If timeout is infinite we can just loop until we get some events.
if (timeout_ < 0) {
if (first_pass)
first_pass = false;
continue;
}
// The timeout is finite and there are no events. In the first pass
// we get a timestamp of when the polling have begun. (We assume that
// first pass have taken negligible time). We also compute the time
// when the polling should time out.
if (first_pass) {
now = clock.now_ms ();
end = now + timeout_;
if (now == end)
break;
first_pass = false;
continue;
}
// Find out whether timeout have expired.
now = clock.now_ms ();
if (now >= end)
break;
}
if (pollfds != spollfds)
free (pollfds);
return nevents;
#elif defined ZMQ_POLL_BASED_ON_SELECT
if (unlikely (nitems_ < 0)) {
errno = EINVAL;
return -1;
}
if (unlikely (nitems_ == 0)) {
if (timeout_ == 0)
return 0;
#if defined ZMQ_HAVE_WINDOWS
Sleep (timeout_ > 0 ? timeout_ : INFINITE);
return 0;
#else
return usleep (timeout_ * 1000);
#endif
}
zmq::clock_t clock;
uint64_t now = 0;
uint64_t end = 0;
// Ensure we do not attempt to select () on more than FD_SETSIZE
// file descriptors.
zmq_assert (nitems_ <= FD_SETSIZE);
fd_set pollset_in;
FD_ZERO (&pollset_in);
fd_set pollset_out;
FD_ZERO (&pollset_out);
fd_set pollset_err;
FD_ZERO (&pollset_err);
bool use_pollfd = false;
for (int i = 0; i != nitems_; i++) {
if (items_ [i].socket) {
int thread_safe;
size_t thread_safe_size = sizeof(int);
if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe,
&thread_safe_size) == -1)
return -1;
if (thread_safe && items_ [i].events) {
use_pollfd = true;
FD_SET (zmq_pollfd_fd (p_), &pollset_in);
break;
}
}
}
zmq::fd_t maxfd = 0;
// Build the fd_sets for passing to select ().
for (int i = 0; i != nitems_; i++) {
// If the poll item is a 0MQ socket we are interested in input on the
// notification file descriptor retrieved by the ZMQ_FD socket option.
if (items_ [i].socket) {
int thread_safe;
size_t thread_safe_size = sizeof(int);
if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe,
&thread_safe_size) == -1)
return -1;
if (!thread_safe) {
zmq::fd_t notify_fd;
size_t zmq_fd_size = sizeof (zmq::fd_t);
if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &notify_fd,
&zmq_fd_size) == -1)
return -1;
if (items_ [i].events) {
FD_SET (notify_fd, &pollset_in);
if (maxfd < notify_fd)
maxfd = notify_fd;
}
}
}
// Else, the poll item is a raw file descriptor. Convert the poll item
// events to the appropriate fd_sets.
else {
if (items_ [i].events & ZMQ_POLLIN)
FD_SET (items_ [i].fd, &pollset_in);
if (items_ [i].events & ZMQ_POLLOUT)
FD_SET (items_ [i].fd, &pollset_out);
if (items_ [i].events & ZMQ_POLLERR)
FD_SET (items_ [i].fd, &pollset_err);
if (maxfd < items_ [i].fd)
maxfd = items_ [i].fd;
}
}
bool first_pass = true;
int nevents = 0;
fd_set inset, outset, errset;
while (true) {
// Compute the timeout for the subsequent poll.
timeval timeout;
timeval *ptimeout;
if (first_pass) {
timeout.tv_sec = 0;
timeout.tv_usec = 0;
ptimeout = &timeout;
}
else
if (timeout_ < 0)
ptimeout = NULL;
else {
timeout.tv_sec = (long) ((end - now) / 1000);
timeout.tv_usec = (long) ((end - now) % 1000 * 1000);
ptimeout = &timeout;
}
// Wait for events. Ignore interrupts if there's infinite timeout.
while (true) {
memcpy (&inset, &pollset_in, sizeof (fd_set));
memcpy (&outset, &pollset_out, sizeof (fd_set));
memcpy (&errset, &pollset_err, sizeof (fd_set));
#if defined ZMQ_HAVE_WINDOWS
int rc = select (0, &inset, &outset, &errset, ptimeout);
if (unlikely (rc == SOCKET_ERROR)) {
errno = zmq::wsa_error_to_errno (WSAGetLastError ());
wsa_assert (errno == ENOTSOCK);
return -1;
}
#else
int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout);
if (unlikely (rc == -1)) {
errno_assert (errno == EINTR || errno == EBADF);
return -1;
}
#endif
break;
}
if (use_pollfd && FD_ISSET (zmq_pollfd_fd (p_), &inset))
zmq_pollfd_recv (p_);
// Check for the events.
for (int i = 0; i != nitems_; i++) {
items_ [i].revents = 0;
// The poll item is a 0MQ socket. Retrieve pending events
// using the ZMQ_EVENTS socket option.
if (items_ [i].socket) {
size_t zmq_events_size = sizeof (uint32_t);
uint32_t zmq_events;
if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
&zmq_events_size) == -1)
return -1;
if ((items_ [i].events & ZMQ_POLLOUT) &&
(zmq_events & ZMQ_POLLOUT))
items_ [i].revents |= ZMQ_POLLOUT;
if ((items_ [i].events & ZMQ_POLLIN) &&
(zmq_events & ZMQ_POLLIN))
items_ [i].revents |= ZMQ_POLLIN;
}
// Else, the poll item is a raw file descriptor, simply convert
// the events to zmq_pollitem_t-style format.
else {
if (FD_ISSET (items_ [i].fd, &inset))
items_ [i].revents |= ZMQ_POLLIN;
if (FD_ISSET (items_ [i].fd, &outset))
items_ [i].revents |= ZMQ_POLLOUT;
if (FD_ISSET (items_ [i].fd, &errset))
items_ [i].revents |= ZMQ_POLLERR;
}
if (items_ [i].revents)
nevents++;
}
// If timeout is zero, exit immediately whether there are events or not.
if (timeout_ == 0)
break;
// If there are events to return, we can exit immediately.
if (nevents)
break;
// At this point we are meant to wait for events but there are none.
// If timeout is infinite we can just loop until we get some events.
if (timeout_ < 0) {
if (first_pass)
first_pass = false;
continue;
}
// The timeout is finite and there are no events. In the first pass
// we get a timestamp of when the polling have begun. (We assume that
// first pass have taken negligible time). We also compute the time
// when the polling should time out.
if (first_pass) {
now = clock.now_ms ();
end = now + timeout_;
if (now == end)
break;
first_pass = false;
continue;
}
// Find out whether timeout have expired.
now = clock.now_ms ();
if (now >= end)
break;
}
return nevents;
#else
// Exotic platforms that support neither poll() nor select().
errno = ENOTSUP;
return -1;
#endif
}
// The poller functionality
void* zmq_poller_new ()
@ -1579,20 +1062,26 @@ int zmq_poller_close (void *poller_)
return 0;
}
int zmq_poller_add_socket (void *poller_, void *socket_, void *user_data_)
int zmq_poller_add (void *poller_, void *s_, void *user_data_, short events_)
{
if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
errno = EFAULT;
return -1;
}
return ((zmq::socket_poller_t*)poller_)->add_socket (socket_, user_data_);
if (!s_ || !((zmq::socket_base_t*)s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *socket = (zmq::socket_base_t*)s_;
return ((zmq::socket_poller_t*)poller_)->add (socket, user_data_, events_);
}
#if defined _WIN32
int zmq_poller_add_fd (void *poller_, SOCKET fd_, void *user_data_)
int zmq_poller_add_fd (void *poller_, SOCKET fd_, void *user_data_, short events_)
#else
int zmq_poller_add_fd (void *poller_, int fd_, void *user_data_)
int zmq_poller_add_fd (void *poller_, int fd_, void *user_data_, short events_)
#endif
{
if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
@ -1600,17 +1089,56 @@ int zmq_poller_add_fd (void *poller_, int fd_, void *user_data_)
return -1;
}
return ((zmq::socket_poller_t*)poller_)->add_fd (fd_, user_data_);
return ((zmq::socket_poller_t*)poller_)->add_fd (fd_, user_data_, events_);
}
int zmq_poller_remove_socket (void *poller_, void *socket)
int zmq_poller_modify (void *poller_, void *s_, short events_)
{
if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
errno = EFAULT;
return -1;
}
if (!s_ || !((zmq::socket_base_t*)s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *socket = (zmq::socket_base_t*)s_;
return ((zmq::socket_poller_t*)poller_)->modify (socket, events_);
}
#if defined _WIN32
int zmq_poller_modify_fd (void *poller_, SOCKET fd_, short events_)
#else
int zmq_poller_modify_fd (void *poller_, int fd_, short events_)
#endif
{
if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
errno = EFAULT;
return -1;
}
return ((zmq::socket_poller_t*)poller_)->modify_fd (fd_, events_);
}
int zmq_poller_remove (void *poller_, void *s_)
{
if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
errno = EFAULT;
return -1;
}
return ((zmq::socket_poller_t*)poller_)->remove_socket (socket);
if (!s_ || !((zmq::socket_base_t*)s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *socket = (zmq::socket_base_t*)s_;
return ((zmq::socket_poller_t*)poller_)->remove (socket);
}
#if defined _WIN32
@ -1642,6 +1170,7 @@ int zmq_poller_wait (void *poller_, zmq_poller_event_t *event, long timeout_)
event->socket = e.socket;
event->fd = e.fd;
event->user_data = e.user_data;
event->events = e.events;
return rc;
}

View File

@ -50,10 +50,24 @@ set(tests
test_pub_invert_matching
test_thread_safe
test_client_server
test_sockopt_hwm
test_sockopt_hwm
test_heartbeats
test_thread_safe_polling
test_poller
test_atomics
test_bind_src_address
test_capabilities
test_ipc_wildcard
test_metadata
test_pair_tipc
test_reqrep_device_tipc
test_reqrep_tipc
test_router_handover
test_router_mandatory_tipc
test_srcfd
test_stream_timeout
test_sub_forward_tipc
test_xpub_manual
test_xpub_welcome_msg
)
if(NOT WIN32)
list(APPEND tests
@ -66,6 +80,11 @@ if(NOT WIN32)
test_proxy_terminate
test_getsockopt_memset
test_filter_ipc
test_connect_delay_tipc
test_shutdown_stress_tipc
test_stream_exceeds_buffer
test_router_mandatory_hwm
test_term_endpoint_tipc
)
if(HAVE_FORK)
list(APPEND tests test_fork)
@ -84,6 +103,7 @@ foreach(test ${tests})
else()
add_test(NAME ${test} COMMAND ${test})
endif()
set_tests_properties(${test} PROPERTIES TIMEOUT 10)
endforeach()
if(NOT WIN32)
@ -92,3 +112,13 @@ if(NOT WIN32)
endif()
endif()
#Check whether all tests in the current folder are present
file(READ "${CMAKE_CURRENT_LIST_FILE}" CURRENT_LIST_FILE_CONTENT)
file(GLOB ALL_TEST_SOURCES "test_*.cpp")
foreach(TEST_SOURCE ${ALL_TEST_SOURCES})
get_filename_component(TESTNAME "${TEST_SOURCE}" NAME_WE)
string(REGEX MATCH "${TESTNAME}" MATCH_TESTNAME "${CURRENT_LIST_FILE_CONTENT}")
if (NOT MATCH_TESTNAME)
message(AUTHOR_WARNING "Test '${TESTNAME}' is not known to CTest.")
endif()
endforeach()

View File

@ -61,13 +61,13 @@ int main (void)
// Set up poller
void* poller = zmq_poller_new ();
rc = zmq_poller_add_socket (poller, sink, sink);
rc = zmq_poller_add (poller, sink, sink, ZMQ_POLLIN);
assert (rc == 0);
// Send a message
char data[1] = {'H'};
rc = zmq_send_const (vent, data, 1, 0);
assert (rc == 1);
assert (rc == 1);
// We expect a message only on the sink
zmq_poller_event_t event;
@ -77,9 +77,14 @@ int main (void)
assert (event.user_data == sink);
rc = zmq_recv (sink, data, 1, 0);
assert (rc == 1);
// We expect timed out
rc = zmq_poller_wait (poller, &event, 0);
assert (rc == -1);
assert (errno == ETIMEDOUT);
// Stop polling sink
rc = zmq_poller_remove_socket (poller, sink);
rc = zmq_poller_remove (poller, sink);
assert (rc == 0);
// Check we can poll an FD
@ -96,7 +101,7 @@ int main (void)
rc = zmq_getsockopt (bowl, ZMQ_FD, &fd, &fd_size);
assert (rc == 0);
rc = zmq_poller_add_fd (poller, fd, bowl);
rc = zmq_poller_add_fd (poller, fd, bowl, ZMQ_POLLIN);
assert (rc == 0);
rc = zmq_poller_wait (poller, &event, 500);
assert (rc == 0);
@ -106,7 +111,8 @@ int main (void)
zmq_poller_remove_fd (poller, fd);
// Polling on thread safe sockets
zmq_poller_add_socket (poller, server, NULL);
rc = zmq_poller_add (poller, server, NULL, ZMQ_POLLIN);
assert (rc == 0);
rc = zmq_connect (client, "tcp://127.0.0.1:55557");
assert (rc == 0);
rc = zmq_send_const (client, data, 1, 0);
@ -116,7 +122,16 @@ int main (void)
assert (event.socket == server);
assert (event.user_data == NULL);
rc = zmq_recv (server, data, 1, 0);
assert (rc == 1);
assert (rc == 1);
// Polling on pollout
rc = zmq_poller_modify (poller, server, ZMQ_POLLOUT | ZMQ_POLLIN);
assert (rc == 0);
rc = zmq_poller_wait (poller, &event, 0);
assert (rc == 0);
assert (event.socket == server);
assert (event.user_data == NULL);
assert (event.events == ZMQ_POLLOUT);
// Destory poller, sockets and ctx
rc = zmq_poller_close (poller);
@ -131,7 +146,7 @@ int main (void)
assert (rc == 0);
rc = zmq_close (client);
assert (rc == 0);
rc = zmq_ctx_shutdown (ctx);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0;

View File

@ -1,164 +0,0 @@
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
void worker(void* s);
int main (void)
{
setup_test_environment();
void *ctx = zmq_ctx_new ();
assert (ctx);
void *server = zmq_socket (ctx, ZMQ_SERVER);
void *server2 = zmq_socket (ctx, ZMQ_SERVER);
void *pollfd = zmq_pollfd_new ();
int rc;
rc = zmq_add_pollfd (server, pollfd);
assert (rc == 0);
rc = zmq_add_pollfd (server2, pollfd);
assert (rc == 0);
zmq_pollitem_t items[] = {
{server, 0, ZMQ_POLLIN, 0},
{server2, 0, ZMQ_POLLIN, 0}};
rc = zmq_bind (server, "tcp://127.0.0.1:5560");
assert (rc == 0);
rc = zmq_bind (server2, "tcp://127.0.0.1:5561");
assert (rc == 0);
void* t = zmq_threadstart(worker, ctx);
assert (rc == 0);
rc = zmq_pollfd_poll (pollfd, items, 2, -1);
assert (rc == 1);
assert (items[0].revents == ZMQ_POLLIN);
assert (items[1].revents == 0);
zmq_msg_t msg;
rc = zmq_msg_init(&msg);
rc = zmq_msg_recv(&msg, server, ZMQ_DONTWAIT);
assert (rc == 1);
rc = zmq_pollfd_poll (pollfd, items, 2, -1);
assert (rc == 1);
assert (items[0].revents == 0);
assert (items[1].revents == ZMQ_POLLIN);
rc = zmq_msg_recv(&msg, server2, ZMQ_DONTWAIT);
assert (rc == 1);
rc = zmq_pollfd_poll (pollfd, items, 2, 0);
assert (rc == 0);
assert (items[0].revents == 0);
assert (items[1].revents == 0);
zmq_threadclose(t);
rc = zmq_msg_close(&msg);
assert (rc == 0);
rc = zmq_remove_pollfd (server, pollfd);
assert (rc == 0);
rc = zmq_remove_pollfd (server2, pollfd);
assert (rc == 0);
rc = zmq_pollfd_close (pollfd);
assert (rc == 0);
rc = zmq_close (server);
assert (rc == 0);
rc = zmq_close (server2);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0;
}
void worker(void* ctx)
{
void *client = zmq_socket (ctx, ZMQ_CLIENT);
int rc = zmq_connect (client, "tcp://127.0.0.1:5560");
assert (rc == 0);
msleep(100);
zmq_msg_t msg;
rc = zmq_msg_init_size(&msg,1);
assert (rc == 0);
char * data = (char *)zmq_msg_data(&msg);
data[0] = 1;
rc = zmq_msg_send(&msg, client, 0);
assert (rc == 1);
rc = zmq_disconnect (client, "tcp://127.0.0.1:5560");
assert (rc == 0);
rc = zmq_connect (client, "tcp://127.0.0.1:5561");
assert (rc == 0);
msleep(100);
rc = zmq_msg_close(&msg);
assert (rc == 0);
rc = zmq_msg_init_size(&msg,1);
assert (rc == 0);
data = (char *)zmq_msg_data(&msg);
data[0] = 1;
rc = zmq_msg_send(&msg, client, 0);
assert (rc == 1);
rc = zmq_msg_close(&msg);
assert (rc == 0);
rc = zmq_close (client);
assert (rc == 0);
}

View File

@ -1,25 +0,0 @@
/*
randombytes/devurandom.h version 20080713
D. J. Bernstein
Public domain.
*/
#ifndef randombytes_devurandom_H
#define randombytes_devurandom_H
#ifdef __cplusplus
extern "C" {
#endif
extern void randombytes(unsigned char *,unsigned long long);
extern int randombytes_close(void);
#ifdef __cplusplus
}
#endif
#ifndef randombytes_implementation
#define randombytes_implementation "devurandom"
#endif
#endif

View File

@ -1,5 +1,21 @@
/*
randombytes/randombytes.h version 20080713
D. J. Bernstein
Public domain.
*/
#ifndef randombytes_H
#define randombytes_H
#include "devurandom.h"
#ifdef __cplusplus
extern "C" {
#endif
extern void randombytes(unsigned char *,unsigned long long);
extern int randombytes_close(void);
#ifdef __cplusplus
}
#endif
#endif

View File

@ -0,0 +1,43 @@
#include <windows.h>
#include <WinCrypt.h>
#define NCP ((HCRYPTPROV) 0)
HCRYPTPROV hProvider = NCP;
void randombytes(unsigned char *x,unsigned long long xlen)
{
unsigned i;
BOOL ret;
if (hProvider == NCP) {
for(;;) {
ret = CryptAcquireContext(&hProvider, NULL, NULL, PROV_RSA_FULL, CRYPT_VERIFYCONTEXT | CRYPT_SILENT);
if (ret != FALSE) break;
Sleep(1);
}
}
while (xlen > 0) {
if (xlen < 1048576) i = (unsigned) xlen; else i = 1048576;
ret = CryptGenRandom(hProvider, i, x);
if (ret == FALSE) {
Sleep(1);
continue;
}
x += i;
xlen -= i;
}
}
int randombytes_close(void)
{
int rc = -1;
if((hProvider != NCP) && (CryptReleaseContext(hProvider, 0) != FALSE)) {
hProvider = NCP;
rc = 0;
}
return rc;
}