This patch adds a sockopt ZMQ_DELAY_ATTACH_ON_CONNECT, which if set to 1 will attempt to preempt this behavior. It does this by extending the use of the session_base to include in the outbound as well as the inbound pipe, and only associates the pipe with the socket once it receives the connected callback via a process_attach message. This works, and a test has been added to show so, but may introduce unexpected complications. The shutdown logic in this class has become marginally more awkward because of this, requiring the session to serve as the sink for both pipes if shutdown occurs with a still-connecting pipe in place. It is also possible there could be issues around flushing the messages, but as I could not directly think how to create such an issue I have not written any code with regards to that.
The documentation has been updated to reflect the change, but please do check over the code and test and review.
I believe there was a conception that zmq_connect() and zmq_bind() will be called
only at the socket creation time and therefore don't need it.
Now it is not true anymore.
1. when we call zmq_bind()/zmq_connect() to create endpoint
we send ourselfs(through launch_child()) command to process_own(endpoint)
(and add it to own_t::owned)
in the application thread we could call zmq_unbind() / zmq_disconnect() _BEFORE_
we run process_own() in ZMQ thread and in this situation we will be unable to find it in
own_t::owned. in other words own_t::owned.find(endpoint) will not be deleted but it will be deleted from
socket_base_t::endpoints.
2. when you zmq_unbind() the lisnening TCP/IPC socket was terminated only in destructor...
so the whole ZMQ_LINGER time listening TCP/IPC socket was able to accept() new connections
but unable to handle them.
this all geting even worse since unfortunately zmq has a bug and '*_listener_t' object not terminated
untill the socket's zmq_close().
AT LEAST FOR PUSH SOCKETS.
Everything is ok for SUB sockets.
Easy to reproduce without my fix:
zmq_socket(PUSH)
zmq_bind(tcp);
// connect to it from PULL socket
zmq_unbind(tcp);
sleep(forever)
// netstat -anp | grep 'tcp listening socket'
With my fix you could see that after zmq_unbind(tcp) all previously connected tcp sessions
will not be finished untill the zmq_close(socket) regardless of ZMQ_LINGER value.
(*_listener_t terminates all owned session_base_t(connect=false) and they call pipe_t::terminate()
which in turn should call session_base_t::terminated() but this never happens)
socket. Thus, it is shared between subsequent calls
to xs_recv (and xs_send). That in turn significantly
limits the number of invocations of getimeofday (or similar)
when timeouts are used and recv/send is called in a
tight loop.
The scoket implementation for inproc transfer failed to flush
identity message. The result was that the identity message
was not delivered until after the user sent the first message.
The identity message was never delivered if the user
used the socket only to receive messages.
Previously, sockets were still "valid" after being closed and only marked
as invalid when destroyed. This meant programs could access closed sockets.
Now the socket is marked "invalid" when closed.
* Implemented new ctx API (_new, _destroy, _get, _set)
* Removed 'typesafe' macros from zmq.h
* Added support for MAX_SOCKETS (was tied into change for #337)
* Created new man pages
* Added two new files: errno.hpp and errno.cpp. They are required to use errno functionality on WM.
* zmq.cpp, msg.h: removed inclusion of errno.h because it is included in zmq.h that is also included by .cpp.
* windows.hpp: process.h is included only for desktop builds.
* thread.cpp: on CE CreateThread is used instead of __beginthreadex
* socket_base.cpp, clock.cpp: on CE include cmnintrin.h instead on intrin.h
* signaler.cpp: on Windows should use special macro around event name (for unicode builds)
* err.hpp: make it include errno.hpp (my file) instead on errno.h when building for CE
* err.cpp: use FormatMessage when building for CE (because CE does not have ANSI API functions)
* zmq.h: do not include errno.h whe building for CE
* libzmq.vcproj: add tro new files
This allows us to actually report an error to the caller on resolve
failure, rather than asserting later on in the io thread.
Signed-off-by: Staffan Gimåker <staffan@spotify.com>
We use a distinct context initialisation function to specify
all sockets derived therefrom will be thread safe.
However the inheritance is done exclusively in the C interface.
This is not really correct, but it is chosen to minimise
interference with the existing C++ code, including any
construct or other calls within the C++ code base.
Semantically the C++ code should be unchanged,
physically some data structures and extra methods are
provided by they're only used from the C binding.
1. Reorganise C API socket functions to eliminate bad practice
of public functions calling other public functions. This should
be done for msg's too but hasn't been in this patch.
2. Reorganise code in C API socket functions so that the
socket is cast on one line, the C++ function called on
the next with the result retained, then the result is returned.
This makes the code much simpler to read and also allows
pre- and post- call hooks to be inserted easily.
3. Insert pre- and post- call hooks which set and release
a mutex iff the thread_safe flag is on.
4. Add the thread_safe_flag to base_socket_t initialised to
false to preserve existing semantics. Add an accessor for
the flag, add a mutex, and add lock and unlock functions.
Note: as yet no code to actually set the flag.
With the introduction of subscription forwarding, the first message sent
on a PUB socket using a unidirectional transport (e.g. PGM) is always
lost due to the "subscribe to all" being done asynchronously.
This patch fixes the problem and also refactors the code to have a single
point where the "subscribe to all" is performed.
Signed-off-by: Martin Lucina <martin@lucina.net>
The new function allows to retrieve options (flags)
from zmq_msg_t.
Signed-off-by: Chuck Remes <cremes@mac.com>
Renamed from zmq_msg_flags to zmq_getmsgopt
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Previous patches have missed the case when the identity should
be sent from an inproc endpoint. Fixed.
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
However, the "durable socket" behaviour wasn't re-added.
Identities are used solely for routing in REQ/REP pattern.
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
This is a preliminary patch allowing for socket-type-specific
functionality in the I/O thread. For example, message format
can be checked asynchronously and misbehaved connections dropped
straight away.
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
zmq_engine and tcp_socket merged into tcp_engine
zmq_connecter and tcp_connecter merged into tcp_connecter
zmq_listener and tcp_listener merged into tcp_listener
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Removal of ZMQ_IDENTITY resulted in various session classes doing
almost the same thing. This patch merges the classes into a single
class.
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
GENERIC allows to use 0MQ as a dumb networking framework.
It provides user with connect/disconnect notifications.
Also, each inbound message is labeled by ID of the connection
it originated from. Outbound messages should be labeled by
the ID of the connection to send them to.
To distinguish connect/disconnect notifications from common
messages, COMMAND flag was introduced.
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
So far the requests in req/rep pattern were delivered to and processed
by worker even though the original requester was dead. Thus,
the worker processing replies with noone to deliver results to.
This optimisation drops requests in two situations:
1. Queued inbound requests in XREP socket when peer disconnects.
2. Queued outbound requests in XREQ when socket is closed.
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
This patch introduces two changes:
1. 32-bit ID is used to identify the peer instead of UUID
2. REQ socket seeds the label stack with unique 32-bit request ID
It also drops any replies with non-matching request ID
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
So far there was no distinction between message parts used by 0MQ
and message parts used by user. Now, the message parts used by 0MQ
are marked as 'LABEL'.
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Till now the code was spread over mutliple locations.
Additionally, the code was made more formally correct,
with explicit pipe state machine etc.
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
So far, the pipe termination code was spread among socket type
classes, fair queuer, load balancer, etc. This patch moves
all the associated logic to a single place.
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
So far, there was a pair of unidirectional pipes between a socket
and a session (or an inproc peer). This resulted in complex
problems with half-closed states and tracking which inpipe
corresponds to which outpipe.
This patch doesn't add any functionality in itself, but is
essential for further work on features like subscription
forwarding.
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
The string format of UUID is not used in 0MQ. Further on,
it turns out that UUIDs have fixed microarchitecture-agnostic
binary layout (see RFC4122). Thus, the conversion to string
and back to binary can be avoided.
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
This patch addresses serveral issues:
1. It gathers message related functionality scattered over whole
codebase into a single class.
2. It makes zmq_msg_t an opaque datatype. Internals of the class
don't pollute zmq.h header file.
3. zmq_msg_t size decreases from 48 to 32 bytes. That saves ~33%
of memory in scenarios with large amount of small messages.
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
These new options allow to control the maximum size of the
inbound and outbound message pipe separately.
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
On-disk storage should be implemented in devices rather than
in 0MQ core. 0MQ is a networking library and there's no point
in storing network buffers on disk.
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Reaper thread destroys the socket asynchronously.
zmq_term() can be interrupted by a signal (EINTR).
zmq_socket() will return ETERM after zmq_term() was called.
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
The meat of the patch was contributed by Douglas Creager.
Martin Sustrik implemented storing peer options in inproc
endpoint repository.
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Added block boolean var to second process_commands() invocation for blocking sockets
instead of always using true. This prevents the process_commands() call from hanging
when a message is received with an empty queue after the call to xrecv() but
prior to the initial call to process_commands() invoked when ++ticks == inbound_poll_rate.
Signed-off-by: Marc Rossi <mrossi19@gmail.com>
For historical reasons queue to transfer commands between
threads was called 'signaler'. Given that it was used to
pass commands rather than signals it was renamed to 'mailbox',
see Erlang mailboxes.
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Threads were so far identified by integers called 'slots'.
This patch renames them to more comprehensible 'tid's (thread IDs).
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
1. ZMQ_LINGER option can be set/get
2. options are part of own_t base class rather than being declared
separately by individual objects
3. Linger option is propagated with "term" command so that the
newest value of it is used rather than the stored old one.
4. Session sets the linger timer if needed and terminates
as soon as it expires.
5. Corresponding documentation updated.
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Sockets may now be migrated between OS threads; sockets may not be used by
more than one thread at any time. To migrate a socket to another thread the
caller must ensure that a full memory barrier is called before using the
socket from the target thread.
The new zmq_close() semantics implement the behaviour discussed at:
http://lists.zeromq.org/pipermail/zeromq-dev/2010-July/004244.html
Specifically, zmq_close() is now deterministic and while it still returns
immediately, it does not discard any data that may still be queued for
sending. Further, zmq_term() will now block until all outstanding data has
been sent.
TODO: Many bugs have been introduced, needs testing. Further, SO_LINGER or
an equivalent mechanism (possibly a configurable timeout to zmq_term())
needs to be implemented.
This commit introduces the necessary changes necessary
for implementing flow control. None of the socket types
implements the flow control yet. The code will crash when
the flow control is enabled and the thw lwm is reached.
The following commits will add flow-control support for
individual socket types.
C and C++ headers moved from bindings/ to include/, bindings/ removed
--with-c and --with-cpp options to configure removed, C and C++ now built
and installed by default