Compare commits

...

126 Commits

Author SHA1 Message Date
Pieter Hintjens
78b741bd71 Merge pull request #36 from hintjens/master
Updated version number for 4.0.2
2013-11-24 03:47:58 -08:00
Pieter Hintjens
ba7c065c3d Updated version number for 4.0.2
- also moved macros to top of zmq.h for better visibility
2013-11-24 12:45:47 +01:00
Pieter Hintjens
d743ddafda Merge pull request #35 from hintjens/master
Updated NEWS for release 4.0.2
2013-11-24 03:42:01 -08:00
Pieter Hintjens
84c0caf42e Updated NEWS for release 4.0.2 2013-11-24 12:41:20 +01:00
Pieter Hintjens
e1939155ff Merge pull request #34 from hintjens/master
Backporting fixes from master
2013-11-19 02:01:23 -08:00
Pieter Hintjens
9ece2d322a Added links to RFCs for socket patterns 2013-11-19 11:00:45 +01:00
KIU Shueng Chuan
589bf436dd remove all asserts during critical section
the size of the critical section is reduced by only entering the critical
section right before the bind().
2013-11-19 11:00:39 +01:00
KIU Shueng Chuan
e54e55835e return error to caller on bind error 2013-11-19 11:00:28 +01:00
Richard Newton
111c20dc22 Fix signed/unsigned comparison 2013-11-19 11:00:07 +01:00
Richard Newton
3a4896f4a3 Fix signed/unsigned comparison 2013-11-19 10:59:57 +01:00
Richard Newton
0855c28069 Reduce default maximum number of sockets by 1 so there is room for the reaper socket. 2013-11-19 10:58:34 +01:00
Richard Newton
737b6afc97 Revert "Make FD_SETSIZE = ZMQ_MAX_SOCKETS_DFLT + 1 so there is room for the repear socket."
This reverts commit 0b92831b2a82c3bbc0e9371662c546b94915a3ea.
2013-11-19 10:58:28 +01:00
psl-felipefarinon
8f85bafe37 issue #583 removing C++11 code. 2013-11-19 10:58:20 +01:00
Bruno D. Rodrigues
f72dbb35d5 Fix socket creation above sistem limits for all 'other' OS not covered by eventfd, windows or vms; enhanced test to create sockets up to a bigger limit to really test hitting the OS limit 2013-11-19 10:55:48 +01:00
Pieter Hintjens
5d6e7a74c1 Added test case to ignore 2013-11-09 09:59:21 +01:00
Pieter Hintjens
30309d660e Merge pull request #32 from hintjens/master
Cherry picking changes from libzmq master
2013-11-09 00:56:52 -08:00
Pieter Hintjens
e0c8a112a4 Cherry picking fixes from master 2013-11-09 09:56:21 +01:00
Martin Hurton
f6293d257d Signal that the peer performed orderly shutdown 2013-11-09 09:52:09 +01:00
Richard Newton
f33bdcf02e Fix formatting. 2013-11-09 09:52:08 +01:00
Richard Newton
fef24a8c1f Make FD_SETSIZE = ZMQ_MAX_SOCKETS_DFLT + 1 so there is room for the repear socket. 2013-11-09 09:52:05 +01:00
Richard Newton
bfc3deb43a Get maximum number of sockets it can handle from poller_t 2013-11-09 09:52:02 +01:00
psl-felipefarinon
dccf1dce1e Fixing broken build issue #583 2013-11-09 09:51:59 +01:00
Richard Newton
24311dee1b Add test case for many sockets
Conflicts:
	CMakeLists.txt
	tests/Makefile.am
2013-11-09 09:50:43 +01:00
psl-felipefarinon
27d20aacc7 Fixing issue #583. Using low resolution timer for clock::now_ms 2013-11-09 09:47:41 +01:00
Pieter Hintjens
b2698474d2 Simplified fdpair reset after fork 2013-11-09 09:47:40 +01:00
Pieter Hintjens
0465e9abe9 Simplified error handling for make_fdpair on Windows 2013-11-09 09:47:04 +01:00
Pieter Hintjens
885c816e57 Fix for issue 574 2013-11-09 09:47:02 +01:00
Pieter Hintjens
7f3d0995cd Removed over-long pauses in tests
- used msleep (10) in most places instead of zmq_sleep (1)
- may cause failures on slower machines
- to change, modify SETTLE_TIME in testutil.h
- tested down to 1 msec on fast boxes

Conflicts:
	tests/test_connect_delay_tipc.cpp
	tests/test_proxy.cpp
	tests/test_sub_forward_tipc.cpp
	tests/test_term_endpoint_tipc.cpp
	tests/testutil.hpp
2013-11-09 09:44:17 +01:00
Pieter Hintjens
f745e4ce64 Merge pull request #31 from hintjens/master
Backported fixes for libzmq-39 and other patches.
2013-11-06 11:26:05 -08:00
Richard Newton
de239f358e Fix race condition on shutdown 2013-11-06 20:16:03 +01:00
Richard Newton
30f470eff5 Fix test warning. 2013-11-06 20:16:03 +01:00
Richard Newton
f0b69bba28 Fix test warning. 2013-11-06 20:16:03 +01:00
MinRK
60032ef330 test zmq_msg_close after sending empty message on ROUTER_RAW 2013-11-06 20:16:03 +01:00
MinRK
c663f37761 add missing msg->init for ROUTER_RAW with empty message 2013-11-06 20:16:03 +01:00
Pieter Hintjens
806f57e7f6 Merge pull request #30 from hintjens/master
Cherry picking fixes from libzmq
2013-11-04 08:44:38 -08:00
Pieter Hintjens
b22c2e4f8a Updated NEWS 2013-11-04 17:37:15 +01:00
Denis Mingulov
54a3ebcac6 Doc examples with zmq_msg_close usage - parameter is 'zmq_msg_t *'
Documentation examples for zmq_msg_get and zmq_msg_more functions have an
incorrect call to zmq_msg_close function - with 'zmq_msg_t' as a parameter
despite 'zmq_msg_t *' is required, so it is impossible to compile these
examples properly.

