From 7b9e9b838d2868c7fd4db47c09e23d831dc27925 Mon Sep 17 00:00:00 2001 From: Rik van der Heijden Date: Fri, 24 Apr 2015 23:01:20 +0200 Subject: [PATCH] Issue #1382: Do not send data to backend when there are no listeners (+ tests) --- .gitignore | 1 + AUTHORS | 1 + Makefile.am | 1 + src/proxy.cpp | 6 +- tests/CMakeLists.txt | 1 + tests/test_proxy_terminate.cpp | 113 +++++++++++++++++++++++++++++++++ 6 files changed, 121 insertions(+), 2 deletions(-) create mode 100644 tests/test_proxy_terminate.cpp diff --git a/.gitignore b/.gitignore index fee8796f..a5d4fc07 100644 --- a/.gitignore +++ b/.gitignore @@ -74,6 +74,7 @@ test_linger test_security_null test_security_plain test_proxy +test_proxy_terminate test_abstract_ipc test_filter_ipc test_connect_delay_tipc diff --git a/AUTHORS b/AUTHORS index 8402d31b..0b7d7fc5 100644 --- a/AUTHORS +++ b/AUTHORS @@ -86,6 +86,7 @@ Philip Kovacs Pieter Hintjens Piotr Trojanek Richard Newton +Rik van der Heijden Robert G. Jakabosky Sebastian Otaegui Stefan Radomski diff --git a/Makefile.am b/Makefile.am index e63749ad..aa1c8f1a 100644 --- a/Makefile.am +++ b/Makefile.am @@ -340,6 +340,7 @@ test_apps = \ tests/test_inproc_connect \ tests/test_issue_566 \ tests/test_proxy \ + tests/test_proxy_terminate \ tests/test_many_sockets \ tests/test_ipc_wildcard \ tests/test_diffserv \ diff --git a/src/proxy.cpp b/src/proxy.cpp index 81d98838..de0f045a 100644 --- a/src/proxy.cpp +++ b/src/proxy.cpp @@ -159,14 +159,16 @@ int zmq::proxy ( } // Process a request if (state == active - && items [0].revents & ZMQ_POLLIN) { + && items [0].revents & ZMQ_POLLIN + && items [1].revents & ZMQ_POLLOUT) { rc = forward(frontend_, backend_, capture_,msg); if (unlikely (rc < 0)) return -1; } // Process a reply if (state == active - && items [1].revents & ZMQ_POLLIN) { + && items [1].revents & ZMQ_POLLIN + && items [0].revents & ZMQ_POLLOUT) { rc = forward(backend_, frontend_, capture_,msg); if (unlikely (rc < 0)) return -1; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 74071a04..3daa8164 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -55,6 +55,7 @@ if(NOT WIN32) test_reqrep_ipc test_abstract_ipc test_proxy + test_proxy_terminate test_filter_ipc ) if(HAVE_FORK) diff --git a/tests/test_proxy_terminate.cpp b/tests/test_proxy_terminate.cpp new file mode 100644 index 00000000..83e70d4b --- /dev/null +++ b/tests/test_proxy_terminate.cpp @@ -0,0 +1,113 @@ +/* + Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "testutil.hpp" +#include "../include/zmq_utils.h" + +// This is a test for issue #1382. The server thread creates a SUB-PUSH +// steerable proxy. The main process then sends messages to the SUB +// but there is no pull on the other side, previously the proxy blocks +// in writing to the backend, preventing the proxy from terminating + +void +server_task (void *ctx) +{ + // Frontend socket talks to main process + void *frontend = zmq_socket (ctx, ZMQ_SUB); + assert (frontend); + int rc = zmq_setsockopt (frontend, ZMQ_SUBSCRIBE, "", 0); + assert (rc == 0); + rc = zmq_bind (frontend, "tcp://127.0.0.1:15564"); + assert (rc == 0); + + // Nice socket which is never read + void *backend = zmq_socket (ctx, ZMQ_PUSH); + assert (backend); + rc = zmq_bind (backend, "tcp://127.0.0.1:15563"); + assert (rc == 0); + + // Control socket receives terminate command from main over inproc + void *control = zmq_socket (ctx, ZMQ_SUB); + assert (control); + rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0); + assert (rc == 0); + rc = zmq_connect (control, "inproc://control"); + assert (rc == 0); + + // Connect backend to frontend via a proxy + zmq_proxy_steerable (frontend, backend, NULL, control); + + rc = zmq_close (frontend); + assert (rc == 0); + rc = zmq_close (backend); + assert (rc == 0); + rc = zmq_close (control); + assert (rc == 0); +} + + +// The main thread simply starts a basic steerable proxy server, publishes some messages, and then +// waits for the server to terminate. + +int main (void) +{ + setup_test_environment (); + + void *ctx = zmq_ctx_new (); + assert (ctx); + // Control socket receives terminate command from main over inproc + void *control = zmq_socket (ctx, ZMQ_PUB); + assert (control); + int rc = zmq_bind (control, "inproc://control"); + assert (rc == 0); + + void *thread = zmq_threadstart(&server_task, ctx); + msleep (500); // Run for 500 ms + + // Start a secondary publisher which writes data to the SUB-PUSH server socket + void *publisher = zmq_socket (ctx, ZMQ_PUB); + assert (publisher); + rc = zmq_connect (publisher, "tcp://127.0.0.1:15564"); + assert (rc == 0); + + msleep (50); + rc = zmq_send (publisher, "This is a test", 14, 0); + assert (rc == 14); + + msleep (50); + rc = zmq_send (publisher, "This is a test", 14, 0); + assert (rc == 14); + + msleep (50); + rc = zmq_send (publisher, "This is a test", 14, 0); + assert (rc == 14); + rc = zmq_send (control, "TERMINATE", 9, 0); + assert (rc == 9); + + rc = zmq_close (publisher); + assert (rc == 0); + rc = zmq_close (control); + assert (rc == 0); + + zmq_threadclose (thread); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); + return 0; +}