mirror of
https://github.com/zeromq/libzmq.git
synced 2024-12-12 10:33:52 +01:00
* Implemented new ctx API (_new, _destroy, _get, _set) * Removed 'typesafe' macros from zmq.h * Added support for MAX_SOCKETS (was tied into change for #337) * Created new man pages
This commit is contained in:
parent
bdefa181ed
commit
6e71a54b1e
@ -1,9 +1,11 @@
|
||||
MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_device.3 zmq_init.3 \
|
||||
MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_device.3 \
|
||||
zmq_ctx_new.3 zmq_ctx_destroy.3 zmq_ctx_get.3 zmq_ctx_set.3 \
|
||||
zmq_init.3 zmq_term.3 \
|
||||
zmq_msg_close.3 zmq_msg_copy.3 zmq_msg_data.3 zmq_msg_init.3 \
|
||||
zmq_msg_init_data.3 zmq_msg_init_size.3 zmq_msg_move.3 zmq_msg_size.3 \
|
||||
zmq_msg_send.3 zmq_msg_recv.3 \
|
||||
zmq_poll.3 zmq_recv.3 zmq_send.3 zmq_setsockopt.3 zmq_socket.3 \
|
||||
zmq_strerror.3 zmq_term.3 zmq_version.3 zmq_getsockopt.3 zmq_errno.3 \
|
||||
zmq_strerror.3 zmq_version.3 zmq_getsockopt.3 zmq_errno.3 \
|
||||
zmq_sendmsg.3 zmq_recvmsg.3 zmq_msg_get.3 zmq_msg_set.3 zmq_msg_more.3
|
||||
MAN7 = zmq.7 zmq_tcp.7 zmq_pgm.7 zmq_epgm.7 zmq_inproc.7 zmq_ipc.7
|
||||
|
||||
|
18
doc/zmq.txt
18
doc/zmq.txt
@ -30,9 +30,21 @@ provided by the 0MQ library.
|
||||
|
||||
Context
|
||||
~~~~~~~
|
||||
Before using any 0MQ library functions the caller must initialise a 0MQ
|
||||
'context' using _zmq_init()_. The following functions are provided to handle
|
||||
initialisation and termination of a 'context':
|
||||
Before using any 0MQ library functions you must create a 0MQ 'context'. When
|
||||
you exit your application you must destroy the 'context'. These functions let
|
||||
you work with 'contexts':
|
||||
|
||||
Create a new 0MQ context::
|
||||
linkzmq:zmq_ctx_new[3]
|
||||
|
||||
Work with context properties::
|
||||
linkzmq:zmq_ctx_set[3]
|
||||
linkzmq:zmq_ctx_get[3]
|
||||
|
||||
Destroy a 0MQ context::
|
||||
linkzmq:zmq_ctx_destroy[3]
|
||||
|
||||
These deprecated functions let you create and destroy 'contexts':
|
||||
|
||||
Initialise 0MQ context::
|
||||
linkzmq:zmq_init[3]
|
||||
|
66
doc/zmq_ctx_destroy.txt
Normal file
66
doc/zmq_ctx_destroy.txt
Normal file
@ -0,0 +1,66 @@
|
||||
zmq_ctx_destroy(3)
|
||||
==================
|
||||
|
||||
|
||||
NAME
|
||||
----
|
||||
zmq_ctx_destroy - destroy a 0MQ context
|
||||
|
||||
|
||||
SYNOPSIS
|
||||
--------
|
||||
*int zmq_ctx_destroy (void '*context');*
|
||||
|
||||
|
||||
DESCRIPTION
|
||||
-----------
|
||||
The _zmq_ctx_destroy()_ function shall destroy the 0MQ context 'context'.
|
||||
|
||||
Context termination is performed in the following steps:
|
||||
|
||||
1. Any blocking operations currently in progress on sockets open within
|
||||
'context' shall return immediately with an error code of ETERM. With the
|
||||
exception of _zmq_close()_, any further operations on sockets open within
|
||||
'context' shall fail with an error code of ETERM.
|
||||
|
||||
2. After interrupting all blocking calls, _zmq_ctx_destroy()_ shall _block_ until the
|
||||
following conditions are satisfied:
|
||||
|
||||
* All sockets open within 'context' have been closed with _zmq_close()_.
|
||||
|
||||
* For each socket within 'context', all messages sent by the application
|
||||
with _zmq_send()_ have either been physically transferred to a network
|
||||
peer, or the socket's linger period set with the _ZMQ_LINGER_ socket
|
||||
option has expired.
|
||||
|
||||
For further details regarding socket linger behavior refer to the _ZMQ_LINGER_
|
||||
option in linkzmq:zmq_setsockopt[3].
|
||||
|
||||
This function replaces the deprecated function linkzmq:zmq_term[3].
|
||||
|
||||
|
||||
RETURN VALUE
|
||||
------------
|
||||
The _zmq_ctx_destroy()_ function shall return zero if successful. Otherwise
|
||||
it shall return `-1` and set 'errno' to one of the values defined below.
|
||||
|
||||
|
||||
ERRORS
|
||||
------
|
||||
*EFAULT*::
|
||||
The provided 'context' was invalid.
|
||||
*EINTR*::
|
||||
Termination was interrupted by a signal. It can be restarted if needed.
|
||||
|
||||
|
||||
SEE ALSO
|
||||
--------
|
||||
linkzmq:zmq[7]
|
||||
linkzmq:zmq_init[3]
|
||||
linkzmq:zmq_close[3]
|
||||
linkzmq:zmq_setsockopt[3]
|
||||
|
||||
|
||||
AUTHORS
|
||||
-------
|
||||
This 0MQ manual page was written by Pieter Hintjens <ph@imatix.com>
|
67
doc/zmq_ctx_get.txt
Normal file
67
doc/zmq_ctx_get.txt
Normal file
@ -0,0 +1,67 @@
|
||||
zmq_ctx_get(3)
|
||||
==============
|
||||
|
||||
|
||||
NAME
|
||||
----
|
||||
|
||||
zmq_ctx_get - get context options
|
||||
|
||||
|
||||
SYNOPSIS
|
||||
--------
|
||||
*int zmq_ctx_get (void '*context', int 'option_name');*
|
||||
|
||||
|
||||
DESCRIPTION
|
||||
-----------
|
||||
The _zmq_ctx_get()_ function shall return the option specified by the
|
||||
'option_name' argument.
|
||||
|
||||
The _zmq_ctx_get()_ function accepts the following option names:
|
||||
|
||||
|
||||
ZMQ_IO_THREADS: Get number of I/O threads
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
The 'ZMQ_IO_THREADS' argument returns the size of the 0MQ thread pool
|
||||
for this context.
|
||||
|
||||
ZMQ_MAX_SOCKETS: Set maximum number of sockets
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
The 'ZMQ_MAX_SOCKETS' argument returns the maximum number of sockets
|
||||
allowed for this context.
|
||||
|
||||
|
||||
RETURN VALUE
|
||||
------------
|
||||
The _zmq_ctx_get()_ function returns a value of 0 or greater if successful.
|
||||
Otherwise it returns `-1` and sets 'errno' to one of the values defined
|
||||
below.
|
||||
|
||||
|
||||
ERRORS
|
||||
------
|
||||
*EINVAL*::
|
||||
The requested option _option_name_ is unknown.
|
||||
|
||||
|
||||
EXAMPLE
|
||||
-------
|
||||
.Setting a limit on the number of sockets
|
||||
----
|
||||
void *context = zmq_ctx_new ();
|
||||
zmq_ctx_get (context, ZMQ_MAX_SOCKETS, 256);
|
||||
int max_sockets = zmq_ctx_get (context, ZMQ_MAX_SOCKETS);
|
||||
assert (max_sockets == 256);
|
||||
----
|
||||
|
||||
|
||||
SEE ALSO
|
||||
--------
|
||||
linkzmq:zmq_ctx_set[3]
|
||||
linkzmq:zmq[7]
|
||||
|
||||
|
||||
AUTHORS
|
||||
-------
|
||||
This 0MQ manual page was written by Pieter Hintjens <ph@imatix.com>
|
49
doc/zmq_ctx_new.txt
Normal file
49
doc/zmq_ctx_new.txt
Normal file
@ -0,0 +1,49 @@
|
||||
zmq_ctx_new(3)
|
||||
==============
|
||||
|
||||
|
||||
NAME
|
||||
----
|
||||
zmq_ctx_new - create new 0MQ context
|
||||
|
||||
|
||||
SYNOPSIS
|
||||
--------
|
||||
*void *zmq_ctx_new ();*
|
||||
|
||||
|
||||
DESCRIPTION
|
||||
-----------
|
||||
The _zmq_ctx_new()_ function creates a new 0MQ 'context'.
|
||||
|
||||
This function replaces the deprecated function linkzmq:zmq_init[3].
|
||||
|
||||
.Thread safety
|
||||
A 0MQ 'context' is thread safe and may be shared among as many application
|
||||
threads as necessary, without any additional locking required on the part of
|
||||
the caller.
|
||||
|
||||
|
||||
RETURN VALUE
|
||||
------------
|
||||
The _zmq_ctx_new()_ function shall return an opaque handle to the newly created
|
||||
'context' if successful. Otherwise it shall return NULL and set 'errno' to one
|
||||
of the values defined below.
|
||||
|
||||
|
||||
ERRORS
|
||||
------
|
||||
No error values are defined for this function.
|
||||
|
||||
|
||||
SEE ALSO
|
||||
--------
|
||||
linkzmq:zmq[7]
|
||||
linkzmq:zmq_ctx_put[3]
|
||||
linkzmq:zmq_ctx_get[3]
|
||||
linkzmq:zmq_ctx_destroy[3]
|
||||
|
||||
|
||||
AUTHORS
|
||||
-------
|
||||
This 0MQ manual page was written by Pieter Hintjens <ph@imatix.com>
|
75
doc/zmq_ctx_set.txt
Normal file
75
doc/zmq_ctx_set.txt
Normal file
@ -0,0 +1,75 @@
|
||||
zmq_ctx_set(3)
|
||||
==============
|
||||
|
||||
|
||||
NAME
|
||||
----
|
||||
|
||||
zmq_ctx_set - set context options
|
||||
|
||||
|
||||
SYNOPSIS
|
||||
--------
|
||||
*int zmq_ctx_set (void '*context', int 'option_name', int 'option_value');*
|
||||
|
||||
|
||||
DESCRIPTION
|
||||
-----------
|
||||
The _zmq_ctx_set()_ function shall set the option specified by the
|
||||
'option_name' argument to the value of the 'option_value' argument.
|
||||
|
||||
The _zmq_ctx_set()_ function accepts the following options:
|
||||
|
||||
|
||||
ZMQ_IO_THREADS: Set number of I/O threads
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
The 'ZMQ_IO_THREADS' argument specifies the size of the 0MQ thread pool to
|
||||
handle I/O operations. If your application is using only the 'inproc'
|
||||
transport for messaging you may set this to zero, otherwise set it to at
|
||||
least one. This option only applies before creating any sockets on the
|
||||
context.
|
||||
|
||||
[horizontal]
|
||||
Default value:: 1
|
||||
|
||||
ZMQ_MAX_SOCKETS: Set maximum number of sockets
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
The 'ZMQ_MAX_SOCKETS' argument sets the maximum number of sockets allowed
|
||||
on the context.
|
||||
|
||||
[horizontal]
|
||||
Default value:: 1024
|
||||
|
||||
|
||||
RETURN VALUE
|
||||
------------
|
||||
The _zmq_ctx_set()_ function returns zero if successful. Otherwise it
|
||||
returns `-1` and sets 'errno' to one of the values defined below.
|
||||
|
||||
|
||||
ERRORS
|
||||
------
|
||||
*EINVAL*::
|
||||
The requested option _option_name_ is unknown.
|
||||
|
||||
|
||||
EXAMPLE
|
||||
-------
|
||||
.Setting a limit on the number of sockets
|
||||
----
|
||||
void *context = zmq_ctx_new ();
|
||||
zmq_ctx_set (context, ZMQ_MAX_SOCKETS, 256);
|
||||
int max_sockets = zmq_ctx_get (context, ZMQ_MAX_SOCKETS);
|
||||
assert (max_sockets == 256);
|
||||
----
|
||||
|
||||
|
||||
SEE ALSO
|
||||
--------
|
||||
linkzmq:zmq_ctx_get[3]
|
||||
linkzmq:zmq[7]
|
||||
|
||||
|
||||
AUTHORS
|
||||
-------
|
||||
This 0MQ manual page was written by Pieter Hintjens <ph@imatix.com>
|
@ -25,6 +25,7 @@ A 0MQ 'context' is thread safe and may be shared among as many application
|
||||
threads as necessary, without any additional locking required on the part of
|
||||
the caller.
|
||||
|
||||
This function is deprecated by linkzmq:zmq_ctx_new[3].
|
||||
|
||||
RETURN VALUE
|
||||
------------
|
||||
|
@ -36,6 +36,7 @@ Context termination is performed in the following steps:
|
||||
For further details regarding socket linger behaviour refer to the _ZMQ_LINGER_
|
||||
option in linkzmq:zmq_setsockopt[3].
|
||||
|
||||
This function is deprecated by linkzmq:zmq_ctx_destroy[3].
|
||||
|
||||
RETURN VALUE
|
||||
------------
|
||||
|
@ -67,33 +67,6 @@ extern "C" {
|
||||
#define ZMQ_VERSION \
|
||||
ZMQ_MAKE_VERSION(ZMQ_VERSION_MAJOR, ZMQ_VERSION_MINOR, ZMQ_VERSION_PATCH)
|
||||
|
||||
/* ensure one of ZMQ_TYPE_SAFE/UNSAFE is defined */
|
||||
/* Choose default based on version */
|
||||
|
||||
/* Uncomment to test */
|
||||
/* #define ZMQ_EMULATE_TYPE_SAFE */
|
||||
|
||||
#if !defined(ZMQ_TYPE_SAFE) && !defined(ZMQ_TYPE_UNSAFE)
|
||||
# if ZMQ_VERSION_MAJOR <= 3
|
||||
# if defined ZMQ_EMULATE_TYPE_SAFE
|
||||
# else
|
||||
# define ZMQ_TYPE_UNSAFE
|
||||
# endif
|
||||
# else
|
||||
# define ZMQ_TYPE_SAFE
|
||||
# endif
|
||||
#elif defined(ZMQ_TYPE_SAFE) && defined(ZMQ_TYPE_UNSAFE)
|
||||
# error "BOTH ZMQ_TYPE_SAFE and ZMQ_TYPE_UNSAFE are defined!"
|
||||
#endif
|
||||
|
||||
#ifdef ZMQ_TYPE_UNSAFE
|
||||
typedef void *zmq_socket_t;
|
||||
typedef void *zmq_ctx_t;
|
||||
#else
|
||||
typedef struct zmq_socket_t { void *data; } zmq_socket_t;
|
||||
typedef struct zmq_ctx_t { void *data; } zmq_ctx_t;
|
||||
#endif
|
||||
|
||||
/* Run-time API version detection */
|
||||
ZMQ_EXPORT void zmq_version (int *major, int *minor, int *patch);
|
||||
|
||||
@ -152,6 +125,29 @@ ZMQ_EXPORT int zmq_errno (void);
|
||||
/* Resolves system errors and 0MQ errors to human-readable string. */
|
||||
ZMQ_EXPORT const char *zmq_strerror (int errnum);
|
||||
|
||||
/******************************************************************************/
|
||||
/* 0MQ infrastructure (a.k.a. context) initialisation & termination. */
|
||||
/******************************************************************************/
|
||||
|
||||
/* New API */
|
||||
// Context options
|
||||
#define ZMQ_IO_THREADS 1
|
||||
#define ZMQ_MAX_SOCKETS 2
|
||||
|
||||
// Default for new contexts
|
||||
#define ZMQ_IO_THREADS_DFLT 1
|
||||
#define ZMQ_MAX_SOCKETS_DFLT 1024
|
||||
|
||||
ZMQ_EXPORT void *zmq_ctx_new (void);
|
||||
ZMQ_EXPORT int zmq_ctx_destroy (void *context);
|
||||
ZMQ_EXPORT int zmq_ctx_set (void *context, int option, int optval);
|
||||
ZMQ_EXPORT int zmq_ctx_get (void *context, int option);
|
||||
|
||||
/* Old (legacy) API */
|
||||
ZMQ_EXPORT void *zmq_init (int io_threads);
|
||||
ZMQ_EXPORT int zmq_term (void *context);
|
||||
|
||||
|
||||
/******************************************************************************/
|
||||
/* 0MQ message definition. */
|
||||
/******************************************************************************/
|
||||
@ -164,8 +160,8 @@ ZMQ_EXPORT int zmq_msg_init (zmq_msg_t *msg);
|
||||
ZMQ_EXPORT int zmq_msg_init_size (zmq_msg_t *msg, size_t size);
|
||||
ZMQ_EXPORT int zmq_msg_init_data (zmq_msg_t *msg, void *data,
|
||||
size_t size, zmq_free_fn *ffn, void *hint);
|
||||
ZMQ_EXPORT int zmq_msg_send (zmq_msg_t *msg, zmq_socket_t s, int flags);
|
||||
ZMQ_EXPORT int zmq_msg_recv (zmq_msg_t *msg, zmq_socket_t s, int flags);
|
||||
ZMQ_EXPORT int zmq_msg_send (zmq_msg_t *msg, void *s, int flags);
|
||||
ZMQ_EXPORT int zmq_msg_recv (zmq_msg_t *msg, void *s, int flags);
|
||||
ZMQ_EXPORT int zmq_msg_close (zmq_msg_t *msg);
|
||||
ZMQ_EXPORT int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src);
|
||||
ZMQ_EXPORT int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src);
|
||||
@ -177,13 +173,6 @@ ZMQ_EXPORT int zmq_msg_get (zmq_msg_t *msg, int option, void *optval,
|
||||
ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, const void *optval,
|
||||
size_t *optvallen);
|
||||
|
||||
/******************************************************************************/
|
||||
/* 0MQ infrastructure (a.k.a. context) initialisation & termination. */
|
||||
/******************************************************************************/
|
||||
|
||||
ZMQ_EXPORT zmq_ctx_t zmq_init (int io_threads);
|
||||
ZMQ_EXPORT int zmq_term (zmq_ctx_t context);
|
||||
|
||||
/******************************************************************************/
|
||||
/* 0MQ socket definition. */
|
||||
/******************************************************************************/
|
||||
@ -239,23 +228,23 @@ ZMQ_EXPORT int zmq_term (zmq_ctx_t context);
|
||||
#define ZMQ_DONTWAIT 1
|
||||
#define ZMQ_SNDMORE 2
|
||||
|
||||
ZMQ_EXPORT zmq_socket_t zmq_socket (zmq_ctx_t context, int type);
|
||||
ZMQ_EXPORT int zmq_close (zmq_socket_t s);
|
||||
ZMQ_EXPORT int zmq_setsockopt (zmq_socket_t s, int option, const void *optval,
|
||||
ZMQ_EXPORT void *zmq_socket (void *, int type);
|
||||
ZMQ_EXPORT int zmq_close (void *s);
|
||||
ZMQ_EXPORT int zmq_setsockopt (void *s, int option, const void *optval,
|
||||
size_t optvallen);
|
||||
ZMQ_EXPORT int zmq_getsockopt (zmq_socket_t s, int option, void *optval,
|
||||
ZMQ_EXPORT int zmq_getsockopt (void *s, int option, void *optval,
|
||||
size_t *optvallen);
|
||||
ZMQ_EXPORT int zmq_bind (zmq_socket_t s, const char *addr);
|
||||
ZMQ_EXPORT int zmq_connect (zmq_socket_t s, const char *addr);
|
||||
ZMQ_EXPORT int zmq_send (zmq_socket_t s, const void *buf, size_t len, int flags);
|
||||
ZMQ_EXPORT int zmq_recv (zmq_socket_t s, void *buf, size_t len, int flags);
|
||||
ZMQ_EXPORT int zmq_bind (void *s, const char *addr);
|
||||
ZMQ_EXPORT int zmq_connect (void *s, const char *addr);
|
||||
ZMQ_EXPORT int zmq_send (void *s, const void *buf, size_t len, int flags);
|
||||
ZMQ_EXPORT int zmq_recv (void *s, void *buf, size_t len, int flags);
|
||||
|
||||
ZMQ_EXPORT int zmq_sendmsg (zmq_socket_t s, zmq_msg_t *msg, int flags);
|
||||
ZMQ_EXPORT int zmq_recvmsg (zmq_socket_t s, zmq_msg_t *msg, int flags);
|
||||
ZMQ_EXPORT int zmq_sendmsg (void *s, zmq_msg_t *msg, int flags);
|
||||
ZMQ_EXPORT int zmq_recvmsg (void *s, zmq_msg_t *msg, int flags);
|
||||
|
||||
/* Experimental */
|
||||
ZMQ_EXPORT int zmq_sendiov (zmq_socket_t s, struct iovec *iov, size_t count, int flags);
|
||||
ZMQ_EXPORT int zmq_recviov (zmq_socket_t s, struct iovec *iov, size_t *count, int flags);
|
||||
ZMQ_EXPORT int zmq_sendiov (void *s, struct iovec *iov, size_t count, int flags);
|
||||
ZMQ_EXPORT int zmq_recviov (void *s, struct iovec *iov, size_t *count, int flags);
|
||||
|
||||
/******************************************************************************/
|
||||
/* I/O multiplexing. */
|
||||
@ -267,7 +256,7 @@ ZMQ_EXPORT int zmq_recviov (zmq_socket_t s, struct iovec *iov, size_t *count, in
|
||||
|
||||
typedef struct
|
||||
{
|
||||
zmq_socket_t socket;
|
||||
void *socket;
|
||||
#if defined _WIN32
|
||||
SOCKET fd;
|
||||
#else
|
||||
@ -287,7 +276,7 @@ ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
|
||||
#define ZMQ_FORWARDER 2
|
||||
#define ZMQ_QUEUE 3
|
||||
|
||||
ZMQ_EXPORT int zmq_device (int device, void * insocket, void* outsocket);
|
||||
ZMQ_EXPORT int zmq_device (int device, void *insocket, void* outsocket);
|
||||
|
||||
#undef ZMQ_EXPORT
|
||||
|
||||
|
191
src/ctx.cpp
191
src/ctx.cpp
@ -1,6 +1,6 @@
|
||||
/*
|
||||
Copyright (c) 2007-2012 iMatix Corporation
|
||||
Copyright (c) 2009-2011 250bpm s.r.o.
|
||||
Copyright (c) 2007-2011 iMatix Corporation
|
||||
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
|
||||
|
||||
This file is part of 0MQ.
|
||||
@ -37,45 +37,21 @@
|
||||
#include "err.hpp"
|
||||
#include "msg.hpp"
|
||||
|
||||
zmq::ctx_t::ctx_t (uint32_t io_threads_) :
|
||||
tag (0xbadcafe0),
|
||||
terminating (false)
|
||||
zmq::ctx_t::ctx_t () :
|
||||
tag (0xabadcafe),
|
||||
starting (true),
|
||||
terminating (false),
|
||||
reaper (NULL),
|
||||
slot_count (0),
|
||||
slots (NULL),
|
||||
max_sockets (ZMQ_MAX_SOCKETS_DFLT),
|
||||
io_thread_count (ZMQ_IO_THREADS_DFLT)
|
||||
{
|
||||
// Initialise the array of mailboxes. Additional three slots are for
|
||||
// internal log socket and the zmq_term thread the reaper thread.
|
||||
slot_count = max_sockets + io_threads_ + 3;
|
||||
slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count);
|
||||
alloc_assert (slots);
|
||||
|
||||
// Initialise the infrastructure for zmq_term thread.
|
||||
slots [term_tid] = &term_mailbox;
|
||||
|
||||
// Create the reaper thread.
|
||||
reaper = new (std::nothrow) reaper_t (this, reaper_tid);
|
||||
alloc_assert (reaper);
|
||||
slots [reaper_tid] = reaper->get_mailbox ();
|
||||
reaper->start ();
|
||||
|
||||
// Create I/O thread objects and launch them.
|
||||
for (uint32_t i = 2; i != io_threads_ + 2; i++) {
|
||||
io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
|
||||
alloc_assert (io_thread);
|
||||
io_threads.push_back (io_thread);
|
||||
slots [i] = io_thread->get_mailbox ();
|
||||
io_thread->start ();
|
||||
}
|
||||
|
||||
// In the unused part of the slot array, create a list of empty slots.
|
||||
for (int32_t i = (int32_t) slot_count - 1;
|
||||
i >= (int32_t) io_threads_ + 2; i--) {
|
||||
empty_slots.push_back (i);
|
||||
slots [i] = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
bool zmq::ctx_t::check_tag ()
|
||||
{
|
||||
return tag == 0xbadcafe0;
|
||||
return tag == 0xabadcafe;
|
||||
}
|
||||
|
||||
zmq::ctx_t::~ctx_t ()
|
||||
@ -93,12 +69,14 @@ zmq::ctx_t::~ctx_t ()
|
||||
delete io_threads [i];
|
||||
|
||||
// Deallocate the reaper thread object.
|
||||
delete reaper;
|
||||
if (reaper)
|
||||
delete reaper;
|
||||
|
||||
// Deallocate the array of mailboxes. No special work is
|
||||
// needed as mailboxes themselves were deallocated with their
|
||||
// corresponding io_thread/socket objects.
|
||||
free (slots);
|
||||
if (slots)
|
||||
free (slots);
|
||||
|
||||
// Remove the tag, so that the object is considered dead.
|
||||
tag = 0xdeadbeef;
|
||||
@ -106,44 +84,125 @@ zmq::ctx_t::~ctx_t ()
|
||||
|
||||
int zmq::ctx_t::terminate ()
|
||||
{
|
||||
// Check whether termination was already underway, but interrupted and now
|
||||
// restarted.
|
||||
slot_sync.lock ();
|
||||
bool restarted = terminating;
|
||||
slot_sync.unlock ();
|
||||
if (!starting) {
|
||||
|
||||
// First attempt to terminate the context.
|
||||
if (!restarted) {
|
||||
// First send stop command to sockets so that any blocking calls can be
|
||||
// interrupted. If there are no sockets we can ask reaper thread to stop.
|
||||
// Check whether termination was already underway, but interrupted and now
|
||||
// restarted.
|
||||
slot_sync.lock ();
|
||||
bool restarted = terminating;
|
||||
terminating = true;
|
||||
for (sockets_t::size_type i = 0; i != sockets.size (); i++)
|
||||
sockets [i]->stop ();
|
||||
if (sockets.empty ())
|
||||
reaper->stop ();
|
||||
slot_sync.unlock ();
|
||||
|
||||
// First attempt to terminate the context.
|
||||
if (!restarted) {
|
||||
|
||||
// First send stop command to sockets so that any blocking calls
|
||||
// can be interrupted. If there are no sockets we can ask reaper
|
||||
// thread to stop.
|
||||
slot_sync.lock ();
|
||||
for (sockets_t::size_type i = 0; i != sockets.size (); i++)
|
||||
sockets [i]->stop ();
|
||||
if (sockets.empty ())
|
||||
reaper->stop ();
|
||||
slot_sync.unlock ();
|
||||
}
|
||||
|
||||
// Wait till reaper thread closes all the sockets.
|
||||
command_t cmd;
|
||||
int rc = term_mailbox.recv (&cmd, -1);
|
||||
if (rc == -1 && errno == EINTR)
|
||||
return -1;
|
||||
zmq_assert (rc == 0);
|
||||
zmq_assert (cmd.type == command_t::done);
|
||||
slot_sync.lock ();
|
||||
zmq_assert (sockets.empty ());
|
||||
slot_sync.unlock ();
|
||||
}
|
||||
|
||||
// Wait till reaper thread closes all the sockets.
|
||||
command_t cmd;
|
||||
int rc = term_mailbox.recv (&cmd, -1);
|
||||
if (rc == -1 && errno == EINTR)
|
||||
return -1;
|
||||
zmq_assert (rc == 0);
|
||||
zmq_assert (cmd.type == command_t::done);
|
||||
slot_sync.lock ();
|
||||
zmq_assert (sockets.empty ());
|
||||
slot_sync.unlock ();
|
||||
|
||||
// Deallocate the resources.
|
||||
delete this;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int zmq::ctx_t::set (int option_, int optval_)
|
||||
{
|
||||
int rc = 0;
|
||||
if (option_ == ZMQ_MAX_SOCKETS) {
|
||||
opt_sync.lock ();
|
||||
max_sockets = optval_;
|
||||
opt_sync.unlock ();
|
||||
}
|
||||
else
|
||||
if (option_ == ZMQ_IO_THREADS) {
|
||||
opt_sync.lock ();
|
||||
io_thread_count = optval_;
|
||||
opt_sync.unlock ();
|
||||
}
|
||||
else {
|
||||
errno = EINVAL;
|
||||
rc = -1;
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
int zmq::ctx_t::get (int option_)
|
||||
{
|
||||
int rc = 0;
|
||||
if (option_ == ZMQ_MAX_SOCKETS)
|
||||
rc = max_sockets;
|
||||
else
|
||||
if (option_ == ZMQ_IO_THREADS)
|
||||
rc = io_thread_count;
|
||||
else {
|
||||
errno = EINVAL;
|
||||
rc = -1;
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
|
||||
{
|
||||
if (unlikely (starting)) {
|
||||
|
||||
starting = false;
|
||||
|
||||
// Initialise the array of mailboxes. Additional three slots are for
|
||||
// zmq_term thread and reaper thread.
|
||||
opt_sync.lock ();
|
||||
int mazmq = max_sockets;
|
||||
int ios = io_thread_count;
|
||||
opt_sync.unlock ();
|
||||
slot_count = mazmq + ios + 2;
|
||||
slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count);
|
||||
alloc_assert (slots);
|
||||
|
||||
// Initialise the infrastructure for zmq_term thread.
|
||||
slots [term_tid] = &term_mailbox;
|
||||
|
||||
// Create the reaper thread.
|
||||
reaper = new (std::nothrow) reaper_t (this, reaper_tid);
|
||||
alloc_assert (reaper);
|
||||
slots [reaper_tid] = reaper->get_mailbox ();
|
||||
reaper->start ();
|
||||
|
||||
// Create I/O thread objects and launch them.
|
||||
for (int i = 2; i != ios + 2; i++) {
|
||||
io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
|
||||
alloc_assert (io_thread);
|
||||
io_threads.push_back (io_thread);
|
||||
slots [i] = io_thread->get_mailbox ();
|
||||
io_thread->start ();
|
||||
}
|
||||
|
||||
// In the unused part of the slot array, create a list of empty slots.
|
||||
for (int32_t i = (int32_t) slot_count - 1;
|
||||
i >= (int32_t) ios + 2; i--) {
|
||||
empty_slots.push_back (i);
|
||||
slots [i] = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
slot_sync.lock ();
|
||||
|
||||
// Once zmq_term() was called, we can't create new sockets.
|
||||
@ -164,8 +223,11 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
|
||||
uint32_t slot = empty_slots.back ();
|
||||
empty_slots.pop_back ();
|
||||
|
||||
// Generate new unique socket ID.
|
||||
int sid = ((int) max_socket_id.add (1)) + 1;
|
||||
|
||||
// Create the socket and register its mailbox.
|
||||
socket_base_t *s = socket_base_t::create (type_, this, slot);
|
||||
socket_base_t *s = socket_base_t::create (type_, this, slot, sid);
|
||||
if (!s) {
|
||||
empty_slots.push_back (slot);
|
||||
slot_sync.unlock ();
|
||||
@ -286,3 +348,8 @@ zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
|
||||
endpoints_sync.unlock ();
|
||||
return *endpoint;
|
||||
}
|
||||
|
||||
// The last used socket ID, or 0 if no socket was used so far. Note that this
|
||||
// is a global variable. Thus, even sockets created in different contexts have
|
||||
// unique IDs.
|
||||
zmq::atomic_counter_t zmq::ctx_t::max_socket_id;
|
||||
|
29
src/ctx.hpp
29
src/ctx.hpp
@ -1,6 +1,6 @@
|
||||
/*
|
||||
Copyright (c) 2007-2012 iMatix Corporation
|
||||
Copyright (c) 2009-2011 250bpm s.r.o.
|
||||
Copyright (c) 2007-2009 iMatix Corporation
|
||||
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
|
||||
|
||||
This file is part of 0MQ.
|
||||
@ -33,6 +33,7 @@
|
||||
#include "mutex.hpp"
|
||||
#include "stdint.hpp"
|
||||
#include "options.hpp"
|
||||
#include "atomic_counter.hpp"
|
||||
|
||||
namespace zmq
|
||||
{
|
||||
@ -58,9 +59,8 @@ namespace zmq
|
||||
{
|
||||
public:
|
||||
|
||||
// Create the context object. The argument specifies the size
|
||||
// of I/O thread pool to create.
|
||||
ctx_t (uint32_t io_threads_);
|
||||
// Create the context object
|
||||
ctx_t ();
|
||||
|
||||
// Returns false if object is not a context.
|
||||
bool check_tag ();
|
||||
@ -71,6 +71,10 @@ namespace zmq
|
||||
// after the last one is closed.
|
||||
int terminate ();
|
||||
|
||||
// Set and set context properties
|
||||
int set (int option_, int optval_);
|
||||
int get (int option_);
|
||||
|
||||
// Create and destroy a socket.
|
||||
zmq::socket_base_t *create_socket (int type_);
|
||||
void destroy_socket (zmq::socket_base_t *socket_);
|
||||
@ -113,6 +117,10 @@ namespace zmq
|
||||
typedef std::vector <uint32_t> emtpy_slots_t;
|
||||
emtpy_slots_t empty_slots;
|
||||
|
||||
// If true, zmq_init has been called but no socket have been created
|
||||
// yes. Launching of I/O threads is delayed.
|
||||
bool starting;
|
||||
|
||||
// If true, zmq_term was already called.
|
||||
bool terminating;
|
||||
|
||||
@ -143,6 +151,18 @@ namespace zmq
|
||||
// Synchronisation of access to the list of inproc endpoints.
|
||||
mutex_t endpoints_sync;
|
||||
|
||||
// Maximum socket ID.
|
||||
static atomic_counter_t max_socket_id;
|
||||
|
||||
// Maximum number of sockets that can be opened at the same time.
|
||||
int max_sockets;
|
||||
|
||||
// Number of I/O threads to launch.
|
||||
int io_thread_count;
|
||||
|
||||
// Synchronisation of access to context options.
|
||||
mutex_t opt_sync;
|
||||
|
||||
ctx_t (const ctx_t&);
|
||||
const ctx_t &operator = (const ctx_t&);
|
||||
};
|
||||
@ -150,4 +170,3 @@ namespace zmq
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
|
@ -1,6 +1,6 @@
|
||||
/*
|
||||
Copyright (c) 2007-2012 iMatix Corporation
|
||||
Copyright (c) 2009-2011 250bpm s.r.o.
|
||||
Copyright (c) 2007-2009 iMatix Corporation
|
||||
Copyright (c) 2011 VMware, Inc.
|
||||
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
|
||||
|
||||
@ -48,7 +48,8 @@ zmq::options_t::options_t () :
|
||||
delay_on_disconnect (true),
|
||||
filter (false),
|
||||
send_identity (false),
|
||||
recv_identity (false)
|
||||
recv_identity (false),
|
||||
socket_id (0)
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -110,6 +110,9 @@ namespace zmq
|
||||
|
||||
// Receivers identity from all new connections.
|
||||
bool recv_identity;
|
||||
|
||||
// ID of the socket.
|
||||
int socket_id;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -24,8 +24,8 @@
|
||||
#include "pipe.hpp"
|
||||
#include "msg.hpp"
|
||||
|
||||
zmq::pair_t::pair_t (class ctx_t *parent_, uint32_t tid_) :
|
||||
socket_base_t (parent_, tid_),
|
||||
zmq::pair_t::pair_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
||||
socket_base_t (parent_, tid_, sid_),
|
||||
pipe (NULL)
|
||||
{
|
||||
options.type = ZMQ_PAIR;
|
||||
|
@ -38,7 +38,7 @@ namespace zmq
|
||||
{
|
||||
public:
|
||||
|
||||
pair_t (zmq::ctx_t *parent_, uint32_t tid_);
|
||||
pair_t (zmq::ctx_t *parent_, uint32_t tid_, int sid);
|
||||
~pair_t ();
|
||||
|
||||
// Overloads of functions from socket_base_t.
|
||||
|
@ -1,6 +1,6 @@
|
||||
/*
|
||||
Copyright (c) 2007-2012 iMatix Corporation
|
||||
Copyright (c) 2009-2011 250bpm s.r.o.
|
||||
Copyright (c) 2007-2009 iMatix Corporation
|
||||
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
|
||||
|
||||
This file is part of 0MQ.
|
||||
@ -22,8 +22,8 @@
|
||||
#include "pub.hpp"
|
||||
#include "msg.hpp"
|
||||
|
||||
zmq::pub_t::pub_t (class ctx_t *parent_, uint32_t tid_) :
|
||||
xpub_t (parent_, tid_)
|
||||
zmq::pub_t::pub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
||||
xpub_t (parent_, tid_, sid_)
|
||||
{
|
||||
options.type = ZMQ_PUB;
|
||||
}
|
||||
|
@ -1,6 +1,6 @@
|
||||
/*
|
||||
Copyright (c) 2007-2012 iMatix Corporation
|
||||
Copyright (c) 2009-2011 250bpm s.r.o.
|
||||
Copyright (c) 2007-2009 iMatix Corporation
|
||||
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
|
||||
|
||||
This file is part of 0MQ.
|
||||
@ -36,7 +36,7 @@ namespace zmq
|
||||
{
|
||||
public:
|
||||
|
||||
pub_t (zmq::ctx_t *parent_, uint32_t tid_);
|
||||
pub_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
|
||||
~pub_t ();
|
||||
|
||||
// Implementations of virtual functions from socket_base_t.
|
||||
|
@ -24,8 +24,8 @@
|
||||
#include "msg.hpp"
|
||||
#include "pipe.hpp"
|
||||
|
||||
zmq::pull_t::pull_t (class ctx_t *parent_, uint32_t tid_) :
|
||||
socket_base_t (parent_, tid_)
|
||||
zmq::pull_t::pull_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
||||
socket_base_t (parent_, tid_, sid_)
|
||||
{
|
||||
options.type = ZMQ_PULL;
|
||||
}
|
||||
|
@ -39,7 +39,7 @@ namespace zmq
|
||||
{
|
||||
public:
|
||||
|
||||
pull_t (zmq::ctx_t *parent_, uint32_t tid_);
|
||||
pull_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
|
||||
~pull_t ();
|
||||
|
||||
protected:
|
||||
|
@ -24,8 +24,8 @@
|
||||
#include "err.hpp"
|
||||
#include "msg.hpp"
|
||||
|
||||
zmq::push_t::push_t (class ctx_t *parent_, uint32_t tid_) :
|
||||
socket_base_t (parent_, tid_)
|
||||
zmq::push_t::push_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
||||
socket_base_t (parent_, tid_, sid_)
|
||||
{
|
||||
options.type = ZMQ_PUSH;
|
||||
}
|
||||
|
@ -39,7 +39,7 @@ namespace zmq
|
||||
{
|
||||
public:
|
||||
|
||||
push_t (zmq::ctx_t *parent_, uint32_t tid_);
|
||||
push_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
|
||||
~push_t ();
|
||||
|
||||
protected:
|
||||
|
@ -1,6 +1,6 @@
|
||||
/*
|
||||
Copyright (c) 2007-2012 iMatix Corporation
|
||||
Copyright (c) 2009-2011 250bpm s.r.o.
|
||||
Copyright (c) 2007-2011 iMatix Corporation
|
||||
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
|
||||
|
||||
This file is part of 0MQ.
|
||||
@ -23,8 +23,8 @@
|
||||
#include "err.hpp"
|
||||
#include "msg.hpp"
|
||||
|
||||
zmq::rep_t::rep_t (class ctx_t *parent_, uint32_t tid_) :
|
||||
xrep_t (parent_, tid_),
|
||||
zmq::rep_t::rep_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
||||
xrep_t (parent_, tid_, sid_),
|
||||
sending_reply (false),
|
||||
request_begins (true)
|
||||
{
|
||||
|
@ -36,7 +36,7 @@ namespace zmq
|
||||
{
|
||||
public:
|
||||
|
||||
rep_t (zmq::ctx_t *parent_, uint32_t tid_);
|
||||
rep_t (zmq::ctx_t *parent_, uint32_t tid_, int sid);
|
||||
~rep_t ();
|
||||
|
||||
// Overloads of functions from socket_base_t.
|
||||
|
@ -27,8 +27,8 @@
|
||||
#include "random.hpp"
|
||||
#include "likely.hpp"
|
||||
|
||||
zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_) :
|
||||
xreq_t (parent_, tid_),
|
||||
zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
||||
xreq_t (parent_, tid_, sid_),
|
||||
receiving_reply (false),
|
||||
message_begins (true)
|
||||
{
|
||||
|
@ -1,6 +1,6 @@
|
||||
/*
|
||||
Copyright (c) 2007-2012 iMatix Corporation
|
||||
Copyright (c) 2009-2011 250bpm s.r.o.
|
||||
Copyright (c) 2007-2009 iMatix Corporation
|
||||
Copyright (c) 2011 VMware, Inc.
|
||||
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
|
||||
|
||||
@ -38,7 +38,7 @@ namespace zmq
|
||||
{
|
||||
public:
|
||||
|
||||
req_t (zmq::ctx_t *parent_, uint32_t tid_);
|
||||
req_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
|
||||
~req_t ();
|
||||
|
||||
// Overloads of functions from socket_base_t.
|
||||
|
@ -75,43 +75,43 @@ bool zmq::socket_base_t::check_tag ()
|
||||
}
|
||||
|
||||
zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
|
||||
uint32_t tid_)
|
||||
uint32_t tid_, int sid_)
|
||||
{
|
||||
socket_base_t *s = NULL;
|
||||
switch (type_) {
|
||||
|
||||
case ZMQ_PAIR:
|
||||
s = new (std::nothrow) pair_t (parent_, tid_);
|
||||
s = new (std::nothrow) pair_t (parent_, tid_, sid_);
|
||||
break;
|
||||
case ZMQ_PUB:
|
||||
s = new (std::nothrow) pub_t (parent_, tid_);
|
||||
s = new (std::nothrow) pub_t (parent_, tid_, sid_);
|
||||
break;
|
||||
case ZMQ_SUB:
|
||||
s = new (std::nothrow) sub_t (parent_, tid_);
|
||||
s = new (std::nothrow) sub_t (parent_, tid_, sid_);
|
||||
break;
|
||||
case ZMQ_REQ:
|
||||
s = new (std::nothrow) req_t (parent_, tid_);
|
||||
s = new (std::nothrow) req_t (parent_, tid_, sid_);
|
||||
break;
|
||||
case ZMQ_REP:
|
||||
s = new (std::nothrow) rep_t (parent_, tid_);
|
||||
s = new (std::nothrow) rep_t (parent_, tid_, sid_);
|
||||
break;
|
||||
case ZMQ_XREQ:
|
||||
s = new (std::nothrow) xreq_t (parent_, tid_);
|
||||
s = new (std::nothrow) xreq_t (parent_, tid_, sid_);
|
||||
break;
|
||||
case ZMQ_XREP:
|
||||
s = new (std::nothrow) xrep_t (parent_, tid_);
|
||||
s = new (std::nothrow) xrep_t (parent_, tid_, sid_);
|
||||
break;
|
||||
case ZMQ_PULL:
|
||||
s = new (std::nothrow) pull_t (parent_, tid_);
|
||||
s = new (std::nothrow) pull_t (parent_, tid_, sid_);
|
||||
break;
|
||||
case ZMQ_PUSH:
|
||||
s = new (std::nothrow) push_t (parent_, tid_);
|
||||
s = new (std::nothrow) push_t (parent_, tid_, sid_);
|
||||
break;
|
||||
case ZMQ_XPUB:
|
||||
s = new (std::nothrow) xpub_t (parent_, tid_);
|
||||
s = new (std::nothrow) xpub_t (parent_, tid_, sid_);
|
||||
break;
|
||||
case ZMQ_XSUB:
|
||||
s = new (std::nothrow) xsub_t (parent_, tid_);
|
||||
s = new (std::nothrow) xsub_t (parent_, tid_, sid_);
|
||||
break;
|
||||
default:
|
||||
errno = EINVAL;
|
||||
@ -121,7 +121,7 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
|
||||
return s;
|
||||
}
|
||||
|
||||
zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_) :
|
||||
zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) :
|
||||
own_t (parent_, tid_),
|
||||
tag (0xbaddecaf),
|
||||
ctx_terminated (false),
|
||||
@ -130,6 +130,7 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_) :
|
||||
ticks (0),
|
||||
rcvmore (false)
|
||||
{
|
||||
options.socket_id = sid_;
|
||||
}
|
||||
|
||||
zmq::socket_base_t::~socket_base_t ()
|
||||
|
@ -1,6 +1,6 @@
|
||||
/*
|
||||
Copyright (c) 2007-2012 iMatix Corporation
|
||||
Copyright (c) 2009-2011 250bpm s.r.o.
|
||||
Copyright (c) 2007-2009 iMatix Corporation
|
||||
Copyright (c) 2011 VMware, Inc.
|
||||
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
|
||||
|
||||
@ -57,7 +57,7 @@ namespace zmq
|
||||
|
||||
// Create a socket of a specified type.
|
||||
static socket_base_t *create (int type_, zmq::ctx_t *parent_,
|
||||
uint32_t tid_);
|
||||
uint32_t tid_, int sid_);
|
||||
|
||||
// Returns the mailbox associated with this socket.
|
||||
mailbox_t *get_mailbox ();
|
||||
@ -99,7 +99,7 @@ namespace zmq
|
||||
void unlock();
|
||||
protected:
|
||||
|
||||
socket_base_t (zmq::ctx_t *parent_, uint32_t tid_);
|
||||
socket_base_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
|
||||
virtual ~socket_base_t ();
|
||||
|
||||
// Concrete algorithms for the x- methods are to be defined by
|
||||
|
@ -1,6 +1,6 @@
|
||||
/*
|
||||
Copyright (c) 2007-2012 iMatix Corporation
|
||||
Copyright (c) 2009-2011 250bpm s.r.o.
|
||||
Copyright (c) 2007-2009 iMatix Corporation
|
||||
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
|
||||
|
||||
This file is part of 0MQ.
|
||||
@ -22,8 +22,8 @@
|
||||
#include "sub.hpp"
|
||||
#include "msg.hpp"
|
||||
|
||||
zmq::sub_t::sub_t (class ctx_t *parent_, uint32_t tid_) :
|
||||
xsub_t (parent_, tid_)
|
||||
zmq::sub_t::sub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
||||
xsub_t (parent_, tid_, sid_)
|
||||
{
|
||||
options.type = ZMQ_SUB;
|
||||
|
||||
|
@ -1,6 +1,6 @@
|
||||
/*
|
||||
Copyright (c) 2007-2012 iMatix Corporation
|
||||
Copyright (c) 2009-2011 250bpm s.r.o.
|
||||
Copyright (c) 2007-2009 iMatix Corporation
|
||||
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
|
||||
|
||||
This file is part of 0MQ.
|
||||
@ -36,7 +36,7 @@ namespace zmq
|
||||
{
|
||||
public:
|
||||
|
||||
sub_t (zmq::ctx_t *parent_, uint32_t tid_);
|
||||
sub_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
|
||||
~sub_t ();
|
||||
|
||||
protected:
|
||||
|
@ -26,8 +26,8 @@
|
||||
#include "err.hpp"
|
||||
#include "msg.hpp"
|
||||
|
||||
zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_) :
|
||||
socket_base_t (parent_, tid_),
|
||||
zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
||||
socket_base_t (parent_, tid_, sid_),
|
||||
more (false)
|
||||
{
|
||||
options.type = ZMQ_XPUB;
|
||||
|
@ -43,7 +43,7 @@ namespace zmq
|
||||
{
|
||||
public:
|
||||
|
||||
xpub_t (zmq::ctx_t *parent_, uint32_t tid_);
|
||||
xpub_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
|
||||
~xpub_t ();
|
||||
|
||||
// Implementations of virtual functions from socket_base_t.
|
||||
|
@ -27,8 +27,8 @@
|
||||
#include "likely.hpp"
|
||||
#include "err.hpp"
|
||||
|
||||
zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
|
||||
socket_base_t (parent_, tid_),
|
||||
zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
||||
socket_base_t (parent_, tid_, sid_),
|
||||
prefetched (0),
|
||||
more_in (false),
|
||||
current_out (NULL),
|
||||
|
@ -44,7 +44,7 @@ namespace zmq
|
||||
{
|
||||
public:
|
||||
|
||||
xrep_t (zmq::ctx_t *parent_, uint32_t tid_);
|
||||
xrep_t (zmq::ctx_t *parent_, uint32_t tid_, int sid);
|
||||
~xrep_t ();
|
||||
|
||||
// Overloads of functions from socket_base_t.
|
||||
|
@ -23,8 +23,8 @@
|
||||
#include "err.hpp"
|
||||
#include "msg.hpp"
|
||||
|
||||
zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_) :
|
||||
socket_base_t (parent_, tid_),
|
||||
zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
||||
socket_base_t (parent_, tid_, sid_),
|
||||
prefetched (false)
|
||||
{
|
||||
options.type = ZMQ_XREQ;
|
||||
|
@ -40,7 +40,7 @@ namespace zmq
|
||||
{
|
||||
public:
|
||||
|
||||
xreq_t (zmq::ctx_t *parent_, uint32_t tid_);
|
||||
xreq_t (zmq::ctx_t *parent_, uint32_t tid_, int sid);
|
||||
~xreq_t ();
|
||||
|
||||
protected:
|
||||
|
@ -24,8 +24,8 @@
|
||||
#include "xsub.hpp"
|
||||
#include "err.hpp"
|
||||
|
||||
zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_) :
|
||||
socket_base_t (parent_, tid_),
|
||||
zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
||||
socket_base_t (parent_, tid_, sid_),
|
||||
has_message (false),
|
||||
more (false)
|
||||
{
|
||||
|
@ -39,7 +39,7 @@ namespace zmq
|
||||
{
|
||||
public:
|
||||
|
||||
xsub_t (zmq::ctx_t *parent_, uint32_t tid_);
|
||||
xsub_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
|
||||
~xsub_t ();
|
||||
|
||||
protected:
|
||||
|
58
src/zmq.cpp
58
src/zmq.cpp
@ -92,7 +92,6 @@ struct iovec {
|
||||
typedef char check_msg_t_size
|
||||
[sizeof (zmq::msg_t) == sizeof (zmq_msg_t) ? 1 : -1];
|
||||
|
||||
// Version.
|
||||
|
||||
void zmq_version (int *major_, int *minor_, int *patch_)
|
||||
{
|
||||
@ -101,7 +100,6 @@ void zmq_version (int *major_, int *minor_, int *patch_)
|
||||
*patch_ = ZMQ_VERSION_PATCH;
|
||||
}
|
||||
|
||||
// Errors.
|
||||
|
||||
const char *zmq_strerror (int errnum_)
|
||||
{
|
||||
@ -113,15 +111,11 @@ int zmq_errno ()
|
||||
return errno;
|
||||
}
|
||||
|
||||
// Contexts.
|
||||
|
||||
static zmq::ctx_t *s_init (int io_threads_)
|
||||
// New context API
|
||||
|
||||
void *zmq_ctx_new (void)
|
||||
{
|
||||
if (io_threads_ < 0) {
|
||||
errno = EINVAL;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
#if defined ZMQ_HAVE_OPENPGM
|
||||
|
||||
// Init PGM transport. Ensure threading and timer are enabled. Find PGM
|
||||
@ -162,22 +156,18 @@ static zmq::ctx_t *s_init (int io_threads_)
|
||||
#endif
|
||||
|
||||
// Create 0MQ context.
|
||||
zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t ((uint32_t) io_threads_);
|
||||
zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t;
|
||||
alloc_assert (ctx);
|
||||
return ctx;
|
||||
}
|
||||
|
||||
void *zmq_init (int io_threads_)
|
||||
{
|
||||
return (void *) s_init (io_threads_);
|
||||
}
|
||||
|
||||
int zmq_term (void *ctx_)
|
||||
int zmq_ctx_destroy (void *ctx_)
|
||||
{
|
||||
if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
|
||||
errno = EFAULT;
|
||||
return -1;
|
||||
}
|
||||
|
||||
int rc = ((zmq::ctx_t*) ctx_)->terminate ();
|
||||
int en = errno;
|
||||
|
||||
@ -197,7 +187,41 @@ int zmq_term (void *ctx_)
|
||||
return rc;
|
||||
}
|
||||
|
||||
// Sockets.
|
||||
int zmq_ctx_set (void *ctx_, int option_, int optval_)
|
||||
{
|
||||
if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
|
||||
errno = EFAULT;
|
||||
return -1;
|
||||
}
|
||||
return ((zmq::ctx_t*) ctx_)->set (option_, optval_);
|
||||
}
|
||||
|
||||
int zmq_ctx_get (void *ctx_, int option_)
|
||||
{
|
||||
if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
|
||||
errno = EFAULT;
|
||||
return -1;
|
||||
}
|
||||
return ((zmq::ctx_t*) ctx_)->get (option_);
|
||||
}
|
||||
|
||||
|
||||
// Stable/legacy context API
|
||||
|
||||
void *zmq_init (int io_threads_)
|
||||
{
|
||||
void *ctx = zmq_ctx_new ();
|
||||
zmq_ctx_set (ctx, ZMQ_IO_THREADS, io_threads_);
|
||||
return ctx;
|
||||
}
|
||||
|
||||
int zmq_term (void *ctx_)
|
||||
{
|
||||
return zmq_ctx_destroy (ctx_);
|
||||
}
|
||||
|
||||
|
||||
// Sockets
|
||||
|
||||
void *zmq_socket (void *ctx_, int type_)
|
||||
{
|
||||
|
225
tests/test_last_endpoint
Executable file
225
tests/test_last_endpoint
Executable file
@ -0,0 +1,225 @@
|
||||
#! /bin/bash
|
||||
|
||||
# test_last_endpoint - temporary wrapper script for .libs/test_last_endpoint
|
||||
# Generated by libtool (GNU libtool) 2.4 Debian-2.4-2ubuntu1
|
||||
#
|
||||
# The test_last_endpoint program cannot be directly executed until all the libtool
|
||||
# libraries that it depends on are installed.
|
||||
#
|
||||
# This wrapper script should never be moved out of the build directory.
|
||||
# If it is, it will not operate correctly.
|
||||
|
||||
# Sed substitution that helps us do robust quoting. It backslashifies
|
||||
# metacharacters that are still active within double-quoted strings.
|
||||
sed_quote_subst='s/\([`"$\\]\)/\\\1/g'
|
||||
|
||||
# Be Bourne compatible
|
||||
if test -n "${ZSH_VERSION+set}" && (emulate sh) >/dev/null 2>&1; then
|
||||
emulate sh
|
||||
NULLCMD=:
|
||||
# Zsh 3.x and 4.x performs word splitting on ${1+"$@"}, which
|
||||
# is contrary to our usage. Disable this feature.
|
||||
alias -g '${1+"$@"}'='"$@"'
|
||||
setopt NO_GLOB_SUBST
|
||||
else
|
||||
case `(set -o) 2>/dev/null` in *posix*) set -o posix;; esac
|
||||
fi
|
||||
BIN_SH=xpg4; export BIN_SH # for Tru64
|
||||
DUALCASE=1; export DUALCASE # for MKS sh
|
||||
|
||||
# The HP-UX ksh and POSIX shell print the target directory to stdout
|
||||
# if CDPATH is set.
|
||||
(unset CDPATH) >/dev/null 2>&1 && unset CDPATH
|
||||
|
||||
relink_command="(cd /home/ph/work/libzmq_pieterh/tests; { test -z \"\${LIBRARY_PATH+set}\" || unset LIBRARY_PATH || { LIBRARY_PATH=; export LIBRARY_PATH; }; }; { test -z \"\${COMPILER_PATH+set}\" || unset COMPILER_PATH || { COMPILER_PATH=; export COMPILER_PATH; }; }; { test -z \"\${GCC_EXEC_PREFIX+set}\" || unset GCC_EXEC_PREFIX || { GCC_EXEC_PREFIX=; export GCC_EXEC_PREFIX; }; }; { test -z \"\${LD_RUN_PATH+set}\" || unset LD_RUN_PATH || { LD_RUN_PATH=; export LD_RUN_PATH; }; }; LD_LIBRARY_PATH=/usr/local/lib; export LD_LIBRARY_PATH; PATH=/opt/android-toolchain/bin:/usr/lib/lightdm/lightdm:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games; export PATH; g++ -g -O2 -o \$progdir/\$file test_last_endpoint.o ../src/.libs/libzmq.so -lrt -lpthread -Wl,-rpath -Wl,/home/ph/work/libzmq_pieterh/src/.libs)"
|
||||
|
||||
# This environment variable determines our operation mode.
|
||||
if test "$libtool_install_magic" = "%%%MAGIC variable%%%"; then
|
||||
# install mode needs the following variables:
|
||||
generated_by_libtool_version='2.4'
|
||||
notinst_deplibs=' ../src/libzmq.la'
|
||||
else
|
||||
# When we are sourced in execute mode, $file and $ECHO are already set.
|
||||
if test "$libtool_execute_magic" != "%%%MAGIC variable%%%"; then
|
||||
file="$0"
|
||||
|
||||
# A function that is used when there is no print builtin or printf.
|
||||
func_fallback_echo ()
|
||||
{
|
||||
eval 'cat <<_LTECHO_EOF
|
||||
$1
|
||||
_LTECHO_EOF'
|
||||
}
|
||||
ECHO="printf %s\\n"
|
||||
fi
|
||||
|
||||
# Very basic option parsing. These options are (a) specific to
|
||||
# the libtool wrapper, (b) are identical between the wrapper
|
||||
# /script/ and the wrapper /executable/ which is used only on
|
||||
# windows platforms, and (c) all begin with the string --lt-
|
||||
# (application programs are unlikely to have options which match
|
||||
# this pattern).
|
||||
#
|
||||
# There are only two supported options: --lt-debug and
|
||||
# --lt-dump-script. There is, deliberately, no --lt-help.
|
||||
#
|
||||
# The first argument to this parsing function should be the
|
||||
# script's ../libtool value, followed by no.
|
||||
lt_option_debug=
|
||||
func_parse_lt_options ()
|
||||
{
|
||||
lt_script_arg0=$0
|
||||
shift
|
||||
for lt_opt
|
||||
do
|
||||
case "$lt_opt" in
|
||||
--lt-debug) lt_option_debug=1 ;;
|
||||
--lt-dump-script)
|
||||
lt_dump_D=`$ECHO "X$lt_script_arg0" | /bin/sed -e 's/^X//' -e 's%/[^/]*$%%'`
|
||||
test "X$lt_dump_D" = "X$lt_script_arg0" && lt_dump_D=.
|
||||
lt_dump_F=`$ECHO "X$lt_script_arg0" | /bin/sed -e 's/^X//' -e 's%^.*/%%'`
|
||||
cat "$lt_dump_D/$lt_dump_F"
|
||||
exit 0
|
||||
;;
|
||||
--lt-*)
|
||||
$ECHO "Unrecognized --lt- option: '$lt_opt'" 1>&2
|
||||
exit 1
|
||||
;;
|
||||
esac
|
||||
done
|
||||
|
||||
# Print the debug banner immediately:
|
||||
if test -n "$lt_option_debug"; then
|
||||
echo "test_last_endpoint:test_last_endpoint:${LINENO}: libtool wrapper (GNU libtool) 2.4 Debian-2.4-2ubuntu1" 1>&2
|
||||
fi
|
||||
}
|
||||
|
||||
# Used when --lt-debug. Prints its arguments to stdout
|
||||
# (redirection is the responsibility of the caller)
|
||||
func_lt_dump_args ()
|
||||
{
|
||||
lt_dump_args_N=1;
|
||||
for lt_arg
|
||||
do
|
||||
$ECHO "test_last_endpoint:test_last_endpoint:${LINENO}: newargv[$lt_dump_args_N]: $lt_arg"
|
||||
lt_dump_args_N=`expr $lt_dump_args_N + 1`
|
||||
done
|
||||
}
|
||||
|
||||
# Core function for launching the target application
|
||||
func_exec_program_core ()
|
||||
{
|
||||
|
||||
if test -n "$lt_option_debug"; then
|
||||
$ECHO "test_last_endpoint:test_last_endpoint:${LINENO}: newargv[0]: $progdir/$program" 1>&2
|
||||
func_lt_dump_args ${1+"$@"} 1>&2
|
||||
fi
|
||||
exec "$progdir/$program" ${1+"$@"}
|
||||
|
||||
$ECHO "$0: cannot exec $program $*" 1>&2
|
||||
exit 1
|
||||
}
|
||||
|
||||
# A function to encapsulate launching the target application
|
||||
# Strips options in the --lt-* namespace from $@ and
|
||||
# launches target application with the remaining arguments.
|
||||
func_exec_program ()
|
||||
{
|
||||
for lt_wr_arg
|
||||
do
|
||||
case $lt_wr_arg in
|
||||
--lt-*) ;;
|
||||
*) set x "$@" "$lt_wr_arg"; shift;;
|
||||
esac
|
||||
shift
|
||||
done
|
||||
func_exec_program_core ${1+"$@"}
|
||||
}
|
||||
|
||||
# Parse options
|
||||
func_parse_lt_options "$0" ${1+"$@"}
|
||||
|
||||
# Find the directory that this script lives in.
|
||||
thisdir=`$ECHO "$file" | /bin/sed 's%/[^/]*$%%'`
|
||||
test "x$thisdir" = "x$file" && thisdir=.
|
||||
|
||||
# Follow symbolic links until we get to the real thisdir.
|
||||
file=`ls -ld "$file" | /bin/sed -n 's/.*-> //p'`
|
||||
while test -n "$file"; do
|
||||
destdir=`$ECHO "$file" | /bin/sed 's%/[^/]*$%%'`
|
||||
|
||||
# If there was a directory component, then change thisdir.
|
||||
if test "x$destdir" != "x$file"; then
|
||||
case "$destdir" in
|
||||
[\\/]* | [A-Za-z]:[\\/]*) thisdir="$destdir" ;;
|
||||
*) thisdir="$thisdir/$destdir" ;;
|
||||
esac
|
||||
fi
|
||||
|
||||
file=`$ECHO "$file" | /bin/sed 's%^.*/%%'`
|
||||
file=`ls -ld "$thisdir/$file" | /bin/sed -n 's/.*-> //p'`
|
||||
done
|
||||
|
||||
# Usually 'no', except on cygwin/mingw when embedded into
|
||||
# the cwrapper.
|
||||
WRAPPER_SCRIPT_BELONGS_IN_OBJDIR=no
|
||||
if test "$WRAPPER_SCRIPT_BELONGS_IN_OBJDIR" = "yes"; then
|
||||
# special case for '.'
|
||||
if test "$thisdir" = "."; then
|
||||
thisdir=`pwd`
|
||||
fi
|
||||
# remove .libs from thisdir
|
||||
case "$thisdir" in
|
||||
*[\\/].libs ) thisdir=`$ECHO "$thisdir" | /bin/sed 's%[\\/][^\\/]*$%%'` ;;
|
||||
.libs ) thisdir=. ;;
|
||||
esac
|
||||
fi
|
||||
|
||||
# Try to get the absolute directory name.
|
||||
absdir=`cd "$thisdir" && pwd`
|
||||
test -n "$absdir" && thisdir="$absdir"
|
||||
|
||||
program=lt-'test_last_endpoint'
|
||||
progdir="$thisdir/.libs"
|
||||
|
||||
if test ! -f "$progdir/$program" ||
|
||||
{ file=`ls -1dt "$progdir/$program" "$progdir/../$program" 2>/dev/null | /bin/sed 1q`; \
|
||||
test "X$file" != "X$progdir/$program"; }; then
|
||||
|
||||
file="$$-$program"
|
||||
|
||||
if test ! -d "$progdir"; then
|
||||
mkdir "$progdir"
|
||||
else
|
||||
rm -f "$progdir/$file"
|
||||
fi
|
||||
|
||||
# relink executable if necessary
|
||||
if test -n "$relink_command"; then
|
||||
if relink_command_output=`eval $relink_command 2>&1`; then :
|
||||
else
|
||||
printf %s\n "$relink_command_output" >&2
|
||||
rm -f "$progdir/$file"
|
||||
exit 1
|
||||
fi
|
||||
fi
|
||||
|
||||
mv -f "$progdir/$file" "$progdir/$program" 2>/dev/null ||
|
||||
{ rm -f "$progdir/$program";
|
||||
mv -f "$progdir/$file" "$progdir/$program"; }
|
||||
rm -f "$progdir/$file"
|
||||
fi
|
||||
|
||||
if test -f "$progdir/$program"; then
|
||||
if test "$libtool_execute_magic" != "%%%MAGIC variable%%%"; then
|
||||
# Run the actual program with our arguments.
|
||||
func_exec_program ${1+"$@"}
|
||||
fi
|
||||
else
|
||||
# The program doesn't exist.
|
||||
$ECHO "$0: error: \`$progdir/$program' does not exist" 1>&2
|
||||
$ECHO "This script is just a wrapper for $program." 1>&2
|
||||
$ECHO "See the libtool documentation for more information." 1>&2
|
||||
exit 1
|
||||
fi
|
||||
fi
|
Loading…
Reference in New Issue
Block a user