Also for zmq_msg_get example - declaration of zmq_msg_t variable is added
(like it is done in other examples).
2013-11-04 17:35:47 +01:00
Pieter Hintjens
aede37e3e5 Fixed issue 578 - corrected type usage 2013-11-04 17:34:54 +01:00
Pieter Hintjens
2949f2dbc5 Fixed issue 578 2013-11-04 17:34:14 +01:00
Pieter Hintjens
28b7c991af Merge pull request #28 from hintjens/master
Fixed configure error on non-Linux boxes
2013-10-28 03:58:52 -07:00
Pieter Hintjens
1c70e91c6f Fixed configure error on non-Linux boxes 2013-10-28 11:58:29 +01:00
Pieter Hintjens
c85fbaf22e Merge pull request #27 from hintjens/master
Fixed ref to zmq_socket_monitor
2013-10-28 03:49:45 -07:00
Pieter Hintjens
b58055794b Fixed ref to zmq_socket_monitor 2013-10-28 11:49:23 +01:00
Pieter Hintjens
cb3c82271a Merge pull request #26 from hintjens/master
Backporting fixes from master
2013-10-25 01:04:06 -07:00
Pieter Hintjens
9bf10a83a1 Updated NEWS 2013-10-25 10:03:42 +02:00
Pieter Hintjens
8efd7affc6 test_stream failed when response was broken into frames 2013-10-25 09:56:04 +02:00
Pieter Hintjens
bf97ea8ed8 Merge pull request #25 from hintjens/master
Clarified use of secret/public keys
2013-10-23 07:04:20 -07:00
Pieter Hintjens
cd2afebd0a Clarified use of secret/public keys 2013-10-23 16:03:45 +02:00
Pieter Hintjens
2b8d86c24a Merge pull request #24 from hintjens/master
Fixed man page for CURVE key options
2013-10-23 03:05:44 -07:00
Pieter Hintjens
bd411bbf11 Fixed man page for CURVE key options 2013-10-23 12:04:57 +02:00
Pieter Hintjens
573d7b0c0b Merge pull request #23 from hintjens/master
Backporting LIBZMQ-569 fix from master
2013-10-13 23:05:54 -07:00
Pieter Hintjens
dcb9312ba6 Updated NEWS for backport 2013-10-14 08:04:57 +02:00
Martin Hurton
5490794666 Load identity message to decoder at start 2013-10-14 08:03:36 +02:00
Pieter Hintjens
07d7cf69d0 Merge pull request #22 from hintjens/master
Temporary workaround for broken libsodium install
2013-10-13 05:47:11 -07:00
Pieter Hintjens
b24db36057 Temporary workaround for broken libsodium install 2013-10-13 14:46:08 +02:00
Pieter Hintjens
4f2ac39d28 Merge pull request #21 from hintjens/master
Backporting fixes from master
2013-10-11 03:12:27 -07:00
Pieter Hintjens
a32c02ae47 Identity can only be set on sockets that can connect to ROUTER 2013-10-11 11:47:33 +02:00
Martin Hurton
622e3b5476 Rename engine's methods to improve code readability 2013-10-11 11:47:12 +02:00
xantares
6c036b39ae Fixed out-of-source build 2013-10-11 11:46:50 +02:00
Pieter Hintjens
8cdff6fa2e Fixed NEWS 2013-10-08 17:54:41 +02:00
Pieter Hintjens
f8cebb460a Revert "replace macro constants with enum types for user facing constants"
This reverts commit 4fb74539250d83e82e40e85d39e2e756eccdb8a3.
2013-10-08 15:02:58 +02:00
Pieter Hintjens
d62e7a0734 Revert "distinguish between options and defaults"
This reverts commit e7db680f5b709ce821f79b1e8956f4cbd1931c11.
2013-10-08 15:02:47 +02:00
Pieter Hintjens
a44bd65d03 Fixed NEWS for 4.0.1 release 2013-10-08 10:46:47 +02:00
Pieter Hintjens
437e4070a3 Merge pull request #19 from hintjens/master
Updated NEWS for 4.0.1
2013-10-08 01:36:58 -07:00
Pieter Hintjens
0bfcd4da2f Updated NEWS for 4.0.1 2013-10-08 10:33:50 +02:00
Pieter Hintjens
c852620f5f Merge pull request #18 from hintjens/master
Cherry picking changes from libzmq master
2013-10-08 00:20:40 -07:00
Volodymyr Korniichuk
ca122e9d01 Typo in type name "emTPy_slots_t"
https://zeromq.jira.com/browse/LIBZMQ-565
2013-10-08 09:20:15 +02:00
Dylan Cali
e7db680f5b distinguish between options and defaults 2013-10-08 09:19:55 +02:00
Dylan Cali
4fb7453925 replace macro constants with enum types for user facing constants 2013-10-08 09:19:49 +02:00
Brandon Carpenter
813166019e Add tests/test_abstract_ipc to .gitignore. 2013-10-07 20:53:20 +02:00
Brandon Carpenter
127cd7585a Fix detection of abstract ipc pathname and length calculation.
Abstract socket pathnames must have a NULL character in the first
position, but the second character must also be checked to differentiate
an abstract name from the empty string.  The address length must also
indicate the length of the pathname because the kernel uses the entire
address as the name, including NULL characters.  ZMQ uses
NULL-terminated strings for the address, so the abstract address length
is the length of the string following the initial NULL byte plus 3; two
bytes for the address family and one for the initial NULL character.
2013-10-07 20:53:10 +02:00
Brandon Carpenter
0666152b21 Add note on Linux abstract namespace to ipc documentation. 2013-10-07 20:53:00 +02:00
Volodymyr Korniichuk
9293153f71 useless checks were removed 2013-10-05 09:22:56 +02:00
Pieter Hintjens
163aebbacf Merge pull request #17 from hintjens/master
Cherry picking changes from libzmq master
2013-10-05 00:20:49 -07:00
Brandon Carpenter
80d657a2c5 Add test for abstract namespace support in ipc sockets on Linux.
See issue 567.
2013-10-05 09:20:20 +02:00
Brandon Carpenter
668f000cb1 Add abstract namespace support for IPC sockets on Linux.
Converts an initial strudel or "at sign" (@) in the Unix socket path to
a NULL character ('\0') indicating that the socket uses the abstract
namespace instead of the filesystem namespace.  For instance, binding a
socket to 'ipc://@/tmp/tester' will not create a file associated with
the socket whereas binding to 'ipc:///tmp/tester' will create the file
/tmp/tester.  See issue 567 for more information.
2013-10-05 09:20:13 +02:00
KIU Shueng Chuan
a570b18931 WinSock2.h filename should be all lowercase 2013-10-05 09:20:06 +02:00
KIU Shueng Chuan
5493d4d180 test for _MSC_VER before using MSVC specific code 2013-10-05 09:19:58 +02:00
Pieter Hintjens
b20573c841 Merge pull request #16 from hintjens/master
Cherry picking changes from libzmq master
2013-10-04 00:19:57 -07:00
Martin Hurton
814b93e0cf Stop curve handshake when cookie box verification fails 2013-10-04 09:19:11 +02:00
Pieter Hintjens
d723b08c13 Updated libzmq CURVE to track RFC 27
* The INITIATE command vouch box is Box[C',S](C->S') instead of Box[C'](C->S),
  as recommended by https://codesinchaos.wordpress.com/2012/09/09/curvecp-1/,
  to reduce the risk of client impersonation.

* Mirrors the change in libcurve and CurveZMQ specifications.
2013-10-04 09:19:04 +02:00
Matt Connolly
62fd6fa861 git ignore test run output files: tests/test*.{log|trs} 2013-10-04 09:17:34 +02:00
Pieter Hintjens
a78ccf293f Disabled randomly failing part of test 2013-10-04 09:13:35 +02:00
Pieter Hintjens
d82ba6bd53 Clarified that zmq.h constants are part of the public contract 2013-10-04 09:13:17 +02:00
Dylan Cali
9da52ddf59 fix minor typo in zmq_ctx_get doc example 2013-10-04 09:13:11 +02:00
Pieter Hintjens
a69fa9ecde Packaging for zmq_curve_keypair function
* Added new man page for this
* Added test case, in tests/test_security_curve.cpp
* Noted in zmq_utils.h that these methods are documented
2013-10-04 09:13:05 +02:00
MinRK
facb96ffca use zmq_curve_keypair in curve_keygen 2013-10-04 09:12:56 +02:00
MinRK
3c469d04c1 return NULL and set EINVAL on bad z85 input
asserts aren't appropriate for checking user input.
2013-10-04 09:12:33 +02:00
MinRK
870233522c add zmq_curve_keypair to zmq_utils 2013-10-04 09:12:09 +02:00
Pieter Hintjens
34471cd591 Merge pull request #12 from hintjens/master
Backporting fixes from master
2013-09-29 08:04:10 -07:00
Pieter Hintjens
453ceb65b4 Fixed merge conflict error 2013-09-29 17:03:44 +02:00
Matt Connolly
4d82544c34 config for libsodium in alternate path, using CPP flags for both C and C++ sources. 2013-09-29 17:01:15 +02:00
MinRK
bd6bca7c82 include missing platform.hpp in curve_keygen
it was excluded, so HAVE_SODIUM would never be defined,
leading to curve_keygen always reporting "recompile with libsodium"
2013-09-29 17:01:00 +02:00
MinRK
087ddac593 declare z85_encode / decode in zmq_utils
they are used by curve_keygen, but not available
2013-09-29 17:00:22 +02:00
Matt Connolly
d1b686b644 Disable compiler warning for Solaris 2013-09-27 13:39:10 +02:00
Matt Connolly
4dc36c0dba Add '-lssp' linker flag for Solaris. Check for libraries after host specific setup. 2013-09-27 13:38:55 +02:00
Pieter Hintjens
17d12a6be1 Merge pull request #9 from hintjens/master
Cherry picking changes from libzmq master
2013-09-26 05:48:49 -07:00
Pieter Hintjens
89b97cbe59 Updated CMake project for test_issue_566 2013-09-26 14:46:49 +02:00
Pieter Hintjens
b0059211d5 Added test case for issue 566
* Tests dealer-to-router connection 100 times
* This was failing in ZMQ v4.0.0 RC1
2013-09-26 14:46:35 +02:00
Pieter Hintjens
cd452d5019 Merge pull request #8 from hintjens/master
Backporting fixes from master
2013-09-26 03:32:10 -07:00
Pieter Hintjens
dd185e13bf Bumped version for 4.0.1 2013-09-26 12:31:43 +02:00
Pieter Hintjens
517601de10 curve_keygen needed assert.h 2013-09-26 12:17:36 +02:00
Pieter Hintjens
5e25b32c36 Build/test errors on OS/X with clang++ 2013-09-26 12:17:28 +02:00
Martin Hurton
a5152245bd Call flush after writing the identity message 2013-09-26 10:07:29 +02:00
Pieter Hintjens
9cbcc49a4a Merge pull request #6 from ipechorin/master
stdint.h is not available on all platforms
2013-09-24 03:32:02 -07:00
Pieter Hintjens
2d5bad96e3 Merge pull request #7 from hintjens/master
Fixed build for MSVC 2008
2013-09-24 03:31:08 -07:00
Pieter Hintjens
c99b727f6d Fixed build for MSVC 2008 2013-09-24 12:29:29 +02:00
Ivan Pechorin
8a931a7554 stdint.h is not available on all platforms 2013-09-24 14:00:08 +04:00
Pieter Hintjens
b04df2c530 Merge pull request #4 from hintjens/master
Updated library ABI version
2013-09-24 00:12:03 -07:00
Pieter Hintjens
4df7cb043e Bumped library version and age 2013-09-24 09:11:17 +02:00
Pieter Hintjens
201454e866 Move away from port 8080 which is occupied on some boxes 2013-09-20 23:17:01 +02:00
Richard Newton
ad1bae2160 Merge pull request #3 from vortechs2000/fix_aix
Revert "Reference platform.h by ../src/platform.h - AIX is ignoring -I flags"
2013-09-20 12:48:29 -07:00
AJ Lewis
77f394a681 Revert "Reference platform.h by ../src/platform.h - AIX is ignoring -I flags"
This reverts commit 1e8e4d79c885b27831e2196d94987cc2817e0f04.
2013-09-20 14:39:01 -05:00
Pieter Hintjens
61d8bf9b11 Merge pull request #2 from vortechs2000/fix_aix
Reference platform.h by ../src/platform.h - AIX is ignoring -I flags
2013-09-20 09:54:03 -07:00
AJ Lewis
1e8e4d79c8 Reference platform.h by ../src/platform.h - AIX is ignoring -I flags 2013-09-20 11:43:53 -05:00
Pieter Hintjens
c0c8ce5508 File was missing from Makefile.am 2013-09-20 16:41:08 +02:00
Pieter Hintjens
66b1bc6d7f Added link to security tutorial 2013-09-20 16:15:53 +02:00
Pieter Hintjens
f5ecc826d9 Merge pull request #1 from hintjens/master
Preparation for 4.0.0 release
2013-09-20 07:06:48 -07:00
Pieter Hintjens
a31fe9565a Source distribution was broken 2013-09-20 16:03:14 +02:00
Pieter Hintjens
2f4905500d Version number is 4.0.0 2013-09-20 16:02:55 +02:00
Pieter Hintjens
c10a3ec526 Updated NEWS for 4.0.0 2013-09-20 15:53:27 +02:00
Pieter Hintjens
e0676a2b26 Renamed new socket options to be clearer
* ZMQ_REQ_STRICT was negative option (default 1) which goes against
  the standard, where defaults are zero. I renamed this to
  ZMQ_REQ_RELAXED.

* ZMQ_REQ_REQUEST_IDS felt clumsy and describes the technical solution
  rather than the problem/requirement. I changed to ZMQ_REQ_CORRELATE
  which seems more explicit.
2013-09-20 15:50:25 +02:00
Pieter Hintjens
b411a3561d Removed inaccurate note in zmq_poll man page 2013-09-20 15:35:20 +02:00
Pieter Hintjens
4ce9b42405 Built zmq_send_const man page properly 2013-09-20 15:35:11 +02:00
Pieter Hintjens
fef4fa8fc5 Renamed test_connect_delay to test_immediate
* The ZMQ_CONNECT_DELAY option was renamed to ZMQ_IMMEDIATE
2013-09-20 15:34:55 +02:00
Pieter Hintjens
4298f71cbf Updated NEWS for 3.2.x releases 2013-09-20 13:28:25 +02:00
81 changed files with 1298 additions and 454 deletions

21
.gitignore vendored
View File

@ -22,6 +22,7 @@ autom4te.cache
.*~ .*~
tools/curve_keygen.o tools/curve_keygen.o
tools/curve_keygen tools/curve_keygen
tests/test_issue_566
tests/test_ctx_destroy tests/test_ctx_destroy
tests/test_term_endpoint tests/test_term_endpoint
tests/test_system tests/test_system
@ -43,7 +44,7 @@ tests/test_invalid_rep
tests/test_msg_flags tests/test_msg_flags
tests/test_ts_context tests/test_ts_context
tests/test_connect_resolve tests/test_connect_resolve
tests/test_connect_delay tests/test_immediate
tests/test_term_endpoint tests/test_term_endpoint
tests/test_router_mandatory tests/test_router_mandatory
tests/test_disconnect_inproc tests/test_disconnect_inproc
@ -60,14 +61,28 @@ tests/test_spec_pushpull
tests/test_spec_rep tests/test_spec_rep
tests/test_spec_req tests/test_spec_req
tests/test_spec_router tests/test_spec_router
tests/test_req_request_ids tests/test_req_correlate
tests/test_req_strict tests/test_req_relaxed
tests/test_fork tests/test_fork
tests/test_conflate tests/test_conflate
tests/test_inproc_connect tests/test_inproc_connect
tests/test_linger tests/test_linger
tests/test_security_null tests/test_security_null
tests/test_security_plain tests/test_security_plain
tests/test_abstract_ipc
tests/test_connect_delay_tipc
tests/test_pair_tipc
tests/test_reqrep_device_tipc
tests/test_reqrep_tipc
tests/test_router_handover
tests/test_router_mandatory_tipc
tests/test_router_raw_empty
tests/test_shutdown_stress_tipc
tests/test_sub_forward_tipc
tests/test_term_endpoint_tipc
tests/test_many_sockets
tests/test*.log
tests/test*.trs
src/platform.hpp* src/platform.hpp*
src/stamp-h1 src/stamp-h1
perf/local_lat perf/local_lat

View File

@ -6,8 +6,11 @@ language: c
before_script: before_script:
# libsodium # libsodium
# Commit 8d0942 broke installation (sodium.h not found) so for now
# we're checking out the last good commit.
- git clone git://github.com/jedisct1/libsodium.git - git clone git://github.com/jedisct1/libsodium.git
- cd libsodium - cd libsodium
- git checkout e2a30a
- ./autogen.sh - ./autogen.sh
- ./configure && make check - ./configure && make check
- sudo make install - sudo make install
@ -15,4 +18,4 @@ before_script:
- cd .. - cd ..
# Build and check libzmq # Build and check libzmq
script: ./autogen.sh && ./configure && make && make check script: ./autogen.sh && ./configure && make V=1 && make check

View File

@ -20,6 +20,7 @@ Ben Gray <ben@benjamg.com>
Bernd Prager <bernd@prager.ws> Bernd Prager <bernd@prager.ws>
Bernd Melchers <melchers@ZEDAT.FU-Berlin.DE> Bernd Melchers <melchers@ZEDAT.FU-Berlin.DE>
Bob Beaty <rbeaty@peak6.com> Bob Beaty <rbeaty@peak6.com>
Brandon Carpenter <hashstat@yahoo.com>
Brian Buchanan <bwb@holo.org> Brian Buchanan <bwb@holo.org>
Brett Cameron <Brett.Cameron@hp.com> Brett Cameron <Brett.Cameron@hp.com>
Burak Arslan <burak-github@arskom.com.tr> Burak Arslan <burak-github@arskom.com.tr>

View File

@ -579,7 +579,7 @@ endif()
enable_testing() enable_testing()
set(tests set(tests
test_system test_system
test_connect_delay test_immediate
test_connect_resolve test_connect_resolve
test_ctx_destroy test_ctx_destroy
test_ctx_options test_ctx_options
@ -592,8 +592,8 @@ set(tests
test_pair_inproc test_pair_inproc
test_pair_tcp test_pair_tcp
test_probe_router test_probe_router
test_req_request_ids test_req_correlate
test_req_strict test_req_relaxed
test_reqrep_device test_reqrep_device
test_reqrep_inproc test_reqrep_inproc
test_reqrep_tcp test_reqrep_tcp
@ -610,7 +610,10 @@ set(tests
test_sub_forward test_sub_forward
test_term_endpoint test_term_endpoint
test_timeo test_timeo
test_inproc_connect) test_inproc_connect
test_issue_566
test_many_sockets
)
if(NOT WIN32) if(NOT WIN32)
list(APPEND tests list(APPEND tests
test_monitor test_monitor

203
NEWS
View File

@ -1,3 +1,204 @@
0MQ version 4.0.2 stable, released on 2013/11/24
================================================
Bug Fixes
---------
* Fixed LIBZMQ-583 - improved low-res timer for Windows
* Fixed LIBZMQ-578 - z85_decode was extremely slow
* Fixed LIBZMQ-577 - fault in man pages.
* Fixed LIBZMQ-574 - assertion failure when ran out of system file handles
* Fixed LIBZMQ-571 - test_stream failing in some cases
* Fixed LIBZMQ-569 - Socket server crashes with random client data and when
talking to 2.2 versions
* Fixed LIBZMQ-39 - Bad file descriptor during shutdown
* Pulled expected failing test_linger.cpp from release
* Reduced pause time in tests to allow "make check" to run faster
0MQ version 4.0.1 stable, released on 2013/10/08
================================================
Changes
-------
* Updated CURVE mechanism to track revised RFC 27 (INITIATE vouch).
The INITIATE command vouch box is Box[C',S](C->S') instead of
Box[C'](C->S), to reduce the risk of client impersonation, as per
https://codesinchaos.wordpress.com/2012/09/09/curvecp-1/.
* Fixed LIBZMQ-567, adding abstract namespaces for IPC sockets on Linux.
Converts an initial strudel or "at sign" (@) in the Unix socket path to
a NULL character ('\0') indicating that the socket uses the abstract
namespace instead of the filesystem namespace. For instance, binding a
socket to 'ipc://@/tmp/tester' will not create a file associated with
the socket whereas binding to 'ipc:///tmp/tester' will create the file
/tmp/tester. See issue 567 for more information.
* Added zmq_z85_encode and zmq_z85_decode to core libzmq API.
* Added zmq_curve_keypair to core libzmq API.
* Bumped library ABI version to 4:0:1.
Bug fixes
---------
* Fixed some build/test errors on OS/X + Clang++.
* Fixed LIBZMQ-565, typo in code.
* Fixed LIBZMQ-566, dealer-to-router connections sometimes failing.
* Fixed builds for AIX, MSVC 2008, OS/X with clang++, Solaris.
* Improved CURVE handshake error handling.
0MQ version 4.0.0 (RC1), released on 2013/09/20
===============================================
Major changes
-------------
* New wire level protocol, ZMTP/3.0, see http://rfc.zeromq.org/spec:23.
Does not yet implement the SUBSCRIBE, CANCEL, PING, and PONG commands.
* New security framework, from plain user+password to strong encryption,
see section below. See http://hintjens.com/blog:49 for a tutorial.
* New ZMQ_STREAM socket type for working as a TCP client or server. See:
tests/test_stream.cpp.
Improvements
------------
* You can now connect to an inproc:// endpoint that does not already
exist. This means inproc:// no longer needs careful set-up, but it may
break code that relied on the old behaviour. See:
tests/test_inproc_connect.cpp.
* Libzmq now checks socket types at connection time, so that trying to
connect a 'wrong' socket type will fail.
* New zmq_ctx_shutdown API method will shutdown a context and send ETERM
to blocking calls, without blocking. Use zmq_ctx_term to finalise the
process.
* The regression test suite has been significantly extended and improved.
* Contexts can now be terminated in forked child processes. See:
tests/test_fork.cpp.
* zmq_disconnect now respects the linger setting on sockets.
* New zmq_send_const API method to send constant data (without copying).
See: tests/test_inproc_connect.cpp.
* Added CMake support for static libraries.
* Added test cases for socket semantics as defined in RFCs 28, 29, 30, 31.
See: tests/test_spec_*.cpp.
* New socket option, ZMQ_PROBE_ROUTER triggers an empty message on connect.
See: tests/test_probe_router.cpp.
* New socket option, ZMQ_REQ_CORRELATE allows for correlation of replies
from a REP socket. See: tests/test_req_correlate.cpp.
* New socket option, ZMQ_REQ_RELAXED, lets you disable the state machine
on a REQ socket, so you can send multiple requests without waiting for
replies, and without getting an EFSM error. See:
tests/test_req_relaxed.cpp.
* New socket option, ZMQ_CONFLATE restricts the outgoing and incoming
socket buffers to a single message. See: tests/test_conflate.cpp.
Deprecated Options
------------------
* ZMQ_IPV4ONLY deprecated and renamed to ZMQ_IPV6 so that options are
consistently "off" by default.
* ZMQ_DELAY_ATTACH_ON_CONNECT deprecated, and renamed to ZMQ_IMMEDIATE.
See: tests/test_immediate.cpp.
Security Framework
------------------
Based on new ZMTP wire level protocol that negotiates a security
"mechanism" between client and server before exchanging any other data.
Security mechanisms are extensible. ZMTP defines three by default:
* NULL - classic ZeroMQ, with no authentication. See
http://rfc.zeromq.org/spec:23.
* PLAIN - plain-text username + password authentication. See
http://rfc.zeromq.org/spec:24.
* CURVE - secure authentication and encryption based on elliptic curve
cryptography, using the Curve25519 algorithm from Daniel Bernstein and
based on CurveCP's security handshake. See http://rfc.zeromq.org/spec:25,
http://rfc.zeromq.org/spec:26, and http://curvecp.org.
Authentication is done by pluggable "authenticators" that connect to libzmq
over an inproc endpoint, see http://rfc.zeromq.org/spec:27.
Socket options to configure PLAIN security on client or server:
* ZMQ_PLAIN_SERVER, ZMQ_PLAIN_USERNAME, ZMQ_PLAIN_PASSWORD. See
tests/test_security_plain.
Socket options to configure CURVE security on client or server:
* ZMQ_CURVE_SERVER, ZMQ_CURVE_PUBLICKEY, ZMQ_CURVE_SECRETKEY,
ZMQ_CURVE_SERVERKEY. See tests/test_security_curve.cpp.
Socket options to configure "domain" for ZAP handler:
* ZMQ_ZAP_DOMAIN, see tests/test_security_null.cpp.
Support for encoding/decoding CURVE binary keys to ASCII:
* zmq_z85_encode, zmq_z85_decode.
Other issues addressed in this release
--------------------------------------
* LIBZMQ-525 Multipart upstreaming from XSUB to XPUB
0MQ version 3.2.4 stable, released on 2013/09/20
================================================
* LIBZMQ-84 (Windows) Assertion failed: Address already in use at signaler.cpp:80
* LIBZMQ-456 ZMQ_XPUB_VERBOSE does not propagate in a tree of XPUB/XSUB devices
* LIBZMQ-532 (Windows) critical section not released on error
* LIBZMQ-569 Detect OpenPGM 5.2 system library
* LIBZMQ-563 Subscribers sometimes stopped receiving messages (aka LIBZMQ-541)
* LIBZMQ-XXX Added support for Travis Continuous Integration
* LIBZMQ-XXX Several improvements to MSVC support
0MQ version 3.2.3 stable, released on 2013/05/02
================================================
Issues addressed in this release
--------------------------------
* LIBZMQ-526 Assertion failure "Invalid argument (tcp_connecter.cpp:285)"
* LIBZMQ-446 Setting the DSCP bits by default causes CAP_NET_ADMIN error
* LIBZMQ-496 Crash on heavy socket opening/closing: Device or resource busy (mutex.hpp:90)
* LIBZMQ-462 test_connect_delay fails at test_connect_delay.cpp:80
* LIBZMQ-497 Messages getting dropped
* LIBZMQ-488 signaler.cpp leaks the win32 Event Handle
* LIBZMQ-476 zmq_disconnect has no effect for inproc sockets
* LIBZMQ-475 zmq_disconnect does not sent unsubscribe messages
0MQ version 3.2.2 stable, released on 2012/11/23 0MQ version 3.2.2 stable, released on 2012/11/23
================================================ ================================================
@ -13,7 +214,6 @@ Issues addressed in this release
* LIBZMQ-450 lt-test_monitor: fails with assertion at test_monitor.cpp:81 * LIBZMQ-450 lt-test_monitor: fails with assertion at test_monitor.cpp:81
* LIBZMQ-451 ZMQ_ROUTER_MANDATORY blocks forever * LIBZMQ-451 ZMQ_ROUTER_MANDATORY blocks forever
* LIBZMQ-452 test_connect_delay.cpp:175:12: error: 'sleep' was not declared in this scope * LIBZMQ-452 test_connect_delay.cpp:175:12: error: 'sleep' was not declared in this scope
* LIBZMQ-456 ZMQ_XPUB_VERBOSE does not propagate in a tree of XPUB/XSUB devices
* LIBZMQ-458 lt-test_router_mandatory fails with assertion at test_router_mandatory.cpp:53 * LIBZMQ-458 lt-test_router_mandatory fails with assertion at test_router_mandatory.cpp:53
* LIBZMQ-459 Assertion failed: encoder (stream_engine.cpp:266 * LIBZMQ-459 Assertion failed: encoder (stream_engine.cpp:266
* LIBZMQ-464 PUB socket with HWM set leaks memory * LIBZMQ-464 PUB socket with HWM set leaks memory
@ -21,6 +221,7 @@ Issues addressed in this release
* LIBZMQ-468 ZMQ_XPUB_VERBOSE & unsubscribe * LIBZMQ-468 ZMQ_XPUB_VERBOSE & unsubscribe
* LIBZMQ-472 Segfault in zmq_poll in REQ to ROUTER dialog * LIBZMQ-472 Segfault in zmq_poll in REQ to ROUTER dialog
0MQ version 3.2.1 (RC2), released on 2012/10/15 0MQ version 3.2.1 (RC2), released on 2012/10/15
=============================================== ===============================================

View File

@ -29,9 +29,10 @@ AC_SUBST(PACKAGE_VERSION)
# ZeroMQ versions 2.1.x: 1:0:0 (ABI version 1) # ZeroMQ versions 2.1.x: 1:0:0 (ABI version 1)
# ZeroMQ version 3.0: 2:0:0 (ABI version 2) # ZeroMQ version 3.0: 2:0:0 (ABI version 2)
# ZeroMQ version 3.1: 3:0:0 (ABI version 3) # ZeroMQ version 3.1: 3:0:0 (ABI version 3)
# ZeroMQ version 4.0: 4:0:1 (ABI version 4)
# #
# libzmq -version-info current:revision:age # libzmq -version-info current:revision:age
LTVER="3:0:0" LTVER="4:0:1"
AC_SUBST(LTVER) AC_SUBST(LTVER)
# Take a copy of original flags # Take a copy of original flags
@ -62,10 +63,44 @@ LIBZMQ_CHECK_ENABLE_DEBUG
# Check wheter to enable code coverage # Check wheter to enable code coverage
LIBZMQ_WITH_GCOV LIBZMQ_WITH_GCOV
# Checks for libraries # Allow libsodium to be installed in a custom path:
AC_CHECK_LIB([pthread], [pthread_create])
AC_CHECK_LIB([rt], [clock_gettime]) AC_ARG_WITH([libsodium],
AC_CHECK_LIB([sodium], [sodium_init],,AC_MSG_WARN(libsodium is needed for CURVE security)) [AS_HELP_STRING([--with-libsodium],
[Specify libsodium prefix])],
[zmq_search_libsodium="yes"],
[])
if test "x$zmq_search_libsodium" = "xyes"; then
if test -r "${with_libsodium}/include/sodium.h"; then
CPPFLAGS="-I${with_libsodium}/include ${CPPFLAGS}"
LDFLAGS="-L${with_libsodium}/lib ${LDFLAGS}"
fi
fi
AC_ARG_WITH([libsodium-include-dir],
[AS_HELP_STRING([--with-libsodium-include-dir],
[Specify libsodium include prefix])],
[zmq_search_libsodium_include="yes"],
[])
if test "x$zmq_search_libsodium_include" = "xyes"; then
if test -r "${with_libsodium_include_dir}/sodium.h"; then
CPPFLAGS="-I${with_libsodium_include_dir}/include ${CPPFLAGS}"
fi
fi
AC_ARG_WITH([libsodium_lib_dir],
[AS_HELP_STRING([--with-libsodium-lib-dir],
[Specify libsodium library prefix])],
[zmq_search_libsodium_lib="yes"],
[])
if test "x$zmq_search_libsodium_lib" = "xyes"; then
if test -r "${with_libsodium_lib_dir}/libsodium.{a|so|dylib}"; then
LDFLAGS="-L${with_libsodium}/lib ${LDFLAGS}"
fi
fi
# Set pedantic # Set pedantic
libzmq_pedantic="yes" libzmq_pedantic="yes"
@ -79,6 +114,7 @@ libzmq_dso_visibility="yes"
# Platform specific checks # Platform specific checks
libzmq_on_mingw32="no" libzmq_on_mingw32="no"
libzmq_on_android="no" libzmq_on_android="no"
libzmq_on_linux="no"
# Set some default features required by 0MQ code. # Set some default features required by 0MQ code.
CPPFLAGS="-D_REENTRANT -D_THREAD_SAFE $CPPFLAGS" CPPFLAGS="-D_REENTRANT -D_THREAD_SAFE $CPPFLAGS"
@ -94,6 +130,7 @@ case "${host_os}" in
CPPFLAGS="-D_GNU_SOURCE $CPPFLAGS" CPPFLAGS="-D_GNU_SOURCE $CPPFLAGS"
fi fi
AC_DEFINE(ZMQ_HAVE_LINUX, 1, [Have Linux OS]) AC_DEFINE(ZMQ_HAVE_LINUX, 1, [Have Linux OS])
libzmq_on_linux="yes"
case "${host_os}" in case "${host_os}" in
*android*) *android*)
@ -121,6 +158,9 @@ case "${host_os}" in
if test "x$solaris_has_atomic" = "xno"; then if test "x$solaris_has_atomic" = "xno"; then
AC_DEFINE(ZMQ_FORCE_MUTEXES, 1, [Force to use mutexes]) AC_DEFINE(ZMQ_FORCE_MUTEXES, 1, [Force to use mutexes])
fi fi
# ssp library is required for libsodium on Solaris-like systems
LDFLAGS="-lssp $LDFLAGS"
CPPFLAGS="$CPPFLAGS -Wno-long-long"
;; ;;
*freebsd*) *freebsd*)
# Define on FreeBSD to enable all library features # Define on FreeBSD to enable all library features
@ -214,6 +254,11 @@ case "${host_os}" in
;; ;;
esac esac
# Checks for libraries
AC_CHECK_LIB([pthread], [pthread_create])
AC_CHECK_LIB([rt], [clock_gettime])
AC_CHECK_LIB([sodium], [sodium_init],,AC_MSG_WARN(libsodium is needed for CURVE security))
# #
# Check if the compiler supports -fvisibility=hidden flag. MinGW32 uses __declspec # Check if the compiler supports -fvisibility=hidden flag. MinGW32 uses __declspec
# #
@ -385,6 +430,7 @@ AC_LANG_POP([C++])
AM_CONDITIONAL(BUILD_PGM, test "x$libzmq_pgm_ext" = "xyes") AM_CONDITIONAL(BUILD_PGM, test "x$libzmq_pgm_ext" = "xyes")
AM_CONDITIONAL(ON_MINGW, test "x$libzmq_on_mingw32" = "xyes") AM_CONDITIONAL(ON_MINGW, test "x$libzmq_on_mingw32" = "xyes")
AM_CONDITIONAL(ON_ANDROID, test "x$libzmq_on_android" = "xyes") AM_CONDITIONAL(ON_ANDROID, test "x$libzmq_on_android" = "xyes")
AM_CONDITIONAL(ON_LINUX, test "x$libzmq_on_linux" = "xyes")
# Checks for library functions. # Checks for library functions.
AC_TYPE_SIGNAL AC_TYPE_SIGNAL

View File

@ -3,13 +3,13 @@ MAN3 = zmq_bind.3 zmq_unbind.3 zmq_connect.3 zmq_disconnect.3 zmq_close.3 \
zmq_msg_init.3 zmq_msg_init_data.3 zmq_msg_init_size.3 \ zmq_msg_init.3 zmq_msg_init_data.3 zmq_msg_init_size.3 \
zmq_msg_move.3 zmq_msg_copy.3 zmq_msg_size.3 zmq_msg_data.3 zmq_msg_close.3 \ zmq_msg_move.3 zmq_msg_copy.3 zmq_msg_size.3 zmq_msg_data.3 zmq_msg_close.3 \
zmq_msg_send.3 zmq_msg_recv.3 \ zmq_msg_send.3 zmq_msg_recv.3 \
zmq_send.3 zmq_recv.3 \ zmq_send.3 zmq_recv.3 zmq_send_const.3 \
zmq_msg_get.3 zmq_msg_set.3 zmq_msg_more.3 \ zmq_msg_get.3 zmq_msg_set.3 zmq_msg_more.3 \
zmq_getsockopt.3 zmq_setsockopt.3 \ zmq_getsockopt.3 zmq_setsockopt.3 \
zmq_socket.3 zmq_socket_monitor.3 zmq_poll.3 \ zmq_socket.3 zmq_socket_monitor.3 zmq_poll.3 \
zmq_errno.3 zmq_strerror.3 zmq_version.3 zmq_proxy.3 \ zmq_errno.3 zmq_strerror.3 zmq_version.3 zmq_proxy.3 \
zmq_sendmsg.3 zmq_recvmsg.3 zmq_init.3 zmq_term.3 \ zmq_sendmsg.3 zmq_recvmsg.3 zmq_init.3 zmq_term.3 \
zmq_z85_encode.3 zmq_z85_decode.3 zmq_z85_encode.3 zmq_z85_decode.3 zmq_curve_keypair.3
MAN7 = zmq.7 zmq_tcp.7 zmq_pgm.7 zmq_epgm.7 zmq_inproc.7 zmq_ipc.7 \ MAN7 = zmq.7 zmq_tcp.7 zmq_pgm.7 zmq_epgm.7 zmq_inproc.7 zmq_ipc.7 \
zmq_null.7 zmq_plain.7 zmq_curve.7 zmq_null.7 zmq_plain.7 zmq_curve.7

View File

@ -44,9 +44,6 @@ Work with context properties::
Destroy a 0MQ context:: Destroy a 0MQ context::
linkzmq:zmq_ctx_term[3] linkzmq:zmq_ctx_term[3]
Monitor a 0MQ context::
linkzmq:zmq_ctx_set_monitor[3]
These deprecated functions let you create and destroy 'contexts': These deprecated functions let you create and destroy 'contexts':
Initialise 0MQ context:: Initialise 0MQ context::
@ -140,6 +137,10 @@ Sending and receiving messages::
linkzmq:zmq_msg_recv[3] linkzmq:zmq_msg_recv[3]
linkzmq:zmq_send[3] linkzmq:zmq_send[3]
linkzmq:zmq_recv[3] linkzmq:zmq_recv[3]
linkzmq:zmq_send_const[3]
Monitoring socket events:
linkzmq:zmq_socket_monitor[3]
.Input/output multiplexing .Input/output multiplexing
0MQ provides a mechanism for applications to multiplex input/output events over 0MQ provides a mechanism for applications to multiplex input/output events over
@ -193,6 +194,15 @@ Plain-text authentication using username and password::
Elliptic curve authentication and encryption:: Elliptic curve authentication and encryption::
linkzmq:zmq_curve[7] linkzmq:zmq_curve[7]
Generate a CURVE keypair in armored text format:
linkzmq:zmq_curve_keypair[3]
Convert an armored key into a 32-byte binary key:
linkzmq:zmq_z85_decode[3]
Convert a 32-byte binary CURVE key to an armored text string:
linkzmq:zmq_z85_encode[3]
ERROR HANDLING ERROR HANDLING
-------------- --------------

View File

@ -54,7 +54,7 @@ EXAMPLE
.Setting a limit on the number of sockets .Setting a limit on the number of sockets
---- ----
void *context = zmq_ctx_new (); void *context = zmq_ctx_new ();
zmq_ctx_get (context, ZMQ_MAX_SOCKETS, 256); zmq_ctx_set (context, ZMQ_MAX_SOCKETS, 256);
int max_sockets = zmq_ctx_get (context, ZMQ_MAX_SOCKETS); int max_sockets = zmq_ctx_get (context, ZMQ_MAX_SOCKETS);
assert (max_sockets == 256); assert (max_sockets == 256);
---- ----

56
doc/zmq_curve_keypair.txt Normal file
View File

@ -0,0 +1,56 @@
zmq_curve_keypair(3)
====================
NAME
----
zmq_curve_keypair - generate a new CURVE keypair
SYNOPSIS
--------
*int zmq_curve_keypair (char *z85_public_key, char *z85_secret_key);*
DESCRIPTION
-----------
The _zmq_curve_keypair()_ function shall return a newly generated random
keypair consisting of a public key and a secret key. The caller provides
two buffers, each at least 41 octets large, in which this method will
store the keys. The keys are encoded using linkzmq:zmq_z85_encode[3].
RETURN VALUE
------------
The _zmq_curve_keypair()_ function shall return 0 if successful, else it
shall return `-1` and set 'errno' to one of the values defined below.
ERRORS
------
*ENOTSUP*::
The libzmq library was not built with cryptographic support (libsodium).
EXAMPLE
-------
.Generating a new CURVE keypair
----
char public_key [41];
char secret_key [41];
int rc = crypto_box_keypair (public_key, secret_key);
assert (rc == 0);
----
SEE ALSO
--------
linkzmq:zmq_z85_decode[3]
linkzmq:zmq_z85_encode[3]
linkzmq:zmq_curve[7]
AUTHORS
-------
This page was written by the 0MQ community. To make a change please
read the 0MQ Contribution Policy at <http://www.zeromq.org/docs:contributing>.

View File

@ -120,8 +120,8 @@ Default value:: 0
Applicable socket types:: N/A Applicable socket types:: N/A
ZMQ_IDENTITY: Set socket identity ZMQ_IDENTITY: Retrieve socket identity
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_IDENTITY' option shall retrieve the identity of the specified 'socket'. The 'ZMQ_IDENTITY' option shall retrieve the identity of the specified 'socket'.
Socket identity is used only by request/reply pattern. Namely, it can be used Socket identity is used only by request/reply pattern. Namely, it can be used
in tandem with ROUTER socket to route messages to the peer with specific in tandem with ROUTER socket to route messages to the peer with specific
@ -134,7 +134,7 @@ starting with binary zero are reserved for use by 0MQ infrastructure.
Option value type:: binary data Option value type:: binary data
Option value unit:: N/A Option value unit:: N/A
Default value:: NULL Default value:: NULL
Applicable socket types:: all Applicable socket types:: ZMQ_REP, ZMQ_REQ, ZMQ_ROUTER, ZMQ_DEALER.
ZMQ_RATE: Retrieve multicast data rate ZMQ_RATE: Retrieve multicast data rate
@ -352,8 +352,8 @@ Default value:: 1 (true)
Applicable socket types:: all, when using TCP transports. Applicable socket types:: all, when using TCP transports.
ZMQ_DELAY_ATTACH_ON_CONNECT: Retrieve attach-on-connect value ZMQ_IMMEDIATE: Retrieve attach-on-connect value
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Retrieve the state of the attach on connect value. If set to `1`, will delay the Retrieve the state of the attach on connect value. If set to `1`, will delay the
attachment of a pipe on connect until the underlying connection has completed. attachment of a pipe on connect until the underlying connection has completed.
This will cause the socket to block if there are no other connections, but will This will cause the socket to block if there are no other connections, but will

View File

@ -48,6 +48,11 @@ NOTE: the endpoint pathname must be writable by the process. When the endpoint
starts with '/', e.g., `ipc:///pathname`, this will be an _absolute_ pathname. starts with '/', e.g., `ipc:///pathname`, this will be an _absolute_ pathname.
If the endpoint specifies a directory that does not exist, the bind shall fail. If the endpoint specifies a directory that does not exist, the bind shall fail.
NOTE: on Linux only, when the endpoint pathname starts with `@`, the abstract
namespace shall be used. The abstract namespace is independent of the
filesystem and if a process attempts to bind an endpoint already bound by a
process, it will fail. See unix(7) for details.
Connecting a socket Connecting a socket
~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~
When connecting a 'socket' to a peer address using _zmq_connect()_ with the When connecting a 'socket' to a peer address using _zmq_connect()_ with the

View File

@ -40,6 +40,7 @@ EXAMPLE
------- -------
.Receiving a multi-frame message .Receiving a multi-frame message
---- ----
zmq_msg_t frame;
while (true) { while (true) {
// Create an empty 0MQ message to hold the message frame // Create an empty 0MQ message to hold the message frame
int rc = zmq_msg_init (&frame); int rc = zmq_msg_init (&frame);
@ -53,7 +54,7 @@ while (true) {
fprintf (stderr, "end\n"); fprintf (stderr, "end\n");
break; break;
} }
zmq_msg_close (frame); zmq_msg_close (&frame);
} }
---- ----

View File

@ -45,7 +45,7 @@ while (true) {
fprintf (stderr, "end\n"); fprintf (stderr, "end\n");
break; break;
} }
zmq_msg_close (part); zmq_msg_close (&part);
} }
---- ----

