From 1c7b09afaf29ad7e1b767c716f674fb56fc13cd7 Mon Sep 17 00:00:00 2001 From: Rik van der Heijden <mail@rikvanderheijden.com> Date: Fri, 24 Apr 2015 23:01:20 +0200 Subject: [PATCH] Merged fix for #1382 Also fixed Makefile.am with missing specs for test case. --- .gitignore | 1 + AUTHORS | 1 + Makefile.am | 1 + NEWS | 4 +- src/proxy.cpp | 6 +- tests/CMakeLists.txt | 1 + tests/test_proxy_terminate.cpp | 113 +++++++++++++++++++++++++++++++++ 7 files changed, 124 insertions(+), 3 deletions(-) create mode 100644 tests/test_proxy_terminate.cpp diff --git a/.gitignore b/.gitignore index 100863bd..ccc7e172 100644 --- a/.gitignore +++ b/.gitignore @@ -74,6 +74,7 @@ test_linger test_security_null test_security_plain test_proxy +test_proxy_terminate test_abstract_ipc test_filter_ipc test_connect_delay_tipc diff --git a/AUTHORS b/AUTHORS index 1249b6a5..d0c2c47e 100644 --- a/AUTHORS +++ b/AUTHORS @@ -86,6 +86,7 @@ Philip Kovacs Pieter Hintjens Piotr Trojanek Richard Newton +Rik van der Heijden Robert G. Jakabosky Sebastian Otaegui Stefan Radomski diff --git a/Makefile.am b/Makefile.am index 37d7e0f2..40622151 100644 --- a/Makefile.am +++ b/Makefile.am @@ -320,6 +320,7 @@ test_apps = \ test_inproc_connect \ test_issue_566 \ test_proxy \ + test_proxy_terminate \ test_many_sockets \ test_ipc_wildcard \ test_diffserv \ diff --git a/NEWS b/NEWS index 4f087b00..60ce47f1 100644 --- a/NEWS +++ b/NEWS @@ -1,4 +1,4 @@ -0MQ version 4.1.1 rc2, released on 2014/12/xx +0MQ version 4.1.1 rc2, released on 2015/xx/xx ============================================= * Fixed #1208 - fix recursion in automake packaging @@ -17,6 +17,8 @@ * Fixed #1389 - PUB, PUSH sockets had slow memory leak. +* Fixed #1382 - zmq_proxy did not terminate if there were no readers. + 0MQ version 4.1.0 rc1, released on 2014/10/14 ============================================= diff --git a/src/proxy.cpp b/src/proxy.cpp index 625b0dd8..b97b52c7 100644 --- a/src/proxy.cpp +++ b/src/proxy.cpp @@ -159,14 +159,16 @@ int zmq::proxy ( } // Process a request if (state == active - && items [0].revents & ZMQ_POLLIN) { + && items [0].revents & ZMQ_POLLIN + && items [1].revents & ZMQ_POLLOUT) { rc = forward(frontend_, backend_, capture_,msg); if (unlikely (rc < 0)) return -1; } // Process a reply if (state == active - && items [1].revents & ZMQ_POLLIN) { + && items [1].revents & ZMQ_POLLIN + && items [0].revents & ZMQ_POLLOUT) { rc = forward(backend_, frontend_, capture_,msg); if (unlikely (rc < 0)) return -1; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 54b50da9..ad0f9603 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -52,6 +52,7 @@ if(NOT WIN32) test_reqrep_ipc test_abstract_ipc test_proxy + test_proxy_terminate test_filter_ipc ) if(HAVE_FORK) diff --git a/tests/test_proxy_terminate.cpp b/tests/test_proxy_terminate.cpp new file mode 100644 index 00000000..83e70d4b --- /dev/null +++ b/tests/test_proxy_terminate.cpp @@ -0,0 +1,113 @@ +/* + Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "testutil.hpp" +#include "../include/zmq_utils.h" + +// This is a test for issue #1382. The server thread creates a SUB-PUSH +// steerable proxy. The main process then sends messages to the SUB +// but there is no pull on the other side, previously the proxy blocks +// in writing to the backend, preventing the proxy from terminating + +void +server_task (void *ctx) +{ + // Frontend socket talks to main process + void *frontend = zmq_socket (ctx, ZMQ_SUB); + assert (frontend); + int rc = zmq_setsockopt (frontend, ZMQ_SUBSCRIBE, "", 0); + assert (rc == 0); + rc = zmq_bind (frontend, "tcp://127.0.0.1:15564"); + assert (rc == 0); + + // Nice socket which is never read + void *backend = zmq_socket (ctx, ZMQ_PUSH); + assert (backend); + rc = zmq_bind (backend, "tcp://127.0.0.1:15563"); + assert (rc == 0); + + // Control socket receives terminate command from main over inproc + void *control = zmq_socket (ctx, ZMQ_SUB); + assert (control); + rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0); + assert (rc == 0); + rc = zmq_connect (control, "inproc://control"); + assert (rc == 0); + + // Connect backend to frontend via a proxy + zmq_proxy_steerable (frontend, backend, NULL, control); + + rc = zmq_close (frontend); + assert (rc == 0); + rc = zmq_close (backend); + assert (rc == 0); + rc = zmq_close (control); + assert (rc == 0); +} + + +// The main thread simply starts a basic steerable proxy server, publishes some messages, and then +// waits for the server to terminate. + +int main (void) +{ + setup_test_environment (); + + void *ctx = zmq_ctx_new (); + assert (ctx); + // Control socket receives terminate command from main over inproc + void *control = zmq_socket (ctx, ZMQ_PUB); + assert (control); + int rc = zmq_bind (control, "inproc://control"); + assert (rc == 0); + + void *thread = zmq_threadstart(&server_task, ctx); + msleep (500); // Run for 500 ms + + // Start a secondary publisher which writes data to the SUB-PUSH server socket + void *publisher = zmq_socket (ctx, ZMQ_PUB); + assert (publisher); + rc = zmq_connect (publisher, "tcp://127.0.0.1:15564"); + assert (rc == 0); + + msleep (50); + rc = zmq_send (publisher, "This is a test", 14, 0); + assert (rc == 14); + + msleep (50); + rc = zmq_send (publisher, "This is a test", 14, 0); + assert (rc == 14); + + msleep (50); + rc = zmq_send (publisher, "This is a test", 14, 0); + assert (rc == 14); + rc = zmq_send (control, "TERMINATE", 9, 0); + assert (rc == 9); + + rc = zmq_close (publisher); + assert (rc == 0); + rc = zmq_close (control); + assert (rc == 0); + + zmq_threadclose (thread); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); + return 0; +}