From 1aaa893e027247f8aa7aed6e136e95c5335ab91d Mon Sep 17 00:00:00 2001 From: somdoron Date: Sun, 16 Aug 2015 12:48:03 +0300 Subject: [PATCH] add tests for polling on thread safe sockets --- Makefile.am | 7 +- tests/CMakeLists.txt | 1 + tests/test_thread_safe_polling.cpp | 170 +++++++++++++++++++++++++++++ 3 files changed, 177 insertions(+), 1 deletion(-) create mode 100644 tests/test_thread_safe_polling.cpp diff --git a/Makefile.am b/Makefile.am index ada4e523..a6b5b530 100644 --- a/Makefile.am +++ b/Makefile.am @@ -366,7 +366,8 @@ test_apps = \ tests/test_thread_safe \ tests/test_socketopt_hwm \ tests/test_heartbeats \ - tests/test_stream_exceeds_buffer + tests/test_stream_exceeds_buffer \ + tests/test_thread_safe_polling tests_test_system_SOURCES = tests/test_system.cpp tests_test_system_LDADD = src/libzmq.la @@ -577,6 +578,10 @@ tests_test_heartbeats_LDADD = src/libzmq.la tests_test_stream_exceeds_buffer_SOURCES = tests/test_stream_exceeds_buffer.cpp tests_test_stream_exceeds_buffer_LDADD = src/libzmq.la +tests_test_thread_safe_polling_SOURCES = tests/test_thread_safe_polling.cpp +tests_test_thread_safe_polling_LDADD = src/libzmq.la + + if !ON_MINGW if !ON_CYGWIN test_apps += \ diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index c30978ce..0197c084 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -52,6 +52,7 @@ set(tests test_client_server test_sockopt_hwm test_heartbeats + test_thread_safe_polling ) if(NOT WIN32) list(APPEND tests diff --git a/tests/test_thread_safe_polling.cpp b/tests/test_thread_safe_polling.cpp new file mode 100644 index 00000000..7aa7f7de --- /dev/null +++ b/tests/test_thread_safe_polling.cpp @@ -0,0 +1,170 @@ +/* + Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "testutil.hpp" + +void worker(void* s); + +int main (void) +{ + setup_test_environment(); + void *ctx = zmq_ctx_new (); + assert (ctx); + + void *server = zmq_socket (ctx, ZMQ_SERVER); + void *server2 = zmq_socket (ctx, ZMQ_SERVER); + void *poller = zmq_poller_new (); + + int rc; + + rc = zmq_add_poller (server, poller); + assert (rc == 0); + + rc = zmq_add_poller (server2, poller); + assert (rc == 0); + + zmq_pollitem_t items[2]; + + items[0].socket = server; + items[0].poller = poller; + items[0].events = ZMQ_POLLIN; + + items[1].socket = server2; + items[1].poller = poller; + items[1].events = ZMQ_POLLIN; + + rc = zmq_bind (server, "tcp://127.0.0.1:5560"); + assert (rc == 0); + + rc = zmq_bind (server2, "tcp://127.0.0.1:5561"); + assert (rc == 0); + + void* t = zmq_threadstart(worker, ctx); + + assert (rc == 0); + + rc = zmq_poll (items, 2, -1); + assert (rc == 1); + + assert (items[0].revents == ZMQ_POLLIN); + assert (items[1].revents == 0); + + zmq_msg_t msg; + rc = zmq_msg_init(&msg); + rc = zmq_msg_recv(&msg, server, ZMQ_DONTWAIT); + assert (rc == 1); + + rc = zmq_poll (items, 2, -1); + assert (rc == 1); + + assert (items[0].revents == 0); + assert (items[1].revents == ZMQ_POLLIN); + + rc = zmq_msg_recv(&msg, server2, ZMQ_DONTWAIT); + assert (rc == 1); + + rc = zmq_poll (items, 2, 0); + assert (rc == 0); + + assert (items[0].revents == 0); + assert (items[1].revents == 0); + + zmq_threadclose(t); + + rc = zmq_msg_close(&msg); + assert (rc == 0); + + rc = zmq_remove_poller (server, poller); + assert (rc == 0); + + rc = zmq_remove_poller (server2, poller); + assert (rc == 0); + + rc = zmq_poller_close (poller); + assert (rc == 0); + + rc = zmq_close (server); + assert (rc == 0); + + rc = zmq_close (server2); + assert (rc == 0); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); + + return 0; +} + +void worker(void* ctx) +{ + void *client = zmq_socket (ctx, ZMQ_CLIENT); + + int rc = zmq_connect (client, "tcp://127.0.0.1:5560"); + assert (rc == 0); + + msleep(100); + + zmq_msg_t msg; + rc = zmq_msg_init_size(&msg,1); + assert (rc == 0); + + char * data = (char *)zmq_msg_data(&msg); + data[0] = 1; + + rc = zmq_msg_send(&msg, client, 0); + assert (rc == 1); + + rc = zmq_disconnect (client, "tcp://127.0.0.1:5560"); + assert (rc == 0); + + rc = zmq_connect (client, "tcp://127.0.0.1:5561"); + assert (rc == 0); + + msleep(100); + + rc = zmq_msg_close(&msg); + assert (rc == 0); + + rc = zmq_msg_init_size(&msg,1); + assert (rc == 0); + + data = (char *)zmq_msg_data(&msg); + data[0] = 1; + + rc = zmq_msg_send(&msg, client, 0); + assert (rc == 1); + + rc = zmq_msg_close(&msg); + assert (rc == 0); + + rc = zmq_close (client); + assert (rc == 0); +} + +