View File

@ -73,10 +73,6 @@ NOTE: The _zmq_poll()_ function may be implemented or emulated using operating
system interfaces other than _poll()_, and as such may be subject to the limits system interfaces other than _poll()_, and as such may be subject to the limits
of those interfaces in ways not defined in this documentation. of those interfaces in ways not defined in this documentation.
NOTE: The _zmq_send()_ function will clear all pending events on a socket. Thus,
if you use _zmq_poll()_ to monitor input on a socket, use it before output as
well, and process all events after each _zmq_poll()_ call.
RETURN VALUE RETURN VALUE
------------ ------------
Upon successful completion, the _zmq_poll()_ function shall return the number Upon successful completion, the _zmq_poll()_ function shall return the number

View File

@ -1,5 +1,5 @@
zmq_send_const(3) zmq_send_const(3)
=========== =================
NAME NAME

View File

@ -14,7 +14,7 @@ SYNOPSIS
Caution: All options, with the exception of ZMQ_SUBSCRIBE, ZMQ_UNSUBSCRIBE, Caution: All options, with the exception of ZMQ_SUBSCRIBE, ZMQ_UNSUBSCRIBE,
ZMQ_LINGER, ZMQ_ROUTER_MANDATORY, ZMQ_PROBE_ROUTER, ZMQ_XPUB_VERBOSE, ZMQ_LINGER, ZMQ_ROUTER_MANDATORY, ZMQ_PROBE_ROUTER, ZMQ_XPUB_VERBOSE,
ZMQ_REQ_STRICT, ZMQ_REQ_REQUEST_IDS only take effect for subsequent socket ZMQ_REQ_CORRELATE, and ZMQ_REQ_RELAXED, only take effect for subsequent socket
bind/connects. bind/connects.
Specifically, security options take effect for subsequent bind/connect calls, Specifically, security options take effect for subsequent bind/connect calls,
@ -150,7 +150,7 @@ results shall be undefined.
Option value type:: binary data Option value type:: binary data
Option value unit:: N/A Option value unit:: N/A
Default value:: NULL Default value:: NULL
Applicable socket types:: all Applicable socket types:: ZMQ_REQ, ZMQ_REP, ZMQ_ROUTER, ZMQ_DEALER.
ZMQ_RATE: Set multicast data rate ZMQ_RATE: Set multicast data rate
@ -461,11 +461,11 @@ Default value:: 0
Applicable socket types:: ZMQ_XPUB Applicable socket types:: ZMQ_XPUB
ZMQ_REQ_REQUEST_IDS: enable extra request identity frames ZMQ_REQ_CORRELATE: match replies with requests
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The default behavior of REQ sockets is to rely on the ordering of messages The default behavior of REQ sockets is to rely on the ordering of messages to
to match requests and responses and that is usually sufficient. When this option match requests and responses and that is usually sufficient. When this option
is set to 1, the REQ socket will prefix outgoing messages with an extra frame is set to 1, the REQ socket will prefix outgoing messages with an extra frame
containing a request id. That means the full message is (request id, 0, containing a request id. That means the full message is (request id, 0,
user frames...). The REQ socket will discard all incoming messages that don't user frames...). The REQ socket will discard all incoming messages that don't
@ -478,25 +478,25 @@ Default value:: 0
Applicable socket types:: ZMQ_REQ Applicable socket types:: ZMQ_REQ
ZMQ_REQ_STRICT: enforce strict alternation between request and reply ZMQ_REQ_RELAXED: relax strict alternation between request and reply
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
When set to 1, a REQ socket does not allow initiating a new request with By default, a REQ socket does not allow initiating a new request with
_zmq_send(3)_ until the reply to the previous one has been received. _zmq_send(3)_ until the reply to the previous one has been received.
When set to 0, sending another message is allowed and has the effect of When set to 1, sending another message is allowed and has the effect of
disconnecting the underlying connection to the peer from which the reply was disconnecting the underlying connection to the peer from which the reply was
expected, triggering a reconnection attempt on transports that support it. expected, triggering a reconnection attempt on transports that support it.
The request-reply state machine is reset and a new request is sent to the The request-reply state machine is reset and a new request is sent to the
next available peer. next available peer.
If set to 0, also enable ZMQ_REQ_REQUEST_IDS to ensure correct If set to 1, also enable ZMQ_REQ_CORRELATE to ensure correct matching of
matching of requests and replies. Otherwise a late reply to an aborted request requests and replies. Otherwise a late reply to an aborted request can be
can be reported as the reply to the superseding request. reported as the reply to the superseding request.
[horizontal] [horizontal]
Option value type:: int Option value type:: int
Option value unit:: 0, 1 Option value unit:: 0, 1
Default value:: 1 Default value:: 0
Applicable socket types:: ZMQ_REQ Applicable socket types:: ZMQ_REQ
@ -624,7 +624,9 @@ linkzmq:zmq_curve[7]. A value of '1' means the socket will act as
CURVE server. A value of '0' means the socket will not act as CURVE CURVE server. A value of '0' means the socket will not act as CURVE
server, and its security role then depends on other option settings. server, and its security role then depends on other option settings.
Setting this to '0' shall reset the socket security to NULL. When you Setting this to '0' shall reset the socket security to NULL. When you
set this you must also set the ZMQ_CURVE_PUBLICKEY option. set this you must also set the server's secret key using the
ZMQ_CURVE_SECRETKEY option. A server socket does not need to know
its own public key.
[horizontal] [horizontal]
Option value type:: int Option value type:: int
@ -636,14 +638,11 @@ Applicable socket types:: all, when using TCP transport
ZMQ_CURVE_PUBLICKEY: Set CURVE public key ZMQ_CURVE_PUBLICKEY: Set CURVE public key
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the socket's long term public key. You must set this on a CURVE Sets the socket's long term public key. You must set this on CURVE client
client or server socket, see linkzmq:zmq_curve[7]. You can provide the sockets, see linkzmq:zmq_curve[7]. You can provide the key as 32 binary
key as 32 binary bytes, or as a 40-character string encoded in the Z85 bytes, or as a 40-character string encoded in the Z85 encoding format.
encoding format. For servers, the public key must be persisted and The public key must always be used with the matching secret key. To
shared through some unspecified but secure mechanism to clients. The generate a public/secret key pair, use linkzmq:zmq_curve_keypair[3].
public key must always be used with the matching secret key generated
at the same time. To generate a public/secret key pair, use the
tools/curve_keygen tool.
[horizontal] [horizontal]
Option value type:: binary data or Z85 text string Option value type:: binary data or Z85 text string
@ -655,10 +654,11 @@ Applicable socket types:: all, when using TCP transport
ZMQ_CURVE_SECRETKEY: Set CURVE secret key ZMQ_CURVE_SECRETKEY: Set CURVE secret key
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the socket's long term secret key. You must set this on a CURVE Sets the socket's long term secret key. You must set this on both CURVE
client socket, see linkzmq:zmq_curve[7]. You can provide the key as client and server sockets, see linkzmq:zmq_curve[7]. You can provide the
32 binary bytes, or as a 40-character string encoded in the Z85 encoding key as 32 binary bytes, or as a 40-character string encoded in the Z85
format. encoding format. To generate a public/secret key pair, use
linkzmq:zmq_curve_keypair[3].
[horizontal] [horizontal]
Option value type:: binary data or Z85 text string Option value type:: binary data or Z85 text string
@ -670,11 +670,10 @@ Applicable socket types:: all, when using TCP transport
ZMQ_CURVE_SERVERKEY: Set CURVE server key ZMQ_CURVE_SERVERKEY: Set CURVE server key
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the socket's long term server key. You must set this on a CURVE Sets the socket's long term server key. You must set this on CURVE client
client socket, see linkzmq:zmq_curve[7]. You can provide the key as sockets, see linkzmq:zmq_curve[7]. You can provide the key as 32 binary
32 binary bytes, or as a 40-character string encoded in the Z85 encoding bytes, or as a 40-character string encoded in the Z85 encoding format.
format. This key must be the same as the public key set on the server This key must have been generated together with the server's secret key.
socket.
[horizontal] [horizontal]
Option value type:: binary data or Z85 text string Option value type:: binary data or Z85 text string

View File

@ -62,6 +62,7 @@ The request-reply pattern is used for sending requests from a ZMQ_REQ _client_
to one or more ZMQ_REP _services_, and receiving subsequent replies to each to one or more ZMQ_REP _services_, and receiving subsequent replies to each
request sent. request sent.
The request-reply pattern is formally defined by http://rfc.zeromq.org/spec:28.
ZMQ_REQ ZMQ_REQ
^^^^^^^ ^^^^^^^
@ -168,6 +169,7 @@ Publish-subscribe pattern
The publish-subscribe pattern is used for one-to-many distribution of data from The publish-subscribe pattern is used for one-to-many distribution of data from
a single _publisher_ to multiple _subscribers_ in a fan out fashion. a single _publisher_ to multiple _subscribers_ in a fan out fashion.
The publish-subscribe pattern is formally defined by http://rfc.zeromq.org/spec:29.
ZMQ_PUB ZMQ_PUB
^^^^^^^ ^^^^^^^
@ -249,6 +251,7 @@ a pipeline. Data always flows down the pipeline, and each stage of the pipeline
is connected to at least one _node_. When a pipeline stage is connected to is connected to at least one _node_. When a pipeline stage is connected to
multiple _nodes_ data is round-robined among all connected _nodes_. multiple _nodes_ data is round-robined among all connected _nodes_.
The pipeline pattern is formally defined by http://rfc.zeromq.org/spec:30.
ZMQ_PUSH ZMQ_PUSH
^^^^^^^^ ^^^^^^^^
@ -296,6 +299,7 @@ The exclusive pair pattern is used to connect a peer to precisely one other
peer. This pattern is used for inter-thread communication across the inproc peer. This pattern is used for inter-thread communication across the inproc
transport. transport.
The exclusive pair pattern is formally defined by http://rfc.zeromq.org/spec:31.
ZMQ_PAIR ZMQ_PAIR
^^^^^^^^ ^^^^^^^^

View File

@ -31,7 +31,6 @@ EXAMPLE
------- -------
.Decoding a CURVE key .Decoding a CURVE key
---- ----
#include <sodium.h>
char decoded [] = "rq:rM>}U?@Lns47E1%kR.o@n%FcmmsL/@{H8]yf7"; char decoded [] = "rq:rM>}U?@Lns47E1%kR.o@n%FcmmsL/@{H8]yf7";
uint8_t public_key [32]; uint8_t public_key [32];
zmq_z85_decode (public_key, decoded); zmq_z85_decode (public_key, decoded);
@ -41,6 +40,7 @@ zmq_z85_decode (public_key, decoded);
SEE ALSO SEE ALSO
-------- --------
linkzmq:zmq_z85_decode[3] linkzmq:zmq_z85_decode[3]
linkzmq:zmq_curve_keypair[3]
linkzmq:zmq_curve[7] linkzmq:zmq_curve[7]

View File

@ -47,6 +47,7 @@ puts (encoded);
SEE ALSO SEE ALSO
-------- --------
linkzmq:zmq_z85_decode[3] linkzmq:zmq_z85_decode[3]
linkzmq:zmq_curve_keypair[3]
linkzmq:zmq_curve[7] linkzmq:zmq_curve[7]

View File

@ -15,11 +15,29 @@
You should have received a copy of the GNU Lesser General Public License You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*************************************************************************
NOTE to contributors. This file comprises the principal public contract
for ZeroMQ API users (along with zmq_utils.h). Any change to this file
supplied in a stable release SHOULD not break existing applications.
In practice this means that the value of constants must not change, and
that old values may not be reused for new constants.
*************************************************************************
*/ */
#ifndef __ZMQ_H_INCLUDED__ #ifndef __ZMQ_H_INCLUDED__
#define __ZMQ_H_INCLUDED__ #define __ZMQ_H_INCLUDED__
/* Version macros for compile-time API version detection */
#define ZMQ_VERSION_MAJOR 4
#define ZMQ_VERSION_MINOR 0
#define ZMQ_VERSION_PATCH 2
#define ZMQ_MAKE_VERSION(major, minor, patch) \
((major) * 10000 + (minor) * 100 + (patch))
#define ZMQ_VERSION \
ZMQ_MAKE_VERSION(ZMQ_VERSION_MAJOR, ZMQ_VERSION_MINOR, ZMQ_VERSION_PATCH)
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
@ -62,28 +80,14 @@ typedef __int32 int32_t;
# ifndef uint16_t # ifndef uint16_t
typedef unsigned __int16 uint16_t; typedef unsigned __int16 uint16_t;
# endif # endif
# ifndef uint8_t
typedef unsigned __int8 uint8_t;
# endif
#else #else
# include <stdint.h> # include <stdint.h>
#endif #endif
/******************************************************************************/
/* 0MQ versioning support. */
/******************************************************************************/
/* Version macros for compile-time API version detection */
#define ZMQ_VERSION_MAJOR 3
#define ZMQ_VERSION_MINOR 3
#define ZMQ_VERSION_PATCH 0
#define ZMQ_MAKE_VERSION(major, minor, patch) \
((major) * 10000 + (minor) * 100 + (patch))
#define ZMQ_VERSION \
ZMQ_MAKE_VERSION(ZMQ_VERSION_MAJOR, ZMQ_VERSION_MINOR, ZMQ_VERSION_PATCH)
/* Run-time API version detection */
ZMQ_EXPORT void zmq_version (int *major, int *minor, int *patch);
/******************************************************************************/ /******************************************************************************/
/* 0MQ errors. */ /* 0MQ errors. */
/******************************************************************************/ /******************************************************************************/
@ -154,6 +158,9 @@ ZMQ_EXPORT void zmq_version (int *major, int *minor, int *patch);
#define ETERM (ZMQ_HAUSNUMERO + 53) #define ETERM (ZMQ_HAUSNUMERO + 53)
#define EMTHREAD (ZMQ_HAUSNUMERO + 54) #define EMTHREAD (ZMQ_HAUSNUMERO + 54)
/* Run-time API version detection */
ZMQ_EXPORT void zmq_version (int *major, int *minor, int *patch);
/* This function retrieves the errno as it is known to 0MQ library. The goal */ /* This function retrieves the errno as it is known to 0MQ library. The goal */
/* of this function is to make the code 100% portable, including where 0MQ */ /* of this function is to make the code 100% portable, including where 0MQ */
/* compiled with certain CRT library (on Windows) is linked to an */ /* compiled with certain CRT library (on Windows) is linked to an */
@ -174,7 +181,7 @@ ZMQ_EXPORT const char *zmq_strerror (int errnum);
/* Default for new contexts */ /* Default for new contexts */
#define ZMQ_IO_THREADS_DFLT 1 #define ZMQ_IO_THREADS_DFLT 1
#define ZMQ_MAX_SOCKETS_DFLT 1024 #define ZMQ_MAX_SOCKETS_DFLT 1023
ZMQ_EXPORT void *zmq_ctx_new (void); ZMQ_EXPORT void *zmq_ctx_new (void);
ZMQ_EXPORT int zmq_ctx_term (void *context); ZMQ_EXPORT int zmq_ctx_term (void *context);
@ -277,8 +284,8 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
#define ZMQ_CURVE_SECRETKEY 49 #define ZMQ_CURVE_SECRETKEY 49
#define ZMQ_CURVE_SERVERKEY 50 #define ZMQ_CURVE_SERVERKEY 50
#define ZMQ_PROBE_ROUTER 51 #define ZMQ_PROBE_ROUTER 51
#define ZMQ_REQ_REQUEST_IDS 52 #define ZMQ_REQ_CORRELATE 52
#define ZMQ_REQ_STRICT 53 #define ZMQ_REQ_RELAXED 53
#define ZMQ_CONFLATE 54 #define ZMQ_CONFLATE 54
#define ZMQ_ZAP_DOMAIN 55 #define ZMQ_ZAP_DOMAIN 55

View File

@ -22,9 +22,22 @@
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
#include <stdint.h>
#include <stdlib.h> #include <stdlib.h>
/* Define integer types needed for event interface */
#if defined ZMQ_HAVE_SOLARIS || defined ZMQ_HAVE_OPENVMS
# include <inttypes.h>
#elif defined _MSC_VER && _MSC_VER < 1600
# ifndef int32_t
typedef __int32 int32_t;
# endif
# ifndef uint16_t
typedef unsigned __int16 uint16_t;
# endif
#else
# include <stdint.h>
#endif
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
@ -48,8 +61,22 @@ extern "C" {
# endif # endif
#endif #endif
/* These functions are documented by man pages */
/* Encode data with Z85 encoding. Returns encoded data */
ZMQ_EXPORT char *zmq_z85_encode (char *dest, uint8_t *data, size_t size);
/* Decode data with Z85 encoding. Returns decoded data */
ZMQ_EXPORT uint8_t *zmq_z85_decode (uint8_t *dest, char *string);
/* Generate z85-encoded public and private keypair with libsodium. */
/* Returns 0 on success. */
ZMQ_EXPORT int zmq_curve_keypair (char *z85_public_key, char *z85_secret_key);
typedef void (zmq_thread_fn) (void*); typedef void (zmq_thread_fn) (void*);
/* These functions are not documented by man pages */
/* Helper functions are used by perf tests so that they don't have to care */ /* Helper functions are used by perf tests so that they don't have to care */
/* about minutiae of time-related functions on different OS platforms. */ /* about minutiae of time-related functions on different OS platforms. */

View File

@ -88,7 +88,7 @@ libzmq_la_SOURCES = \
dealer.hpp \ dealer.hpp \
xsub.hpp \ xsub.hpp \
ypipe.hpp \ ypipe.hpp \
ypipe_flat.hpp \ ypipe_base.hpp \
yqueue.hpp \ yqueue.hpp \
address.cpp \ address.cpp \
clock.cpp \ clock.cpp \

View File

@ -22,6 +22,7 @@
#include "likely.hpp" #include "likely.hpp"
#include "config.hpp" #include "config.hpp"
#include "err.hpp" #include "err.hpp"
#include "mutex.hpp"
#include <stddef.h> #include <stddef.h>
@ -41,9 +42,49 @@
#include <time.h> #include <time.h>
#endif #endif
#ifdef ZMQ_HAVE_WINDOWS
typedef ULONGLONG (*f_compatible_get_tick_count64)();
static zmq::mutex_t compatible_get_tick_count64_mutex;
ULONGLONG compatible_get_tick_count64()
{
compatible_get_tick_count64_mutex.lock();
static DWORD s_wrap = 0;
static DWORD s_last_tick = 0;
const DWORD current_tick = ::GetTickCount();
if (current_tick < s_last_tick)
++s_wrap;
s_last_tick = current_tick;
const ULONGLONG result = (static_cast<ULONGLONG>(s_wrap) << 32) + static_cast<ULONGLONG>(current_tick);
compatible_get_tick_count64_mutex.unlock();
return result;
}
f_compatible_get_tick_count64 init_compatible_get_tick_count64()
{
f_compatible_get_tick_count64 func = NULL;
HMODULE module = ::LoadLibraryA("Kernel32.dll");
if (module != NULL)
func = reinterpret_cast<f_compatible_get_tick_count64>(::GetProcAddress(module, "GetTickCount64"));
if (func == NULL)
func = compatible_get_tick_count64;
return func;
}
static f_compatible_get_tick_count64 my_get_tick_count64 = init_compatible_get_tick_count64();
#endif
zmq::clock_t::clock_t () : zmq::clock_t::clock_t () :
last_tsc (rdtsc ()), last_tsc (rdtsc ()),
#ifdef ZMQ_HAVE_WINDOWS
last_time (static_cast<uint64_t>((*my_get_tick_count64)()))
#else
last_time (now_us () / 1000) last_time (now_us () / 1000)
#endif
{ {
} }
@ -106,7 +147,17 @@ uint64_t zmq::clock_t::now_ms ()
// If TSC is not supported, get precise time and chop off the microseconds. // If TSC is not supported, get precise time and chop off the microseconds.
if (!tsc) if (!tsc)
{
#ifdef ZMQ_HAVE_WINDOWS
// Under Windows, now_us is not so reliable since QueryPerformanceCounter
// does not guarantee that it will use a hardware that offers a monotonic timer.
// So, lets use GetTickCount when GetTickCount64 is not available with an workaround
// to its 32 bit limitation.
return static_cast<uint64_t>((*my_get_tick_count64)());
#else
return now_us () / 1000; return now_us () / 1000;
#endif
}
// If TSC haven't jumped back (in case of migration to a different // If TSC haven't jumped back (in case of migration to a different
// CPU core) and if not too much time elapsed since last measurement, // CPU core) and if not too much time elapsed since last measurement,
@ -115,7 +166,11 @@ uint64_t zmq::clock_t::now_ms ()
return last_time; return last_time;
last_tsc = tsc; last_tsc = tsc;
#ifdef ZMQ_HAVE_WINDOWS
last_time = static_cast<uint64_t>((*my_get_tick_count64)());
#else
last_time = now_us () / 1000; last_time = now_us () / 1000;
#endif
return last_time; return last_time;
} }

