mirror of
https://github.com/zeromq/libzmq.git
synced 2024-12-13 10:52:56 +01:00
Close pipes for inproc sockets on zmq_disconnect
- fixes LIBZMQ-476 and LIBZMQ-475
This commit is contained in:
parent
b2f6741bcb
commit
66c22456b9
1
AUTHORS
1
AUTHORS
@ -66,6 +66,7 @@ Pieter Hintjens <ph@imatix.com>
|
||||
Piotr Trojanek <piotr.trojanek@gmail.com>
|
||||
Robert G. Jakabosky <bobby@sharedrealm.com>
|
||||
Sebastian Otaegui <feniix@gmail.com>
|
||||
Stefan Radomski <radomski@tk.informatik.tu-darmstadt.de>
|
||||
Steven McCoy <steven.mccoy@miru.hk>
|
||||
Stuart Webster <sw_webster@hotmail.com>
|
||||
Tamara Kustarova <kustarova.tamara@gmail.com>
|
||||
|
@ -34,6 +34,8 @@ The endpoint supplied is invalid.
|
||||
The 0MQ 'context' associated with the specified 'socket' was terminated.
|
||||
*ENOTSOCK*::
|
||||
The provided 'socket' was invalid.
|
||||
*ENOENT*::
|
||||
The provided endpoint is not connected.
|
||||
|
||||
|
||||
EXAMPLE
|
||||
|
@ -478,6 +478,10 @@ int zmq::socket_base_t::connect (const char *addr_)
|
||||
// Save last endpoint URI
|
||||
options.last_endpoint.assign (addr_);
|
||||
|
||||
// remember inproc connections for disconnect
|
||||
inprocs.insert (inprocs_t::value_type (std::string (addr_), pipes[0]));
|
||||
inprocs.insert (inprocs_t::value_type (std::string (addr_), pipes[1]));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -584,10 +588,37 @@ int zmq::socket_base_t::term_endpoint (const char *addr_)
|
||||
if (unlikely (rc != 0))
|
||||
return -1;
|
||||
|
||||
// Parse addr_ string.
|
||||
std::string protocol;
|
||||
std::string address;
|
||||
rc = parse_uri (addr_, protocol, address);
|
||||
if (rc != 0)
|
||||
return -1;
|
||||
|
||||
rc = check_protocol (protocol);
|
||||
if (rc != 0)
|
||||
return -1;
|
||||
|
||||
// Disconnect an inproc socket
|
||||
if (protocol == "inproc") {
|
||||
std::pair <inprocs_t::iterator, inprocs_t::iterator> range = inprocs.equal_range (std::string (addr_));
|
||||
if (range.first == range.second) {
|
||||
errno = ENOENT;
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (inprocs_t::iterator it = range.first; it != range.second; ++it)
|
||||
it->second->terminate(true);
|
||||
inprocs.erase (range.first, range.second);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Find the endpoints range (if any) corresponding to the addr_ string.
|
||||
std::pair <endpoints_t::iterator, endpoints_t::iterator> range = endpoints.equal_range (std::string (addr_));
|
||||
if (range.first == range.second)
|
||||
if (range.first == range.second) {
|
||||
errno = ENOENT;
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (endpoints_t::iterator it = range.first; it != range.second; ++it)
|
||||
term_child (it->second);
|
||||
|
@ -170,6 +170,10 @@ namespace zmq
|
||||
typedef std::multimap <std::string, own_t *> endpoints_t;
|
||||
endpoints_t endpoints;
|
||||
|
||||
// Map of open inproc endpoints.
|
||||
typedef std::multimap <std::string, pipe_t *> inprocs_t;
|
||||
inprocs_t inprocs;
|
||||
|
||||
// To be called after processing commands or invoking any command
|
||||
// handlers explicitly. If required, it will deallocate the socket.
|
||||
void check_destroy ();
|
||||
|
@ -18,7 +18,9 @@ noinst_PROGRAMS = test_pair_inproc \
|
||||
test_term_endpoint \
|
||||
test_monitor \
|
||||
test_router_mandatory \
|
||||
test_raw_sock
|
||||
test_raw_sock \
|
||||
test_disconnect_inproc
|
||||
|
||||
|
||||
if !ON_MINGW
|
||||
noinst_PROGRAMS += test_shutdown_stress \
|
||||
@ -43,6 +45,7 @@ test_term_endpoint_SOURCES = test_term_endpoint.cpp
|
||||
test_monitor_SOURCES = test_monitor.cpp
|
||||
test_router_mandatory_SOURCES = test_router_mandatory.cpp
|
||||
test_raw_sock_SOURCES = test_raw_sock.cpp
|
||||
test_disconnect_inproc_SOURCES = test_disconnect_inproc.cpp
|
||||
|
||||
if !ON_MINGW
|
||||
test_shutdown_stress_SOURCES = test_shutdown_stress.cpp
|
||||
|
119
tests/test_disconnect_inproc.cpp
Normal file
119
tests/test_disconnect_inproc.cpp
Normal file
@ -0,0 +1,119 @@
|
||||
#include <zmq.h>
|
||||
#include <inttypes.h>
|
||||
#include <string.h>
|
||||
#include <assert.h>
|
||||
|
||||
/// Initialize a zeromq message with a given null-terminated string
|
||||
#define ZMQ_PREPARE_STRING(msg, data, size) \
|
||||
zmq_msg_init(&msg) && printf("zmq_msg_init: %s\n", zmq_strerror(errno)); \
|
||||
zmq_msg_init_size (&msg, size + 1) && printf("zmq_msg_init_size: %s\n",zmq_strerror(errno)); \
|
||||
memcpy(zmq_msg_data(&msg), data, size + 1);
|
||||
|
||||
int publicationsReceived = 0;
|
||||
bool isSubscribed = false;
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
void* context = zmq_ctx_new();
|
||||
void* pubSocket;
|
||||
void* subSocket;
|
||||
|
||||
(pubSocket = zmq_socket(context, ZMQ_XPUB)) || printf("zmq_socket: %s\n", zmq_strerror(errno));
|
||||
(subSocket = zmq_socket(context, ZMQ_SUB)) || printf("zmq_socket: %s\n", zmq_strerror(errno));
|
||||
zmq_setsockopt(subSocket, ZMQ_SUBSCRIBE, "foo", 3) && printf("zmq_setsockopt: %s\n",zmq_strerror(errno));
|
||||
|
||||
zmq_bind(pubSocket, "inproc://someInProcDescriptor") && printf("zmq_bind: %s\n", zmq_strerror(errno));
|
||||
//zmq_bind(pubSocket, "tcp://*:30010") && printf("zmq_bind: %s\n", zmq_strerror(errno));
|
||||
|
||||
int32_t more;
|
||||
size_t more_size = sizeof(more);
|
||||
int iteration = 0;
|
||||
|
||||
while(1) {
|
||||
zmq_pollitem_t items [] = {
|
||||
{ subSocket, 0, ZMQ_POLLIN, 0 }, // read publications
|
||||
{ pubSocket, 0, ZMQ_POLLIN, 0 }, // read subscriptions
|
||||
};
|
||||
zmq_poll(items, 2, 500);
|
||||
|
||||
if (items[1].revents & ZMQ_POLLIN) {
|
||||
while (1) {
|
||||
zmq_msg_t msg;
|
||||
zmq_msg_init (&msg);
|
||||
zmq_msg_recv (&msg, pubSocket, 0);
|
||||
int msgSize = zmq_msg_size(&msg);
|
||||
char* buffer = (char*)zmq_msg_data(&msg);
|
||||
|
||||
if (buffer[0] == 0) {
|
||||
assert(isSubscribed);
|
||||
printf("unsubscribing from '%s'\n", strndup(buffer + 1, msgSize - 1));
|
||||
isSubscribed = false;
|
||||
} else {
|
||||
assert(!isSubscribed);
|
||||
printf("subscribing on '%s'\n", strndup(buffer + 1, msgSize - 1));
|
||||
isSubscribed = true;
|
||||
}
|
||||
|
||||
zmq_getsockopt (pubSocket, ZMQ_RCVMORE, &more, &more_size);
|
||||
zmq_msg_close (&msg);
|
||||
|
||||
if (!more)
|
||||
break; // Last message part
|
||||
}
|
||||
}
|
||||
|
||||
if (items[0].revents & ZMQ_POLLIN) {
|
||||
while (1) {
|
||||
zmq_msg_t msg;
|
||||
zmq_msg_init (&msg);
|
||||
zmq_msg_recv (&msg, subSocket, 0);
|
||||
int msgSize = zmq_msg_size(&msg);
|
||||
char* buffer = (char*)zmq_msg_data(&msg);
|
||||
|
||||
printf("received on subscriber '%s'\n", strndup(buffer, msgSize));
|
||||
|
||||
zmq_getsockopt (subSocket, ZMQ_RCVMORE, &more, &more_size);
|
||||
zmq_msg_close (&msg);
|
||||
|
||||
if (!more) {
|
||||
publicationsReceived++;
|
||||
break; // Last message part
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (iteration == 1) {
|
||||
zmq_connect(subSocket, "inproc://someInProcDescriptor") && printf("zmq_connect: %s\n", zmq_strerror(errno));
|
||||
//zmq_connect(subSocket, "tcp://127.0.0.1:30010") && printf("zmq_connect: %s\n", zmq_strerror(errno));
|
||||
}
|
||||
|
||||
if (iteration == 4) {
|
||||
zmq_disconnect(subSocket, "inproc://someInProcDescriptor") && printf("zmq_disconnect(%d): %s\n", errno, zmq_strerror(errno));
|
||||
//zmq_disconnect(subSocket, "tcp://127.0.0.1:30010") && printf("zmq_disconnect: %s\n", zmq_strerror(errno));
|
||||
}
|
||||
|
||||
if (iteration == 10) {
|
||||
break;
|
||||
}
|
||||
|
||||
zmq_msg_t channelEnvlp;
|
||||
ZMQ_PREPARE_STRING(channelEnvlp, "foo", 3);
|
||||
zmq_sendmsg(pubSocket, &channelEnvlp, ZMQ_SNDMORE) >= 0 || printf("zmq_sendmsg: %s\n",zmq_strerror(errno));
|
||||
zmq_msg_close(&channelEnvlp) && printf("zmq_msg_close: %s\n",zmq_strerror(errno));
|
||||
|
||||
zmq_msg_t message;
|
||||
ZMQ_PREPARE_STRING(message, "this is foo!", 12);
|
||||
zmq_sendmsg(pubSocket, &message, 0) >= 0 || printf("zmq_sendmsg: %s\n",zmq_strerror(errno));
|
||||
zmq_msg_close(&message) && printf("zmq_msg_close: %s\n",zmq_strerror(errno));
|
||||
|
||||
iteration++;
|
||||
}
|
||||
|
||||
assert(publicationsReceived == 3);
|
||||
assert(!isSubscribed);
|
||||
|
||||
zmq_close(pubSocket) && printf("zmq_close: %s", zmq_strerror(errno));
|
||||
zmq_close(subSocket) && printf("zmq_close: %s", zmq_strerror(errno));
|
||||
|
||||
zmq_ctx_destroy(context);
|
||||
return 0;
|
||||
}
|
Loading…
Reference in New Issue
Block a user