View File

@ -38,6 +38,14 @@
#define ZMQ_CTX_TAG_VALUE_GOOD 0xabadcafe #define ZMQ_CTX_TAG_VALUE_GOOD 0xabadcafe
#define ZMQ_CTX_TAG_VALUE_BAD 0xdeadbeef #define ZMQ_CTX_TAG_VALUE_BAD 0xdeadbeef
int clipped_maxsocket(int max_requested)
{
if (max_requested >= zmq::poller_t::max_fds () && zmq::poller_t::max_fds () != -1)
max_requested = zmq::poller_t::max_fds () - 1; // -1 because we need room for the repear mailbox.
return max_requested;
}
zmq::ctx_t::ctx_t () : zmq::ctx_t::ctx_t () :
tag (ZMQ_CTX_TAG_VALUE_GOOD), tag (ZMQ_CTX_TAG_VALUE_GOOD),
starting (true), starting (true),
@ -45,7 +53,7 @@ zmq::ctx_t::ctx_t () :
reaper (NULL), reaper (NULL),
slot_count (0), slot_count (0),
slots (NULL), slots (NULL),
max_sockets (ZMQ_MAX_SOCKETS_DFLT), max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)),
io_thread_count (ZMQ_IO_THREADS_DFLT), io_thread_count (ZMQ_IO_THREADS_DFLT),
ipv6 (false) ipv6 (false)
{ {
@ -74,13 +82,11 @@ zmq::ctx_t::~ctx_t ()
delete io_threads [i]; delete io_threads [i];
// Deallocate the reaper thread object. // Deallocate the reaper thread object.
if (reaper)
delete reaper; delete reaper;
// Deallocate the array of mailboxes. No special work is // Deallocate the array of mailboxes. No special work is
// needed as mailboxes themselves were deallocated with their // needed as mailboxes themselves were deallocated with their
// corresponding io_thread/socket objects. // corresponding io_thread/socket objects.
if (slots)
free (slots); free (slots);
// Remove the tag, so that the object is considered dead. // Remove the tag, so that the object is considered dead.
@ -109,7 +115,6 @@ int zmq::ctx_t::terminate ()
// restarted. // restarted.
bool restarted = terminating; bool restarted = terminating;
terminating = true; terminating = true;
slot_sync.unlock ();
// First attempt to terminate the context. // First attempt to terminate the context.
if (!restarted) { if (!restarted) {
@ -117,13 +122,12 @@ int zmq::ctx_t::terminate ()
// First send stop command to sockets so that any blocking calls // First send stop command to sockets so that any blocking calls
// can be interrupted. If there are no sockets we can ask reaper // can be interrupted. If there are no sockets we can ask reaper
// thread to stop. // thread to stop.
slot_sync.lock ();
for (sockets_t::size_type i = 0; i != sockets.size (); i++) for (sockets_t::size_type i = 0; i != sockets.size (); i++)
sockets [i]->stop (); sockets [i]->stop ();
if (sockets.empty ()) if (sockets.empty ())
reaper->stop (); reaper->stop ();
slot_sync.unlock ();
} }
slot_sync.unlock();
// Wait till reaper thread closes all the sockets. // Wait till reaper thread closes all the sockets.
command_t cmd; command_t cmd;
@ -165,7 +169,7 @@ int zmq::ctx_t::shutdown ()
int zmq::ctx_t::set (int option_, int optval_) int zmq::ctx_t::set (int option_, int optval_)
{ {
int rc = 0; int rc = 0;
if (option_ == ZMQ_MAX_SOCKETS && optval_ >= 1) { if (option_ == ZMQ_MAX_SOCKETS && optval_ >= 1 && optval_ == clipped_maxsocket (optval_)) {
opt_sync.lock (); opt_sync.lock ();
max_sockets = optval_; max_sockets = optval_;
opt_sync.unlock (); opt_sync.unlock ();

View File

@ -132,8 +132,8 @@ namespace zmq
sockets_t sockets; sockets_t sockets;
// List of unused thread slots. // List of unused thread slots.
typedef std::vector <uint32_t> emtpy_slots_t; typedef std::vector <uint32_t> empty_slots_t;
emtpy_slots_t empty_slots; empty_slots_t empty_slots;
// If true, zmq_init has been called but no socket has been created // If true, zmq_init has been called but no socket has been created
// yet. Launching of I/O threads is delayed. // yet. Launching of I/O threads is delayed.

View File

@ -297,34 +297,38 @@ int zmq::curve_client_t::process_welcome (msg_t *msg_)
int zmq::curve_client_t::produce_initiate (msg_t *msg_) int zmq::curve_client_t::produce_initiate (msg_t *msg_)
{ {
uint8_t vouch_nonce [crypto_box_NONCEBYTES]; uint8_t vouch_nonce [crypto_box_NONCEBYTES];
uint8_t vouch_plaintext [crypto_box_ZEROBYTES + 32]; uint8_t vouch_plaintext [crypto_box_ZEROBYTES + 64];
uint8_t vouch_box [crypto_box_BOXZEROBYTES + 48]; uint8_t vouch_box [crypto_box_BOXZEROBYTES + 80];
// Create vouch = Box [C'](C->S) // Create vouch = Box [C',S](C->S')
memset (vouch_plaintext, 0, crypto_box_ZEROBYTES); memset (vouch_plaintext, 0, crypto_box_ZEROBYTES);
memcpy (vouch_plaintext + crypto_box_ZEROBYTES, cn_public, 32); memcpy (vouch_plaintext + crypto_box_ZEROBYTES, cn_public, 32);
memcpy (vouch_plaintext + crypto_box_ZEROBYTES + 32, server_key, 32);
memcpy (vouch_nonce, "VOUCH---", 8); memcpy (vouch_nonce, "VOUCH---", 8);
randombytes (vouch_nonce + 8, 16); randombytes (vouch_nonce + 8, 16);
int rc = crypto_box (vouch_box, vouch_plaintext, int rc = crypto_box (vouch_box, vouch_plaintext,
sizeof vouch_plaintext, sizeof vouch_plaintext,
vouch_nonce, server_key, secret_key); vouch_nonce, cn_server, secret_key);
zmq_assert (rc == 0); zmq_assert (rc == 0);
// Assume here that metadata is limited to 256 bytes
uint8_t initiate_nonce [crypto_box_NONCEBYTES]; uint8_t initiate_nonce [crypto_box_NONCEBYTES];
uint8_t initiate_plaintext [crypto_box_ZEROBYTES + 96 + 256]; uint8_t initiate_plaintext [crypto_box_ZEROBYTES + 128 + 256];
uint8_t initiate_box [crypto_box_BOXZEROBYTES + 112 + 256]; uint8_t initiate_box [crypto_box_BOXZEROBYTES + 144 + 256];
// Create Box [C + vouch + metadata](C'->S') // Create Box [C + vouch + metadata](C'->S')
memset (initiate_plaintext, 0, crypto_box_ZEROBYTES); memset (initiate_plaintext, 0, crypto_box_ZEROBYTES);
memcpy (initiate_plaintext + crypto_box_ZEROBYTES, public_key, 32); memcpy (initiate_plaintext + crypto_box_ZEROBYTES,
public_key, 32);
memcpy (initiate_plaintext + crypto_box_ZEROBYTES + 32, memcpy (initiate_plaintext + crypto_box_ZEROBYTES + 32,
vouch_nonce + 8, 16); vouch_nonce + 8, 16);
memcpy (initiate_plaintext + crypto_box_ZEROBYTES + 48, memcpy (initiate_plaintext + crypto_box_ZEROBYTES + 48,
vouch_box + crypto_box_BOXZEROBYTES, 48); vouch_box + crypto_box_BOXZEROBYTES, 80);
uint8_t *ptr = initiate_plaintext + crypto_box_ZEROBYTES + 96; // Metadata starts after vouch
uint8_t *ptr = initiate_plaintext + crypto_box_ZEROBYTES + 128;
// Add socket type property // Add socket type property
const char *socket_type = socket_type_string (options.type); const char *socket_type = socket_type_string (options.type);
@ -359,7 +363,6 @@ int zmq::curve_client_t::produce_initiate (msg_t *msg_)
// Box [C + vouch + metadata](C'->S') // Box [C + vouch + metadata](C'->S')
memcpy (initiate + 113, initiate_box + crypto_box_BOXZEROBYTES, memcpy (initiate + 113, initiate_box + crypto_box_BOXZEROBYTES,
mlen - crypto_box_BOXZEROBYTES); mlen - crypto_box_BOXZEROBYTES);
cn_nonce++; cn_nonce++;
return 0; return 0;

View File

@ -338,7 +338,7 @@ int zmq::curve_server_t::produce_welcome (msg_t *msg_)
int zmq::curve_server_t::process_initiate (msg_t *msg_) int zmq::curve_server_t::process_initiate (msg_t *msg_)
{ {
if (msg_->size () < 225) { if (msg_->size () < 257) {
errno = EPROTO; errno = EPROTO;
return -1; return -1;
} }
@ -369,19 +369,17 @@ int zmq::curve_server_t::process_initiate (msg_t *msg_)
} }
// Check cookie plain text is as expected [C' + s'] // Check cookie plain text is as expected [C' + s']
if (memcmp (cookie_plaintext + crypto_secretbox_ZEROBYTES, if (memcmp (cookie_plaintext + crypto_secretbox_ZEROBYTES, cn_client, 32)
cn_client, 32) || memcmp (cookie_plaintext + crypto_secretbox_ZEROBYTES + 32, cn_secret, 32)) {
|| memcmp (cookie_plaintext + crypto_secretbox_ZEROBYTES + 32, errno = EPROTO;
cn_secret, 32)) {
errno = EAGAIN;
return -1; return -1;
} }
const size_t clen = (msg_->size () - 113) + crypto_box_BOXZEROBYTES; const size_t clen = (msg_->size () - 113) + crypto_box_BOXZEROBYTES;
uint8_t initiate_nonce [crypto_box_NONCEBYTES]; uint8_t initiate_nonce [crypto_box_NONCEBYTES];
uint8_t initiate_plaintext [crypto_box_ZEROBYTES + 96 + 256]; uint8_t initiate_plaintext [crypto_box_ZEROBYTES + 128 + 256];
uint8_t initiate_box [crypto_box_BOXZEROBYTES + 112 + 256]; uint8_t initiate_box [crypto_box_BOXZEROBYTES + 144 + 256];
// Open Box [C + vouch + metadata](C'->S') // Open Box [C + vouch + metadata](C'->S')
memset (initiate_box, 0, crypto_box_BOXZEROBYTES); memset (initiate_box, 0, crypto_box_BOXZEROBYTES);
@ -401,13 +399,13 @@ int zmq::curve_server_t::process_initiate (msg_t *msg_)
const uint8_t *client_key = initiate_plaintext + crypto_box_ZEROBYTES; const uint8_t *client_key = initiate_plaintext + crypto_box_ZEROBYTES;
uint8_t vouch_nonce [crypto_box_NONCEBYTES]; uint8_t vouch_nonce [crypto_box_NONCEBYTES];
uint8_t vouch_plaintext [crypto_box_ZEROBYTES + 32]; uint8_t vouch_plaintext [crypto_box_ZEROBYTES + 64];
uint8_t vouch_box [crypto_box_BOXZEROBYTES + 48]; uint8_t vouch_box [crypto_box_BOXZEROBYTES + 80];
// Open Box [C'](C->S) and check contents // Open Box Box [C',S](C->S') and check contents
memset (vouch_box, 0, crypto_box_BOXZEROBYTES); memset (vouch_box, 0, crypto_box_BOXZEROBYTES);
memcpy (vouch_box + crypto_box_BOXZEROBYTES, memcpy (vouch_box + crypto_box_BOXZEROBYTES,
initiate_plaintext + crypto_box_ZEROBYTES + 48, 48); initiate_plaintext + crypto_box_ZEROBYTES + 48, 80);
memcpy (vouch_nonce, "VOUCH---", 8); memcpy (vouch_nonce, "VOUCH---", 8);
memcpy (vouch_nonce + 8, memcpy (vouch_nonce + 8,
@ -415,7 +413,7 @@ int zmq::curve_server_t::process_initiate (msg_t *msg_)
rc = crypto_box_open (vouch_plaintext, vouch_box, rc = crypto_box_open (vouch_plaintext, vouch_box,
sizeof vouch_box, sizeof vouch_box,
vouch_nonce, client_key, secret_key); vouch_nonce, client_key, cn_secret);
if (rc != 0) { if (rc != 0) {
errno = EPROTO; errno = EPROTO;
return -1; return -1;
@ -443,8 +441,8 @@ int zmq::curve_server_t::process_initiate (msg_t *msg_)
} }
} }
return parse_metadata (initiate_plaintext + crypto_box_ZEROBYTES + 96, return parse_metadata (initiate_plaintext + crypto_box_ZEROBYTES + 128,
clen - crypto_box_ZEROBYTES - 96); clen - crypto_box_ZEROBYTES - 128);
} }
int zmq::curve_server_t::produce_ready (msg_t *msg_) int zmq::curve_server_t::produce_ready (msg_t *msg_)

View File

@ -133,6 +133,11 @@ void zmq::devpoll_t::stop ()
stopping = true; stopping = true;
} }
int zmq::devpoll_t::max_fds ()
{
return -1;
}
void zmq::devpoll_t::loop () void zmq::devpoll_t::loop ()
{ {
while (!stopping) { while (!stopping) {

View File

@ -56,6 +56,8 @@ namespace zmq
void start (); void start ();
void stop (); void stop ();
static int max_fds ();
private: private:
// Main worker thread routine. // Main worker thread routine.

View File

@ -126,6 +126,11 @@ void zmq::epoll_t::stop ()
stopping = true; stopping = true;
} }
int zmq::epoll_t::max_fds ()
{
return -1;
}
void zmq::epoll_t::loop () void zmq::epoll_t::loop ()
{ {
epoll_event ev_buf [max_io_events]; epoll_event ev_buf [max_io_events];

View File

@ -58,6 +58,8 @@ namespace zmq
void start (); void start ();
void stop (); void stop ();
static int max_fds ();
private: private:
// Main worker thread routine. // Main worker thread routine.

View File

@ -41,11 +41,11 @@ namespace zmq
// This method is called by the session to signalise that more // This method is called by the session to signalise that more
// messages can be written to the pipe. // messages can be written to the pipe.
virtual void activate_in () = 0; virtual void restart_input () = 0;
// This method is called by the session to signalise that there // This method is called by the session to signalise that there
// are messages to send available. // are messages to send available.
virtual void activate_out () = 0; virtual void restart_output () = 0;
virtual void zap_msg_available () = 0; virtual void zap_msg_available () = 0;
}; };

View File

@ -51,9 +51,20 @@ int zmq::ipc_address_t::resolve (const char *path_)
errno = ENAMETOOLONG; errno = ENAMETOOLONG;
return -1; return -1;
} }
#if defined ZMQ_HAVE_LINUX
if (path_[0] == '@' && !path_[1]) {
errno = EINVAL;
return -1;
}
#endif
address.sun_family = AF_UNIX; address.sun_family = AF_UNIX;
strcpy (address.sun_path, path_); strcpy (address.sun_path, path_);
#if defined ZMQ_HAVE_LINUX
/* Abstract sockets on Linux start with '\0' */
if (path_[0] == '@')
*address.sun_path = '\0';
#endif
return 0; return 0;
} }
@ -65,7 +76,15 @@ int zmq::ipc_address_t::to_string (std::string &addr_)
} }
std::stringstream s; std::stringstream s;
#if !defined ZMQ_HAVE_LINUX
s << "ipc://" << address.sun_path; s << "ipc://" << address.sun_path;
#else
s << "ipc://";
if (!address.sun_path[0] && address.sun_path[1])
s << "@" << address.sun_path + 1;
else
s << address.sun_path;
#endif
addr_ = s.str (); addr_ = s.str ();
return 0; return 0;
} }
@ -77,6 +96,10 @@ const sockaddr *zmq::ipc_address_t::addr () const
socklen_t zmq::ipc_address_t::addrlen () const socklen_t zmq::ipc_address_t::addrlen () const
{ {
#if defined ZMQ_HAVE_LINUX
if (!address.sun_path[0] && address.sun_path[1])
return (socklen_t) strlen(address.sun_path + 1) + sizeof (sa_family_t) + 1;
#endif
return (socklen_t) sizeof (address); return (socklen_t) sizeof (address);
} }

View File

@ -152,6 +152,11 @@ void zmq::kqueue_t::stop ()
stopping = true; stopping = true;
} }
int zmq::kqueue_t::max_fds ()
{
return -1;
}
void zmq::kqueue_t::loop () void zmq::kqueue_t::loop ()
{ {
while (!stopping) { while (!stopping) {

View File

@ -58,6 +58,8 @@ namespace zmq
void start (); void start ();
void stop (); void stop ();
static int max_fds ();
private: private:
// Main worker thread routine. // Main worker thread routine.

View File

@ -54,7 +54,6 @@ zmq::mtrie_t::~mtrie_t ()
else else
if (count > 1) { if (count > 1) {
for (unsigned short i = 0; i != count; ++i) for (unsigned short i = 0; i != count; ++i)
if (next.table [i])
delete next.table [i]; delete next.table [i];
free (next.table); free (next.table);
} }

View File

@ -286,7 +286,7 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
break; break;
case ZMQ_ZAP_DOMAIN: case ZMQ_ZAP_DOMAIN:
if (optvallen_ >= 0 && optvallen_ < 256) { if (optvallen_ < 256) {
zap_domain.assign ((const char *) optval_, optvallen_); zap_domain.assign ((const char *) optval_, optvallen_);
return 0; return 0;
} }

View File

@ -102,12 +102,12 @@ void zmq::pgm_receiver_t::terminate ()
delete this; delete this;
} }
void zmq::pgm_receiver_t::activate_out () void zmq::pgm_receiver_t::restart_output ()
{ {
drop_subscriptions (); drop_subscriptions ();
} }
void zmq::pgm_receiver_t::activate_in () void zmq::pgm_receiver_t::restart_input ()
{ {
zmq_assert (session != NULL); zmq_assert (session != NULL);
zmq_assert (active_tsi != NULL); zmq_assert (active_tsi != NULL);

View File

@ -57,8 +57,8 @@ namespace zmq
void plug (zmq::io_thread_t *io_thread_, void plug (zmq::io_thread_t *io_thread_,
zmq::session_base_t *session_); zmq::session_base_t *session_);
void terminate (); void terminate ();
void activate_in (); void restart_input ();
void activate_out (); void restart_output ();
void zap_msg_available () {} void zap_msg_available () {}
// i_poll_events interface implementation. // i_poll_events interface implementation.

View File

@ -119,13 +119,13 @@ void zmq::pgm_sender_t::terminate ()
delete this; delete this;
} }
void zmq::pgm_sender_t::activate_out () void zmq::pgm_sender_t::restart_output ()
{ {
set_pollout (handle); set_pollout (handle);
out_event (); out_event ();
} }
void zmq::pgm_sender_t::activate_in () void zmq::pgm_sender_t::restart_input ()
{ {
zmq_assert (false); zmq_assert (false);
} }

View File

@ -56,8 +56,8 @@ namespace zmq
void plug (zmq::io_thread_t *io_thread_, void plug (zmq::io_thread_t *io_thread_,
zmq::session_base_t *session_); zmq::session_base_t *session_);
void terminate (); void terminate ();
void activate_in (); void restart_input ();
void activate_out (); void restart_output ();
void zap_msg_available () {} void zap_msg_available () {}
// i_poll_events interface implementation. // i_poll_events interface implementation.

View File

@ -114,6 +114,11 @@ void zmq::poll_t::stop ()
stopping = true; stopping = true;
} }
int zmq::poll_t::max_fds ()
{
return -1;
}
void zmq::poll_t::loop () void zmq::poll_t::loop ()
{ {
while (!stopping) { while (!stopping) {

View File

@ -59,6 +59,8 @@ namespace zmq
void start (); void start ();
void stop (); void stop ();
static int max_fds ();
private: private:
// Main worker thread routine. // Main worker thread routine.

View File

@ -198,16 +198,16 @@ int zmq::req_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_
bool is_int = (optvallen_ == sizeof (int)); bool is_int = (optvallen_ == sizeof (int));
int value = is_int? *((int *) optval_): 0; int value = is_int? *((int *) optval_): 0;
switch (option_) { switch (option_) {
case ZMQ_REQ_REQUEST_IDS: case ZMQ_REQ_CORRELATE:
if (is_int && value >= 0) { if (is_int && value >= 0) {
request_id_frames_enabled = (value != 0); request_id_frames_enabled = (value != 0);
return 0; return 0;
} }
break; break;
case ZMQ_REQ_STRICT: case ZMQ_REQ_RELAXED:
if (is_int && value >= 0) { if (is_int && value >= 0) {
strict = (value != 0); strict = (value == 0);
return 0; return 0;
} }
break; break;

View File

@ -225,6 +225,8 @@ int zmq::router_t::xsend (msg_t *msg_)
current_out->terminate (false); current_out->terminate (false);
int rc = msg_->close (); int rc = msg_->close ();
errno_assert (rc == 0); errno_assert (rc == 0);
rc = msg_->init ();
errno_assert (rc == 0);
current_out = NULL; current_out = NULL;
return 0; return 0;
} }

View File

@ -144,6 +144,11 @@ void zmq::select_t::stop ()
stopping = true; stopping = true;
} }
int zmq::select_t::max_fds ()
{
return FD_SETSIZE;
}
void zmq::select_t::loop () void zmq::select_t::loop ()
{ {
while (!stopping) { while (!stopping) {

View File

@ -69,6 +69,8 @@ namespace zmq
void start (); void start ();
void stop (); void stop ();
static int max_fds ();
private: private:
// Main worker thread routine. // Main worker thread routine.

View File

@ -96,7 +96,6 @@ zmq::session_base_t::~session_base_t ()
if (engine) if (engine)
engine->terminate (); engine->terminate ();
if (addr)
delete addr; delete addr;
} }
@ -244,7 +243,7 @@ void zmq::session_base_t::read_activated (pipe_t *pipe_)
} }
if (likely (pipe_ == pipe)) if (likely (pipe_ == pipe))
engine->activate_out (); engine->restart_output ();
else else
engine->zap_msg_available (); engine->zap_msg_available ();
} }
@ -258,7 +257,7 @@ void zmq::session_base_t::write_activated (pipe_t *pipe_)
} }
if (engine) if (engine)
engine->activate_in (); engine->restart_input ();
} }
void zmq::session_base_t::hiccuped (pipe_t *) void zmq::session_base_t::hiccuped (pipe_t *)

View File

@ -80,13 +80,10 @@
zmq::signaler_t::signaler_t () zmq::signaler_t::signaler_t ()
{ {
// Create the socketpair for signaling. // Create the socketpair for signaling.
int rc = make_fdpair (&r, &w); if (make_fdpair (&r, &w) == 0) {
errno_assert (rc == 0);
// Set both fds to non-blocking mode.
unblock_socket (w); unblock_socket (w);
unblock_socket (r); unblock_socket (r);
}
#ifdef HAVE_FORK #ifdef HAVE_FORK
pid = getpid(); pid = getpid();
#endif #endif
@ -184,8 +181,7 @@ int zmq::signaler_t::wait (int timeout_)
return -1; return -1;
} }
#ifdef HAVE_FORK #ifdef HAVE_FORK
if (unlikely(pid != getpid())) if (unlikely(pid != getpid())) {
{
// we have forked and the file descriptor is closed. Emulate an interupt // we have forked and the file descriptor is closed. Emulate an interupt
// response. // response.
//printf("Child process %d signaler_t::wait returning simulating interrupt #2\n", getpid()); //printf("Child process %d signaler_t::wait returning simulating interrupt #2\n", getpid());
@ -266,39 +262,27 @@ void zmq::signaler_t::recv ()
#ifdef HAVE_FORK #ifdef HAVE_FORK
void zmq::signaler_t::forked() void zmq::signaler_t::forked()
{ {
int oldr = r; // Close file descriptors created in the parent and create new pair
#if !defined ZMQ_HAVE_EVENTFD close (r);
int oldw = w; close (w);
#endif
// replace the file descriptors created in the parent with new
// ones, and close the inherited ones
make_fdpair (&r, &w); make_fdpair (&r, &w);
#if defined ZMQ_HAVE_EVENTFD
int rc = close (oldr);
errno_assert (rc == 0);
#else
int rc = close (oldw);
errno_assert (rc == 0);
rc = close (oldr);
errno_assert (rc == 0);
#endif
} }
#endif #endif
// Returns -1 if we could not make the socket pair successfully
int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
{ {
#if defined ZMQ_HAVE_EVENTFD #if defined ZMQ_HAVE_EVENTFD
// Create eventfd object.
fd_t fd = eventfd (0, 0); fd_t fd = eventfd (0, 0);
errno_assert (fd != -1); if (fd == -1) {
*w_ = fd; errno_assert (errno == ENFILE || errno == EMFILE);
*r_ = fd; *w_ = *r_ = -1;
return -1;
}
else {
*w_ = *r_ = fd;
return 0; return 0;
}
#elif defined ZMQ_HAVE_WINDOWS #elif defined ZMQ_HAVE_WINDOWS
# if !defined _WIN32_WCE # if !defined _WIN32_WCE
@ -328,14 +312,11 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
HANDLE sync = CreateEvent (NULL, FALSE, TRUE, TEXT ("Global\\zmq-signaler-port-sync")); HANDLE sync = CreateEvent (NULL, FALSE, TRUE, TEXT ("Global\\zmq-signaler-port-sync"));
# endif # endif
if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED) if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED)
sync = OpenEvent (SYNCHRONIZE | EVENT_MODIFY_STATE, FALSE, TEXT ("Global\\zmq-signaler-port-sync")); sync = OpenEvent (SYNCHRONIZE | EVENT_MODIFY_STATE,
FALSE, TEXT ("Global\\zmq-signaler-port-sync"));
win_assert (sync != NULL); win_assert (sync != NULL);
// Enter the critical section.
DWORD dwrc = WaitForSingleObject (sync, INFINITE);
zmq_assert (dwrc == WAIT_OBJECT_0);
// Windows has no 'socketpair' function. CreatePipe is no good as pipe // Windows has no 'socketpair' function. CreatePipe is no good as pipe
// handles cannot be polled on. Here we create the socketpair by hand. // handles cannot be polled on. Here we create the socketpair by hand.
*w_ = INVALID_SOCKET; *w_ = INVALID_SOCKET;
@ -356,58 +337,51 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
(char *)&tcp_nodelay, sizeof (tcp_nodelay)); (char *)&tcp_nodelay, sizeof (tcp_nodelay));
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
// Bind listening socket to signaler port. // Init sockaddr to signaler port.
struct sockaddr_in addr; struct sockaddr_in addr;
memset (&addr, 0, sizeof (addr)); memset (&addr, 0, sizeof (addr));
addr.sin_family = AF_INET; addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
addr.sin_port = htons (signaler_port); addr.sin_port = htons (signaler_port);
rc = bind (listener, (const struct sockaddr*) &addr, sizeof (addr));
wsa_assert (rc != SOCKET_ERROR);
// Listen for incomming connections.
rc = listen (listener, 1);
wsa_assert (rc != SOCKET_ERROR);
// Create the writer socket. // Create the writer socket.
*w_ = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0, 0); *w_ = open_socket (AF_INET, SOCK_STREAM, 0);
wsa_assert (*w_ != INVALID_SOCKET); wsa_assert (*w_ != INVALID_SOCKET);
#if !defined _WIN32_WCE
// On Windows, preventing sockets to be inherited by child processes.
BOOL brc = SetHandleInformation ((HANDLE) *w_, HANDLE_FLAG_INHERIT, 0);
win_assert (brc);
#else
BOOL brc;
#endif
// Set TCP_NODELAY on writer socket. // Set TCP_NODELAY on writer socket.
rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY,
(char *)&tcp_nodelay, sizeof (tcp_nodelay)); (char *)&tcp_nodelay, sizeof (tcp_nodelay));
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
// Enter the critical section.
DWORD dwrc = WaitForSingleObject (sync, INFINITE);
zmq_assert (dwrc == WAIT_OBJECT_0);
// Bind listening socket to signaler port.
rc = bind (listener, (const struct sockaddr*) &addr, sizeof (addr));
// Listen for incoming connections.
if (rc != SOCKET_ERROR)
rc = listen (listener, 1);
// Connect writer to the listener. // Connect writer to the listener.
if (rc != SOCKET_ERROR)
rc = connect (*w_, (struct sockaddr*) &addr, sizeof (addr)); rc = connect (*w_, (struct sockaddr*) &addr, sizeof (addr));
// Save errno if connection fails
int conn_errno = 0;
if (rc == SOCKET_ERROR) {
conn_errno = WSAGetLastError ();
} else {
// Accept connection from writer. // Accept connection from writer.
if (rc != SOCKET_ERROR)
*r_ = accept (listener, NULL, NULL); *r_ = accept (listener, NULL, NULL);
if (*r_ == INVALID_SOCKET) { // Save errno if error occurred in bind/listen/connect/accept.
conn_errno = WSAGetLastError (); int saved_errno = 0;
} if (*r_ == INVALID_SOCKET)
} saved_errno = WSAGetLastError ();
// We don't need the listening socket anymore. Close it. // We don't need the listening socket anymore. Close it.
rc = closesocket (listener); closesocket (listener);
wsa_assert (rc != SOCKET_ERROR);
// Exit the critical section. // Exit the critical section.
brc = SetEvent (sync); BOOL brc = SetEvent (sync);
win_assert (brc != 0); win_assert (brc != 0);
// Release the kernel object // Release the kernel object
@ -421,21 +395,16 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
win_assert (brc); win_assert (brc);
# endif # endif
return 0; return 0;
} else { }
else {
// Cleanup writer if connection failed // Cleanup writer if connection failed
if (*w_ != INVALID_SOCKET) {
rc = closesocket (*w_); rc = closesocket (*w_);
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
*w_ = INVALID_SOCKET; *w_ = INVALID_SOCKET;
}
// Set errno from saved value // Set errno from saved value
errno = wsa_error_to_errno (conn_errno); errno = wsa_error_to_errno (saved_errno);
// Ideally, we would return errno to the caller signaler_t()
// Unfortunately, it uses errno_assert() which gives "Unknown error"
// We might as well assert here and print the actual error message
wsa_assert_no (conn_errno);
return -1; return -1;
} }
@ -493,15 +462,20 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
return 0; return 0;
#else // All other implementations support socketpair() #else
// All other implementations support socketpair()
int sv [2]; int sv [2];
int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv); int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
errno_assert (rc == 0); if (rc == -1) {
errno_assert (errno == ENFILE || errno == EMFILE);
*w_ = *r_ = -1;
return -1;
}
else {
*w_ = sv [0]; *w_ = sv [0];
*r_ = sv [1]; *r_ = sv [1];
return 0; return 0;
}
#endif #endif
} }

View File

@ -58,7 +58,8 @@ namespace zmq
// to pass the signals. // to pass the signals.
static int make_fdpair (fd_t *r_, fd_t *w_); static int make_fdpair (fd_t *r_, fd_t *w_);
// Underlying write & read file descriptor. // Underlying write & read file descriptor
// Will be -1 if we exceeded number of available handles
fd_t w; fd_t w;
fd_t r; fd_t r;
@ -74,7 +75,6 @@ namespace zmq
void close_internal(); void close_internal();
#endif #endif
}; };
} }
#endif #endif

View File

@ -79,7 +79,6 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
{ {
socket_base_t *s = NULL; socket_base_t *s = NULL;
switch (type_) { switch (type_) {
case ZMQ_PAIR: case ZMQ_PAIR:
s = new (std::nothrow) pair_t (parent_, tid_, sid_); s = new (std::nothrow) pair_t (parent_, tid_, sid_);
break; break;
@ -120,6 +119,9 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
errno = EINVAL; errno = EINVAL;
return NULL; return NULL;
} }
if (s->mailbox.get_fd () == -1)
return NULL;
alloc_assert (s); alloc_assert (s);
return s; return s;
} }

View File

@ -74,8 +74,8 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
io_error (false), io_error (false),
subscription_required (false), subscription_required (false),
mechanism (NULL), mechanism (NULL),
input_paused (false), input_stopped (false),
output_paused (false), output_stopped (false),
socket (NULL) socket (NULL)
{ {
int rc = tx_msg.init (); int rc = tx_msg.init ();
@ -114,11 +114,8 @@ zmq::stream_engine_t::~stream_engine_t ()
int rc = tx_msg.close (); int rc = tx_msg.close ();
errno_assert (rc == 0); errno_assert (rc == 0);
if (encoder != NULL)
delete encoder; delete encoder;
if (decoder != NULL)
delete decoder; delete decoder;
if (mechanism != NULL)
delete mechanism; delete mechanism;
} }
@ -207,7 +204,7 @@ void zmq::stream_engine_t::in_event ()
zmq_assert (decoder); zmq_assert (decoder);
// If there has been an I/O error, stop polling. // If there has been an I/O error, stop polling.
if (input_paused) { if (input_stopped) {
rm_fd (handle); rm_fd (handle);
io_error = true; io_error = true;
return; return;
@ -220,17 +217,22 @@ void zmq::stream_engine_t::in_event ()
// Note that buffer can be arbitrarily large. However, we assume // Note that buffer can be arbitrarily large. However, we assume
// the underlying TCP layer has fixed buffer size and thus the // the underlying TCP layer has fixed buffer size and thus the
// number of bytes read will be always limited. // number of bytes read will be always limited.
decoder->get_buffer (&inpos, &insize); size_t bufsize = 0;
const int bytes_read = read (inpos, insize); decoder->get_buffer (&inpos, &bufsize);
// Check whether the peer has closed the connection. int const rc = read (inpos, bufsize);
if (bytes_read == -1) { if (rc == 0) {
error ();
return;
}
if (rc == -1) {
if (errno != EAGAIN)
error (); error ();
return; return;
} }
// Adjust input size // Adjust input size
insize = static_cast <size_t> (bytes_read); insize = static_cast <size_t> (rc);
} }
int rc = 0; int rc = 0;
@ -255,7 +257,7 @@ void zmq::stream_engine_t::in_event ()
error (); error ();
return; return;
} }
input_paused = true; input_stopped = true;
reset_pollin (handle); reset_pollin (handle);
} }
@ -294,7 +296,7 @@ void zmq::stream_engine_t::out_event ()
// If there is no data to send, stop polling for output. // If there is no data to send, stop polling for output.
if (outsize == 0) { if (outsize == 0) {
output_paused = true; output_stopped = true;
reset_pollout (handle); reset_pollout (handle);
return; return;
} }
@ -331,14 +333,14 @@ void zmq::stream_engine_t::out_event ()
terminate (); terminate ();
} }
void zmq::stream_engine_t::activate_out () void zmq::stream_engine_t::restart_output ()
{ {
if (unlikely (io_error)) if (unlikely (io_error))
return; return;
if (likely (output_paused)) { if (likely (output_stopped)) {
set_pollout (handle); set_pollout (handle);
output_paused = false; output_stopped = false;
} }
// Speculative write: The assumption is that at the moment new message // Speculative write: The assumption is that at the moment new message
@ -348,9 +350,9 @@ void zmq::stream_engine_t::activate_out ()
out_event (); out_event ();
} }
void zmq::stream_engine_t::activate_in () void zmq::stream_engine_t::restart_input ()
{ {
zmq_assert (input_paused); zmq_assert (input_stopped);
zmq_assert (session != NULL); zmq_assert (session != NULL);
zmq_assert (decoder != NULL); zmq_assert (decoder != NULL);
@ -382,7 +384,7 @@ void zmq::stream_engine_t::activate_in ()
if (rc == -1 || io_error) if (rc == -1 || io_error)
error (); error ();
else { else {
input_paused = false; input_stopped = false;
set_pollin (handle); set_pollin (handle);
session->flush (); session->flush ();
@ -399,12 +401,15 @@ bool zmq::stream_engine_t::handshake ()
while (greeting_bytes_read < greeting_size) { while (greeting_bytes_read < greeting_size) {
const int n = read (greeting_recv + greeting_bytes_read, const int n = read (greeting_recv + greeting_bytes_read,
greeting_size - greeting_bytes_read); greeting_size - greeting_bytes_read);
if (n == -1) { if (n == 0) {
error (); error ();
return false; return false;
} }
if (n == 0) if (n == -1) {
if (errno != EAGAIN)
error ();
return false; return false;
}
greeting_bytes_read += n; greeting_bytes_read += n;
@ -483,6 +488,13 @@ bool zmq::stream_engine_t::handshake ()
// header data away. // header data away.
const size_t header_size = options.identity_size + 1 >= 255 ? 10 : 2; const size_t header_size = options.identity_size + 1 >= 255 ? 10 : 2;
unsigned char tmp [10], *bufferp = tmp; unsigned char tmp [10], *bufferp = tmp;
// Prepare the identity message and load it into encoder.
// Then consume bytes we have already sent to the peer.
const int rc = tx_msg.init_size (options.identity_size);
zmq_assert (rc == 0);
memcpy (tx_msg.data (), options.identity, options.identity_size);
encoder->load_msg (&tx_msg);
size_t buffer_size = encoder->encode (&bufferp, header_size); size_t buffer_size = encoder->encode (&bufferp, header_size);
zmq_assert (buffer_size == header_size); zmq_assert (buffer_size == header_size);
@ -617,8 +629,8 @@ int zmq::stream_engine_t::process_handshake_command (msg_t *msg_)
if (rc == 0) { if (rc == 0) {
if (mechanism->is_handshake_complete ()) if (mechanism->is_handshake_complete ())
mechanism_ready (); mechanism_ready ();
if (output_paused) if (output_stopped)
activate_out (); restart_output ();
} }
return rc; return rc;
@ -633,10 +645,10 @@ void zmq::stream_engine_t::zap_msg_available ()
error (); error ();
return; return;
} }
if (input_paused) if (input_stopped)
activate_in (); restart_input ();
if (output_paused) if (output_stopped)
activate_out (); restart_output ();
} }
void zmq::stream_engine_t::mechanism_ready () void zmq::stream_engine_t::mechanism_ready ()
@ -652,6 +664,7 @@ void zmq::stream_engine_t::mechanism_ready ()
return; return;
} }
errno_assert (rc == 0); errno_assert (rc == 0);
session->flush ();
} }
read_msg = &stream_engine_t::pull_and_encode; read_msg = &stream_engine_t::pull_and_encode;
@ -787,58 +800,45 @@ int zmq::stream_engine_t::read (void *data_, size_t size_)
{ {
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
int nbytes = recv (s, (char*) data_, (int) size_, 0); const int rc = recv (s, (char*) data_, (int) size_, 0);
// If not a single byte can be read from the socket in non-blocking mode // If not a single byte can be read from the socket in non-blocking mode
// we'll get an error (this may happen during the speculative read). // we'll get an error (this may happen during the speculative read).
if (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK) if (rc == SOCKET_ERROR) {
return 0; if (WSAGetLastError () == WSAEWOULDBLOCK)
errno = EAGAIN;
else {
wsa_assert (WSAGetLastError () == WSAENETDOWN
|| WSAGetLastError () == WSAENETRESET
|| WSAGetLastError () == WSAECONNABORTED
|| WSAGetLastError () == WSAETIMEDOUT
|| WSAGetLastError () == WSAECONNRESET
|| WSAGetLastError () == WSAECONNREFUSED
|| WSAGetLastError () == WSAENOTCONN);
errno = wsa_error_to_errno (WSAGetLastError ());
}
}
// Connection failure. return rc == SOCKET_ERROR? -1: rc;
if (nbytes == SOCKET_ERROR && (
WSAGetLastError () == WSAENETDOWN ||
WSAGetLastError () == WSAENETRESET ||
WSAGetLastError () == WSAECONNABORTED ||
WSAGetLastError () == WSAETIMEDOUT ||
WSAGetLastError () == WSAECONNRESET ||
WSAGetLastError () == WSAECONNREFUSED ||
WSAGetLastError () == WSAENOTCONN))
return -1;
wsa_assert (nbytes != SOCKET_ERROR);
// Orderly shutdown by the other peer.
if (nbytes == 0)
return -1;
return nbytes;
#else #else
ssize_t nbytes = recv (s, data_, size_, 0); const ssize_t rc = recv (s, data_, size_, 0);
// Several errors are OK. When speculative read is being done we may not // Several errors are OK. When speculative read is being done we may not
// be able to read a single byte from the socket. Also, SIGSTOP issued // be able to read a single byte from the socket. Also, SIGSTOP issued
// by a debugging tool can result in EINTR error. // by a debugging tool can result in EINTR error.
if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || if (rc == -1) {
errno == EINTR))
return 0;
// Signalise peer failure.
if (nbytes == -1) {
errno_assert (errno != EBADF errno_assert (errno != EBADF
&& errno != EFAULT && errno != EFAULT
&& errno != EINVAL && errno != EINVAL
&& errno != ENOMEM && errno != ENOMEM
&& errno != ENOTSOCK); && errno != ENOTSOCK);
return -1; if (errno == EWOULDBLOCK || errno == EINTR)
errno = EAGAIN;
} }
// Orderly shutdown by the peer. return static_cast <int> (rc);
if (nbytes == 0)
return -1;
return static_cast <int> (nbytes);
#endif #endif
} }

View File

@ -60,8 +60,8 @@ namespace zmq
void plug (zmq::io_thread_t *io_thread_, void plug (zmq::io_thread_t *io_thread_,
zmq::session_base_t *session_); zmq::session_base_t *session_);
void terminate (); void terminate ();
void activate_in (); void restart_input ();
void activate_out (); void restart_output ();
void zap_msg_available (); void zap_msg_available ();
// i_poll_events interface implementation. // i_poll_events interface implementation.
@ -87,10 +87,9 @@ namespace zmq
// of error or orderly shutdown by the other peer -1 is returned. // of error or orderly shutdown by the other peer -1 is returned.
int write (const void *data_, size_t size_); int write (const void *data_, size_t size_);
// Reads data from the socket (up to 'size' bytes). Returns the number // Reads data from the socket (up to 'size' bytes).
// of bytes actually read (even zero is to be considered to be // Returns the number of bytes actually read or -1 on error.
// a success). In case of error or orderly shutdown by the other // Zero indicates the peer has closed the connection.
// peer -1 is returned.
int read (void *data_, size_t size_); int read (void *data_, size_t size_);
int read_identity (msg_t *msg_); int read_identity (msg_t *msg_);
@ -179,10 +178,10 @@ namespace zmq
mechanism_t *mechanism; mechanism_t *mechanism;
// True iff the engine couldn't consume the last decoded message. // True iff the engine couldn't consume the last decoded message.
bool input_paused; bool input_stopped;
// True iff the engine doesn't have any message to encode. // True iff the engine doesn't have any message to encode.
bool output_paused; bool output_stopped;
// Socket // Socket
zmq::socket_base_t *socket; zmq::socket_base_t *socket;

View File

@ -48,7 +48,6 @@ zmq::trie_t::~trie_t ()
else else
if (count > 1) { if (count > 1) {
for (unsigned short i = 0; i != count; ++i) for (unsigned short i = 0; i != count; ++i)
if (next.table [i])
delete next.table [i]; delete next.table [i];
free (next.table); free (next.table);
} }

View File

@ -72,7 +72,6 @@ namespace zmq
} }
chunk_t *sc = spare_chunk.xchg (NULL); chunk_t *sc = spare_chunk.xchg (NULL);
if (sc)
free (sc); free (sc);
} }
@ -156,7 +155,6 @@ namespace zmq
// so for cache reasons we'll get rid of the spare and // so for cache reasons we'll get rid of the spare and
// use 'o' as the spare. // use 'o' as the spare.
chunk_t *cs = spare_chunk.xchg (o); chunk_t *cs = spare_chunk.xchg (o);
if (cs)
free (cs); free (cs);
} }
} }

View File

@ -30,6 +30,10 @@
#else #else
#include "windows.hpp" #include "windows.hpp"
#endif #endif
#ifdef HAVE_LIBSODIUM
# include <sodium.h>
#endif
void zmq_sleep (int seconds_) void zmq_sleep (int seconds_)
{ {
@ -100,10 +104,14 @@ static uint8_t decoder [96] = {
// Encode a binary frame as a string; destination string MUST be at least // Encode a binary frame as a string; destination string MUST be at least
// size * 5 / 4 bytes long plus 1 byte for the null terminator. Returns // size * 5 / 4 bytes long plus 1 byte for the null terminator. Returns
// dest. Size must be a multiple of 4. // dest. Size must be a multiple of 4.
// Returns NULL and sets errno = EINVAL for invalid input.
char *zmq_z85_encode (char *dest, uint8_t *data, size_t size) char *zmq_z85_encode (char *dest, uint8_t *data, size_t size)
{ {
assert (size % 4 == 0); if (size % 4 != 0) {
errno = EINVAL;
return NULL;
}
unsigned int char_nbr = 0; unsigned int char_nbr = 0;
unsigned int byte_nbr = 0; unsigned int byte_nbr = 0;
uint32_t value = 0; uint32_t value = 0;
@ -130,14 +138,19 @@ char *zmq_z85_encode (char *dest, uint8_t *data, size_t size)
// Decode an encoded string into a binary frame; dest must be at least // Decode an encoded string into a binary frame; dest must be at least
// strlen (string) * 4 / 5 bytes long. Returns dest. strlen (string) // strlen (string) * 4 / 5 bytes long. Returns dest. strlen (string)
// must be a multiple of 5. // must be a multiple of 5.
// Returns NULL and sets errno = EINVAL for invalid input.
uint8_t *zmq_z85_decode (uint8_t *dest, char *string) uint8_t *zmq_z85_decode (uint8_t *dest, char *string)
{ {
assert (strlen (string) % 5 == 0); if (strlen (string) % 5 != 0) {
errno = EINVAL;
return NULL;
}
unsigned int byte_nbr = 0; unsigned int byte_nbr = 0;
unsigned int char_nbr = 0; unsigned int char_nbr = 0;
unsigned int string_len = strlen (string);
uint32_t value = 0; uint32_t value = 0;
while (char_nbr < strlen (string)) { while (char_nbr < string_len) {
// Accumulate value in base 85 // Accumulate value in base 85
value = value * 85 + decoder [(uint8_t) string [char_nbr++] - 32]; value = value * 85 + decoder [(uint8_t) string [char_nbr++] - 32];
if (char_nbr % 5 == 0) { if (char_nbr % 5 == 0) {
@ -153,3 +166,35 @@ uint8_t *zmq_z85_decode (uint8_t *dest, char *string)
assert (byte_nbr == strlen (string) * 4 / 5); assert (byte_nbr == strlen (string) * 4 / 5);
return dest; return dest;
} }
// --------------------------------------------------------------------------
// Generate a public/private keypair with libsodium.
// Generated keys will be 40 byte z85-encoded strings.
// Returns 0 on success, -1 on failure, setting errno.
// Sets errno = ENOTSUP in the absence of libsodium.
int zmq_curve_keypair (char *z85_public_key, char *z85_secret_key)
{
#ifdef HAVE_LIBSODIUM
# if crypto_box_PUBLICKEYBYTES != 32 \
|| crypto_box_SECRETKEYBYTES != 32
# error "libsodium not built correctly"
# endif
uint8_t public_key [32];
uint8_t secret_key [32];
int rc = crypto_box_keypair (public_key, secret_key);
// Is there a sensible errno to set here?
if (rc)
return rc;
zmq_z85_encode (z85_public_key, public_key, 32);
zmq_z85_encode (z85_secret_key, secret_key, 32);
return 0;
#else // requires libsodium
errno = ENOTSUP;
return -1;
#endif
}

View File

@ -14,12 +14,12 @@ noinst_PROGRAMS = test_system \
test_invalid_rep \ test_invalid_rep \
test_msg_flags \ test_msg_flags \
test_connect_resolve \ test_connect_resolve \
test_connect_delay \ test_immediate \
test_last_endpoint \ test_last_endpoint \
test_term_endpoint \ test_term_endpoint \
test_linger \
test_monitor \ test_monitor \
test_router_mandatory \ test_router_mandatory \
test_router_raw_empty \
test_probe_router \ test_probe_router \
test_stream \ test_stream \
test_disconnect_inproc \ test_disconnect_inproc \
@ -34,10 +34,13 @@ noinst_PROGRAMS = test_system \
test_spec_dealer \ test_spec_dealer \
test_spec_router \ test_spec_router \
test_spec_pushpull \ test_spec_pushpull \
test_req_request_ids \ test_req_correlate \
test_req_strict \ test_req_relaxed \
test_conflate \ test_conflate \
test_inproc_connect test_inproc_connect \
test_issue_566 \
test_abstract_ipc \
test_many_sockets
if !ON_MINGW if !ON_MINGW
noinst_PROGRAMS += test_shutdown_stress \ noinst_PROGRAMS += test_shutdown_stress \
@ -56,14 +59,14 @@ test_hwm_SOURCES = test_hwm.cpp
test_reqrep_device_SOURCES = test_reqrep_device.cpp test_reqrep_device_SOURCES = test_reqrep_device.cpp
test_sub_forward_SOURCES = test_sub_forward.cpp test_sub_forward_SOURCES = test_sub_forward.cpp
test_invalid_rep_SOURCES = test_invalid_rep.cpp test_invalid_rep_SOURCES = test_invalid_rep.cpp
test_linger_SOURCES = test_linger.cpp
test_msg_flags_SOURCES = test_msg_flags.cpp test_msg_flags_SOURCES = test_msg_flags.cpp
test_connect_resolve_SOURCES = test_connect_resolve.cpp test_connect_resolve_SOURCES = test_connect_resolve.cpp
test_connect_delay_SOURCES = test_connect_delay.cpp test_immediate_SOURCES = test_immediate.cpp
test_last_endpoint_SOURCES = test_last_endpoint.cpp test_last_endpoint_SOURCES = test_last_endpoint.cpp
test_term_endpoint_SOURCES = test_term_endpoint.cpp test_term_endpoint_SOURCES = test_term_endpoint.cpp
test_monitor_SOURCES = test_monitor.cpp test_monitor_SOURCES = test_monitor.cpp
test_router_mandatory_SOURCES = test_router_mandatory.cpp test_router_mandatory_SOURCES = test_router_mandatory.cpp
test_router_raw_empty_SOURCES = test_router_raw_empty.cpp
test_probe_router_SOURCES = test_probe_router.cpp test_probe_router_SOURCES = test_probe_router.cpp
test_stream_SOURCES = test_stream.cpp test_stream_SOURCES = test_stream.cpp
test_disconnect_inproc_SOURCES = test_disconnect_inproc.cpp test_disconnect_inproc_SOURCES = test_disconnect_inproc.cpp
@ -78,10 +81,13 @@ test_spec_rep_SOURCES = test_spec_rep.cpp
test_spec_dealer_SOURCES = test_spec_dealer.cpp test_spec_dealer_SOURCES = test_spec_dealer.cpp
test_spec_router_SOURCES = test_spec_router.cpp test_spec_router_SOURCES = test_spec_router.cpp
test_spec_pushpull_SOURCES = test_spec_pushpull.cpp test_spec_pushpull_SOURCES = test_spec_pushpull.cpp
test_req_request_ids_SOURCES = test_req_request_ids.cpp test_req_correlate_SOURCES = test_req_correlate.cpp
test_req_strict_SOURCES = test_req_strict.cpp test_req_relaxed_SOURCES = test_req_relaxed.cpp
test_conflate_SOURCES = test_conflate.cpp test_conflate_SOURCES = test_conflate.cpp
test_inproc_connect_SOURCES = test_inproc_connect.cpp test_inproc_connect_SOURCES = test_inproc_connect.cpp
test_issue_566_SOURCES = test_issue_566.cpp
test_abstract_ipc_SOURCES = test_abstract_ipc.cpp
test_many_sockets_SOURCES = test_many_sockets.cpp
if !ON_MINGW if !ON_MINGW
test_shutdown_stress_SOURCES = test_shutdown_stress.cpp test_shutdown_stress_SOURCES = test_shutdown_stress.cpp
test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp
@ -92,4 +98,7 @@ endif
# Run the test cases # Run the test cases
TESTS = $(noinst_PROGRAMS) TESTS = $(noinst_PROGRAMS)
XFAIL_TESTS = test_linger
if !ON_LINUX
XFAIL_TESTS = test_abstract_ipc
endif

View File

@ -0,0 +1,57 @@
/*
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
int main (void)
{
setup_test_environment();
void *ctx = zmq_ctx_new ();
assert (ctx);
void *sb = zmq_socket (ctx, ZMQ_PAIR);
assert (sb);
int rc = zmq_bind (sb, "ipc://@/tmp/tester");
assert (rc == 0);
char endpoint[200];
size_t size = sizeof(endpoint);
rc = zmq_getsockopt (sb, ZMQ_LAST_ENDPOINT, endpoint, &size);
assert (rc == 0);
rc = strncmp(endpoint, "ipc://@/tmp/tester", size);
assert (rc == 0);
void *sc = zmq_socket (ctx, ZMQ_PAIR);
assert (sc);
rc = zmq_connect (sc, "ipc://@/tmp/tester");
assert (rc == 0);
bounce (sb, sc);
rc = zmq_close (sc);
assert (rc == 0);
rc = zmq_close (sb);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0 ;
}

View File

@ -45,7 +45,6 @@ int main (int argc, char *argv [])
assert (rc == 0); assert (rc == 0);
int message_count = 20; int message_count = 20;
for (int j = 0; j < message_count; ++j) { for (int j = 0; j < message_count; ++j) {
rc = zmq_send(s_out, (void*)&j, sizeof(int), 0); rc = zmq_send(s_out, (void*)&j, sizeof(int), 0);
if (rc < 0) { if (rc < 0) {
@ -53,15 +52,13 @@ int main (int argc, char *argv [])
return -1; return -1;
} }
} }
msleep (SETTLE_TIME);
zmq_sleep (1);
int payload_recved = 0; int payload_recved = 0;
rc = zmq_recv (s_in, (void*)&payload_recved, sizeof(int), 0); rc = zmq_recv (s_in, (void*)&payload_recved, sizeof(int), 0);
assert (rc > 0); assert (rc > 0);
assert (payload_recved == message_count - 1); assert (payload_recved == message_count - 1);
rc = zmq_close (s_in); rc = zmq_close (s_in);
assert (rc == 0); assert (rc == 0);

View File

@ -61,7 +61,7 @@ void test_ctx_shutdown()
void *receiver_thread = zmq_threadstart (&receiver, socket); void *receiver_thread = zmq_threadstart (&receiver, socket);
// Wait for thread to start up and block // Wait for thread to start up and block
zmq_sleep (1); msleep (SETTLE_TIME);
// Shutdown context, if we used destroy here we would deadlock. // Shutdown context, if we used destroy here we would deadlock.
rc = zmq_ctx_shutdown (ctx); rc = zmq_ctx_shutdown (ctx);

View File

@ -33,7 +33,7 @@ int main (void)
assert (zmq_ctx_get (ctx, ZMQ_IPV6) == 0); assert (zmq_ctx_get (ctx, ZMQ_IPV6) == 0);
rc = zmq_ctx_set (ctx, ZMQ_IPV6, true); rc = zmq_ctx_set (ctx, ZMQ_IPV6, true);
assert (zmq_ctx_get (ctx, ZMQ_IPV6) == true); assert (zmq_ctx_get (ctx, ZMQ_IPV6) == 1);
void *router = zmq_socket (ctx, ZMQ_ROUTER); void *router = zmq_socket (ctx, ZMQ_ROUTER);
int ipv6; int ipv6;

View File

@ -117,7 +117,7 @@ int main (void)
// Set the key flag // Set the key flag
val = 1; val = 1;
rc = zmq_setsockopt (from, ZMQ_DELAY_ATTACH_ON_CONNECT, &val, sizeof(val)); rc = zmq_setsockopt (from, ZMQ_IMMEDIATE, &val, sizeof(val));
assert (rc == 0); assert (rc == 0);
// Connect to the invalid socket // Connect to the invalid socket
@ -170,9 +170,9 @@ int main (void)
rc = zmq_setsockopt (frontend, ZMQ_LINGER, &zero, sizeof (zero)); rc = zmq_setsockopt (frontend, ZMQ_LINGER, &zero, sizeof (zero));
assert (rc == 0); assert (rc == 0);
// Frontend connects to backend using DELAY_ATTACH_ON_CONNECT // Frontend connects to backend using IMMEDIATE
int on = 1; int on = 1;
rc = zmq_setsockopt (frontend, ZMQ_DELAY_ATTACH_ON_CONNECT, &on, sizeof (on)); rc = zmq_setsockopt (frontend, ZMQ_IMMEDIATE, &on, sizeof (on));
assert (rc == 0); assert (rc == 0);
rc = zmq_bind (backend, "tcp://127.0.0.1:5560"); rc = zmq_bind (backend, "tcp://127.0.0.1:5560");
assert (rc == 0); assert (rc == 0);
@ -193,8 +193,7 @@ int main (void)
assert (rc == 0); assert (rc == 0);
// Give time to process disconnect // Give time to process disconnect
// There's no way to do this except with a sleep msleep (SETTLE_TIME);
zmq_sleep(1);
// Send a message, should fail // Send a message, should fail
rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT); rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT);

View File

@ -142,7 +142,7 @@ void test_connect_before_bind_pub_sub()
assert (rc == 0); assert (rc == 0);
// Wait for pub-sub connection to happen // Wait for pub-sub connection to happen
zmq_sleep (1); msleep (SETTLE_TIME);
// Queue up some data, this not will be dropped // Queue up some data, this not will be dropped
rc = zmq_send_const (connectSocket, "after", 6, 0); rc = zmq_send_const (connectSocket, "after", 6, 0);

View File

@ -80,7 +80,7 @@ int main (void)
rc = zmq_bind (sb, "inproc://a"); rc = zmq_bind (sb, "inproc://a");
assert (rc == 0); assert (rc == 0);
zmq_sleep(1); msleep (SETTLE_TIME);
void *sc = zmq_socket (ctx, ZMQ_PUSH); void *sc = zmq_socket (ctx, ZMQ_PUSH);
rc = zmq_connect (sc, "inproc://a"); rc = zmq_connect (sc, "inproc://a");

85
tests/test_issue_566.cpp Normal file
View File

@ -0,0 +1,85 @@
/*
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
// Issue 566 describes a problem in libzmq v4.0.0 where a dealer to router
// connection would fail randomly. The test works when the two sockets are
// on the same context, and failed when they were on separate contexts.
// Fixed by https://github.com/zeromq/libzmq/commit/be25cf.
int main (void)
{
setup_test_environment();
void *ctx1 = zmq_ctx_new ();
assert (ctx1);
void *ctx2 = zmq_ctx_new ();
assert (ctx2);
void *router = zmq_socket (ctx1, ZMQ_ROUTER);
int on = 1;
int rc = zmq_setsockopt (router, ZMQ_ROUTER_MANDATORY, &on, sizeof (on));
assert (rc == 0);
rc = zmq_bind (router, "tcp://127.0.0.1:5555");
assert (rc != -1);
// Repeat often enough to be sure this works as it should
for (int cycle = 0; cycle < 100; cycle++) {
// Create dealer with unique explicit identity
// We assume the router learns this out-of-band
void *dealer = zmq_socket (ctx2, ZMQ_DEALER);
char identity [10];
sprintf (identity, "%09d", cycle);
rc = zmq_setsockopt (dealer, ZMQ_IDENTITY, identity, 10);
assert (rc == 0);
int rcvtimeo = 1000;
rc = zmq_setsockopt (dealer, ZMQ_RCVTIMEO, &rcvtimeo, sizeof (int));
assert (rc == 0);
rc = zmq_connect (dealer, "tcp://127.0.0.1:5555");
assert (rc == 0);
// Router will try to send to dealer, at short intervals.
// It typically takes 2-5 msec for the connection to establish
// on a loopback interface, but we'll allow up to one second
// before failing the test (e.g. for running on a debugger or
// a very slow system).
for (int attempt = 0; attempt < 500; attempt++) {
zmq_poll (0, 0, 2);
rc = zmq_send (router, identity, 10, ZMQ_SNDMORE);
if (rc == -1 && errno == EHOSTUNREACH)
continue;
assert (rc == 10);
rc = zmq_send (router, "HELLO", 5, 0);
assert (rc == 5);
break;
}
uint8_t buffer [5];
rc = zmq_recv (dealer, buffer, 5, 0);
assert (rc == 5);
assert (memcmp (buffer, "HELLO", 5) == 0);
close_zero_linger (dealer);
}
zmq_close (router);
zmq_ctx_destroy (ctx1);
zmq_ctx_destroy (ctx2);
return 0;
}

101
tests/test_many_sockets.cpp Normal file
View File

@ -0,0 +1,101 @@
/*
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
#include <zmq.h>
#include <stdio.h>
#include <stdlib.h>
#include <vector>
void test_system_max ()
{
// Keep allocating sockets until we run out of system resources
const int no_of_sockets = 2 * 65536;
void *ctx = zmq_ctx_new();
zmq_ctx_set(ctx, ZMQ_MAX_SOCKETS, no_of_sockets);
std::vector<void*> sockets;
while (true)
{
void *socket = zmq_socket(ctx, ZMQ_PAIR);
if (!socket)
break;
sockets.push_back(socket);
}
assert((int)sockets.size() < no_of_sockets);
// System is out of resources, further calls to zmq_socket should return NULL.
for (unsigned int i = 0; i < 10; ++i)
{
void *socket = zmq_socket(ctx, ZMQ_PAIR);
assert(socket == NULL);
}
// Clean up.
for (unsigned int i = 0; i < sockets.size(); ++i)
zmq_close(sockets[i]);
zmq_ctx_destroy(ctx);
}
void test_zmq_default_max ()
{
// Keep allocating sockets until we hit the default zeromq limit
void *ctx = zmq_ctx_new();
std::vector<void*> sockets;
while (true)
{
void *socket = zmq_socket(ctx, ZMQ_PAIR);
if (!socket)
break;
sockets.push_back(socket);
}
assert(sockets.size() == ZMQ_MAX_SOCKETS_DFLT);
// At zeromq max, further calls to zmq_socket should return NULL.
for (unsigned int i = 0; i < 10; ++i)
{
void *socket = zmq_socket(ctx, ZMQ_PAIR);
assert(socket == NULL);
}
// Clean up.
for (unsigned int i = 0; i < sockets.size(); ++i)
zmq_close(sockets[i]);
zmq_ctx_destroy(ctx);
}
int main(void)
{
setup_test_environment();
test_system_max ();
test_zmq_default_max ();
return 0;
}

View File

@ -211,7 +211,7 @@ int main (void)
rc = zmq_socket_monitor (req, "inproc://monitor.req", ZMQ_EVENT_ALL); rc = zmq_socket_monitor (req, "inproc://monitor.req", ZMQ_EVENT_ALL);
assert (rc == 0); assert (rc == 0);
threads [1] = zmq_threadstart(&req_socket_monitor, ctx); threads [1] = zmq_threadstart(&req_socket_monitor, ctx);
zmq_sleep(1); msleep (SETTLE_TIME);
// Bind REQ and REP // Bind REQ and REP
rc = zmq_bind (rep, addr.c_str()); rc = zmq_bind (rep, addr.c_str());
@ -238,8 +238,8 @@ int main (void)
rc = zmq_close (rep); rc = zmq_close (rep);
assert (rc == 0); assert (rc == 0);
// Allow some time for detecting error states // Allow enough time for detecting error states
zmq_sleep(1); msleep (250);
// Close the REQ socket // Close the REQ socket
rc = zmq_close (req); rc = zmq_close (req);

View File

@ -32,7 +32,7 @@ int main (void)
assert (router); assert (router);
int enabled = 1; int enabled = 1;
int rc = zmq_setsockopt (req, ZMQ_REQ_REQUEST_IDS, &enabled, sizeof (int)); int rc = zmq_setsockopt (req, ZMQ_REQ_CORRELATE, &enabled, sizeof (int));
assert (rc == 0); assert (rc == 0);
int rcvtimeo = 100; int rcvtimeo = 100;

View File

@ -28,12 +28,11 @@ int main (void)
void *req = zmq_socket (ctx, ZMQ_REQ); void *req = zmq_socket (ctx, ZMQ_REQ);
assert (req); assert (req);
int disabled = 0; int enabled = 1;
int rc = zmq_setsockopt (req, ZMQ_REQ_STRICT, &disabled, sizeof (int)); int rc = zmq_setsockopt (req, ZMQ_REQ_RELAXED, &enabled, sizeof (int));
assert (rc == 0); assert (rc == 0);
int enabled = 1; rc = zmq_setsockopt (req, ZMQ_REQ_CORRELATE, &enabled, sizeof (int));
rc = zmq_setsockopt (req, ZMQ_REQ_REQUEST_IDS, &enabled, sizeof (int));
assert (rc == 0); assert (rc == 0);
rc = zmq_bind (req, "tcp://127.0.0.1:5555"); rc = zmq_bind (req, "tcp://127.0.0.1:5555");
@ -55,7 +54,7 @@ int main (void)
// We have to give the connects time to finish otherwise the requests // We have to give the connects time to finish otherwise the requests
// will not properly round-robin. We could alternatively connect the // will not properly round-robin. We could alternatively connect the
// REQ sockets to the REP sockets. // REQ sockets to the REP sockets.
zmq_sleep(1); msleep (SETTLE_TIME);
// Case 1: Second send() before a reply arrives in a pipe. // Case 1: Second send() before a reply arrives in a pipe.

View File

@ -0,0 +1,65 @@
/*
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
int main (void) {
setup_test_environment();
void *ctx = zmq_ctx_new();
assert(ctx);
void *router = zmq_socket(ctx, ZMQ_ROUTER);
assert(router);
void *dealer = zmq_socket(ctx, ZMQ_DEALER);
assert(dealer);
int one=1;
int rc = zmq_setsockopt(router, ZMQ_ROUTER_RAW, &one, sizeof(int));
assert(rc >= 0);
rc = zmq_setsockopt(router, ZMQ_ROUTER_MANDATORY, &one, sizeof(int));
assert(rc >= 0);
rc = zmq_bind(router, "tcp://127.0.0.1:5555");
rc = zmq_connect(dealer, "tcp://127.0.0.1:5555");
zmq_send(dealer, "", 0, 0);
zmq_msg_t ident, empty;
zmq_msg_init(&ident);
rc = zmq_msg_recv(&ident, router, 0);
assert(rc >= 0);
rc = zmq_msg_init_data(&empty, (void*)"", 0, NULL, NULL);
assert(rc >= 0);
rc = zmq_msg_send(&ident, router, ZMQ_SNDMORE);
assert(rc >= 0);
rc = zmq_msg_close(&ident);
assert(rc >= 0);
rc = zmq_msg_send(&empty, router, 0);
assert(rc >= 0);
// This close used to fail with Bad Address
rc = zmq_msg_close(&empty);
assert(rc >= 0);
close_zero_linger(dealer);
close_zero_linger(router);
zmq_ctx_term(ctx);
}

View File

@ -19,11 +19,11 @@
#include "testutil.hpp" #include "testutil.hpp"
// Test keys from the zmq_curve man page // We'll generate random test keys at startup
static char client_public [] = "Yne@$w-vo<fVvi]a<NY6T1ed:M$fCG*[IaLV{hID"; static char client_public [41];
static char client_secret [] = "D:)Q[IlAW!ahhC2ac:9*A}h:p?([4%wOTJ%JR%cs"; static char client_secret [41];
static char server_public [] = "rq:rM>}U?@Lns47E1%kR.o@n%FcmmsL/@{H8]yf7"; static char server_public [41];
static char server_secret [] = "JTKVSB%%)wK0E.X)V>+}o?pNmC{O&4W4b!Ni{Lh6"; static char server_secret [41];
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
// Encode a binary frame as a string; destination string MUST be at least // Encode a binary frame as a string; destination string MUST be at least
@ -86,6 +86,13 @@ int main (void)
printf ("libsodium not installed, skipping CURVE test\n"); printf ("libsodium not installed, skipping CURVE test\n");
return 0; return 0;
#endif #endif
// Generate new keypairs for this test
int rc = zmq_curve_keypair (client_public, client_secret);
assert (rc == 0);
rc = zmq_curve_keypair (server_public, server_secret);
assert (rc == 0);
setup_test_environment (); setup_test_environment ();
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
@ -95,7 +102,7 @@ int main (void)
// where child thread does not start up fast enough. // where child thread does not start up fast enough.
void *handler = zmq_socket (ctx, ZMQ_REP); void *handler = zmq_socket (ctx, ZMQ_REP);
assert (handler); assert (handler);
int rc = zmq_bind (handler, "inproc://zeromq.zap.01"); rc = zmq_bind (handler, "inproc://zeromq.zap.01");
assert (rc == 0); assert (rc == 0);
void *zap_thread = zmq_threadstart (&zap_handler, handler); void *zap_thread = zmq_threadstart (&zap_handler, handler);
@ -175,8 +182,9 @@ int main (void)
// Check CURVE security with bogus client credentials // Check CURVE security with bogus client credentials
// This must be caught by the ZAP handler // This must be caught by the ZAP handler
char bogus_public [] = "8)<]6{NT{}=MZBsH)i%l0k}y*^i#80n-Yf{I8Z+P"; char bogus_public [41];
char bogus_secret [] = "[m9E0TW2Mf?Ke3K>fuBGCrkBpc6aJbj4jv4451Nx"; char bogus_secret [41];
zmq_curve_keypair (bogus_public, bogus_secret);
client = zmq_socket (ctx, ZMQ_DEALER); client = zmq_socket (ctx, ZMQ_DEALER);
assert (client); assert (client);

View File

@ -57,6 +57,8 @@ void test_fair_queue_in (void *ctx)
s_send_seq (rep, "A", SEQ_END); s_send_seq (rep, "A", SEQ_END);
s_recv_seq (reqs [0], "A", SEQ_END); s_recv_seq (reqs [0], "A", SEQ_END);
// TODO: following test fails randomly on some boxes
#ifdef SOMEONE_FIXES_THIS
// send N requests // send N requests
for (size_t peer = 0; peer < services; ++peer) { for (size_t peer = 0; peer < services; ++peer) {
char * str = strdup("A"); char * str = strdup("A");
@ -69,12 +71,13 @@ void test_fair_queue_in (void *ctx)
for (size_t peer = 0; peer < services; ++peer) { for (size_t peer = 0; peer < services; ++peer) {
char * str = strdup("A"); char * str = strdup("A");
str [0] += peer; str [0] += peer;
// Test fails here
s_recv_seq (rep, str, SEQ_END); s_recv_seq (rep, str, SEQ_END);
s_send_seq (rep, str, SEQ_END); s_send_seq (rep, str, SEQ_END);
s_recv_seq (reqs [peer], str, SEQ_END); s_recv_seq (reqs [peer], str, SEQ_END);
free (str); free (str);
} }
#endif
close_zero_linger (rep); close_zero_linger (rep);
for (size_t peer = 0; peer < services; ++peer) for (size_t peer = 0; peer < services; ++peer)

View File

@ -46,7 +46,7 @@ void test_round_robin_out (void *ctx)
// We have to give the connects time to finish otherwise the requests // We have to give the connects time to finish otherwise the requests
// will not properly round-robin. We could alternatively connect the // will not properly round-robin. We could alternatively connect the
// REQ sockets to the REP sockets. // REQ sockets to the REP sockets.
zmq_sleep(1); msleep (SETTLE_TIME);
// Send our peer-replies, and expect every REP it used once in order // Send our peer-replies, and expect every REP it used once in order
for (size_t peer = 0; peer < services; peer++) { for (size_t peer = 0; peer < services; peer++) {

View File

@ -89,20 +89,18 @@ test_stream_to_dealer (void)
assert (rc == sizeof (greeting)); assert (rc == sizeof (greeting));
// Now we expect the data from the DEALER socket // Now we expect the data from the DEALER socket
// First frame is, again, the identity of the connection // We want the rest of greeting along with the Ready command
int bytes_read = 0;
while (bytes_read < 97) {
// First frame is the identity of the connection (each time)
rc = zmq_msg_recv (&identity, stream, 0); rc = zmq_msg_recv (&identity, stream, 0);
assert (rc > 0); assert (rc > 0);
assert (zmq_msg_more (&identity)); assert (zmq_msg_more (&identity));
// Second frame contains the next chunk of data
// Second frame contains the rest of greeting along with
// the Ready command
int bytes_read = 0;
while (bytes_read < 97) {
rc = zmq_recv (stream, buffer + bytes_read, 255 - bytes_read, 0); rc = zmq_recv (stream, buffer + bytes_read, 255 - bytes_read, 0);
assert (rc >= 0); assert (rc >= 0);
bytes_read += rc; bytes_read += rc;
} }
assert (rc == 97);
// First two bytes are major and minor version numbers. // First two bytes are major and minor version numbers.
assert (buffer [0] == 3); // ZMTP/3.0 assert (buffer [0] == 3); // ZMTP/3.0
@ -173,12 +171,12 @@ test_stream_to_stream (void)
void *server = zmq_socket (ctx, ZMQ_STREAM); void *server = zmq_socket (ctx, ZMQ_STREAM);
assert (server); assert (server);
rc = zmq_bind (server, "tcp://127.0.0.1:8080"); rc = zmq_bind (server, "tcp://127.0.0.1:9080");
assert (rc == 0); assert (rc == 0);
void *client = zmq_socket (ctx, ZMQ_STREAM); void *client = zmq_socket (ctx, ZMQ_STREAM);
assert (client); assert (client);
rc = zmq_connect (client, "tcp://localhost:8080"); rc = zmq_connect (client, "tcp://localhost:9080");
assert (rc == 0); assert (rc == 0);
// It would be less surprising to get an empty message instead // It would be less surprising to get an empty message instead
// of having to fetch the identity like this [PH 2013/06/27] // of having to fetch the identity like this [PH 2013/06/27]

View File

@ -59,7 +59,7 @@ int main (void)
assert (rc >= 0); assert (rc >= 0);
// Wait a bit till the subscription gets to the publisher // Wait a bit till the subscription gets to the publisher
zmq_sleep(1); msleep (SETTLE_TIME);
// Send an empty message // Send an empty message
rc = zmq_send (pub, NULL, 0, 0); rc = zmq_send (pub, NULL, 0, 0);

View File

@ -20,7 +20,7 @@
#include "testutil.hpp" #include "testutil.hpp"
#if defined (ZMQ_HAVE_WINDOWS) #if defined (ZMQ_HAVE_WINDOWS)
# include <WinSock2.h> # include <winsock2.h>
# include <stdexcept> # include <stdexcept>
#else #else
# include <sys/socket.h> # include <sys/socket.h>

View File

@ -49,7 +49,7 @@ int main (void)
assert (rc == 0); assert (rc == 0);
// Allow unbind to settle // Allow unbind to settle
zmq_sleep(1); msleep (SETTLE_TIME);
// Check that sending would block (there's no outbound connection) // Check that sending would block (there's no outbound connection)
rc = zmq_send (push, "ABC", 3, ZMQ_DONTWAIT); rc = zmq_send (push, "ABC", 3, ZMQ_DONTWAIT);
@ -86,7 +86,7 @@ int main (void)
assert (rc == 0); assert (rc == 0);
// Allow disconnect to settle // Allow disconnect to settle
zmq_sleep(1); msleep (SETTLE_TIME);
// Check that sending would block (there's no inbound connections). // Check that sending would block (there's no inbound connections).
rc = zmq_send (push, "ABC", 3, ZMQ_DONTWAIT); rc = zmq_send (push, "ABC", 3, ZMQ_DONTWAIT);

View File

@ -24,6 +24,11 @@
#include "../include/zmq_utils.h" #include "../include/zmq_utils.h"
#include "platform.hpp" #include "platform.hpp"
// This defines the settle time used in tests; raise this if we
// get test failures on slower systems due to binds/connects not
// settled. Tested to work reliably at 1 msec on a fast PC.
#define SETTLE_TIME 10 // In msec
#undef NDEBUG #undef NDEBUG
#include <time.h> #include <time.h>
#include <assert.h> #include <assert.h>
@ -31,8 +36,10 @@
#include <string> #include <string>
#if defined _WIN32 #if defined _WIN32
# if defined _MSC_VER
# include <crtdbg.h> # include <crtdbg.h>
# pragma warning(disable:4996) # pragma warning(disable:4996)
# endif
#else #else
# include <unistd.h> # include <unistd.h>
# include <signal.h> # include <signal.h>
@ -249,10 +256,24 @@ void close_zero_linger (void *socket)
void setup_test_environment() void setup_test_environment()
{ {
#if defined _WIN32 #if defined _WIN32
# if defined _MSC_VER
_set_abort_behavior( 0, _WRITE_ABORT_MSG); _set_abort_behavior( 0, _WRITE_ABORT_MSG);
_CrtSetReportMode( _CRT_ASSERT, _CRTDBG_MODE_FILE ); _CrtSetReportMode( _CRT_ASSERT, _CRTDBG_MODE_FILE );
_CrtSetReportFile( _CRT_ASSERT, _CRTDBG_FILE_STDERR ); _CrtSetReportFile( _CRT_ASSERT, _CRTDBG_FILE_STDERR );
# endif # endif
#endif
} }
// Provide portable millisecond sleep
// http://www.cplusplus.com/forum/unices/60161/ http://en.cppreference.com/w/cpp/thread/sleep_for
void msleep (int milliseconds)
{
#ifdef ZMQ_HAVE_WINDOWS
Sleep (milliseconds);
#else
usleep (static_cast <useconds_t> (milliseconds) * 1000);
#endif
}
#endif #endif

View File

@ -1,9 +1,9 @@
EXTRA_DIST = curve_keygen.c z85_codec.h EXTRA_DIST = curve_keygen.c
INCLUDES = -I$(top_srcdir)/include INCLUDES = -I$(top_srcdir)/include
bin_PROGRAMS = curve_keygen bin_PROGRAMS = curve_keygen
curve_keygen_LDADD = $(top_srcdir)/src/libzmq.la curve_keygen_LDADD = $(top_builddir)/src/libzmq.la
curve_keygen_SOURCES = curve_keygen.c curve_keygen_SOURCES = curve_keygen.c

View File

@ -24,19 +24,13 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <assert.h>
#include <platform.hpp>
#include <zmq.h>
#include <zmq_utils.h> #include <zmq_utils.h>
#ifdef HAVE_LIBSODIUM
# include <sodium.h>
#endif
int main (void) int main (void)
{ {
#ifdef HAVE_LIBSODIUM
# if crypto_box_PUBLICKEYBYTES != 32 \
|| crypto_box_SECRETKEYBYTES != 32
# error "libsodium not built correctly"
# endif
puts ("This tool generates a CurveZMQ keypair, as two printable strings you can"); puts ("This tool generates a CurveZMQ keypair, as two printable strings you can");
puts ("use in configuration files or source code. The encoding uses Z85, which"); puts ("use in configuration files or source code. The encoding uses Z85, which");
puts ("is a base-85 format that is described in 0MQ RFC 32, and which has an"); puts ("is a base-85 format that is described in 0MQ RFC 32, and which has an");
@ -44,23 +38,21 @@ int main (void)
puts ("always works with the secret key held by one party and the public key"); puts ("always works with the secret key held by one party and the public key");
puts ("distributed (securely!) to peers wishing to connect to it."); puts ("distributed (securely!) to peers wishing to connect to it.");
uint8_t public_key [32]; char public_key [41];
uint8_t secret_key [32]; char secret_key [41];
int rc = zmq_curve_keypair (public_key, secret_key);
if (rc != 0) {
if (zmq_errno () == ENOTSUP) {
puts ("To use curve_keygen, please install libsodium and then rebuild libzmq.");
}
exit (1);
}
int rc = crypto_box_keypair (public_key, secret_key);
assert (rc == 0);
char encoded [41];
zmq_z85_encode (encoded, public_key, 32);
puts ("\n== CURVE PUBLIC KEY =="); puts ("\n== CURVE PUBLIC KEY ==");
puts (encoded); puts (public_key);
zmq_z85_encode (encoded, secret_key, 32);
puts ("\n== CURVE SECRET KEY =="); puts ("\n== CURVE SECRET KEY ==");
puts (encoded); puts (secret_key);
#else
puts ("To build curve_keygen, please install libsodium and then rebuild libzmq.");
#endif
exit (0); exit (0);
} }