diff --git a/Makefile.am b/Makefile.am index 0f1bb271..f3754372 100644 --- a/Makefile.am +++ b/Makefile.am @@ -320,6 +320,7 @@ test_apps = \ test_inproc_connect \ test_issue_566 \ test_proxy \ + test_proxy_single_socket \ test_proxy_terminate \ test_many_sockets \ test_ipc_wildcard \ @@ -467,6 +468,9 @@ test_issue_566_LDADD = libzmq.la test_proxy_SOURCES = tests/test_proxy.cpp test_proxy_LDADD = libzmq.la +test_proxy_single_socket_SOURCES = tests/test_proxy_single_socket.cpp +test_proxy_single_socket_LDADD = libzmq.la + test_proxy_terminate_SOURCES = tests/test_proxy_terminate.cpp test_proxy_terminate_LDADD = libzmq.la diff --git a/src/proxy.cpp b/src/proxy.cpp index 738a77ae..6e05d8a8 100644 --- a/src/proxy.cpp +++ b/src/proxy.cpp @@ -142,10 +142,14 @@ int zmq::proxy ( return -1; // Get the pollout separately because when combining this with pollin it maxes the CPU - // because pollout shall most of the time return directly - rc = zmq_poll (&itemsout [0], 2, 0); - if (unlikely (rc < 0)) - return -1; + // because pollout shall most of the time return directly. + // POLLOUT is only checked when frontend and backend sockets are not the same. + if (frontend_ != backend_) { + rc = zmq_poll (&itemsout [0], 2, 0); + if (unlikely (rc < 0)) { + return -1; + } + } // Process a control command if any if (control_ && items [2].revents & ZMQ_POLLIN) { @@ -180,13 +184,14 @@ int zmq::proxy ( // Process a request if (state == active && items [0].revents & ZMQ_POLLIN - && itemsout [1].revents & ZMQ_POLLOUT) { + && (frontend_ == backend_ || itemsout [1].revents & ZMQ_POLLOUT)) { rc = forward(frontend_, backend_, capture_,msg); if (unlikely (rc < 0)) return -1; } // Process a reply if (state == active + && frontend_ != backend_ && items [1].revents & ZMQ_POLLIN && itemsout [0].revents & ZMQ_POLLOUT) { rc = forward(backend_, frontend_, capture_,msg); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index ad0f9603..27bbcc0e 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -52,6 +52,7 @@ if(NOT WIN32) test_reqrep_ipc test_abstract_ipc test_proxy + test_proxy_single_socket test_proxy_terminate test_filter_ipc ) diff --git a/tests/test_proxy_single_socket.cpp b/tests/test_proxy_single_socket.cpp new file mode 100644 index 00000000..bd63f7e3 --- /dev/null +++ b/tests/test_proxy_single_socket.cpp @@ -0,0 +1,113 @@ +/* + Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "testutil.hpp" +#include "../include/zmq_utils.h" + + + +// This is our server task. +// It runs a proxy with a single REP socket as both frontend and backend. + +void +server_task (void *ctx) +{ + void *rep = zmq_socket (ctx, ZMQ_REP); + assert (rep); + int rc = zmq_bind (rep, "tcp://127.0.0.1:5563"); + assert (rc == 0); + + // Control socket receives terminate command from main over inproc + void *control = zmq_socket (ctx, ZMQ_SUB); + assert (control); + rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0); + assert (rc == 0); + rc = zmq_connect (control, "inproc://control"); + assert (rc == 0); + + // Use rep as both frontend and backend + zmq_proxy_steerable (rep, rep, NULL, control); + + rc = zmq_close (rep); + assert (rc == 0); + rc = zmq_close (control); + assert (rc == 0); +} + + +// The main thread simply starts several clients and a server, and then +// waits for the server to finish. + +int main (void) +{ + setup_test_environment (); + + void *ctx = zmq_ctx_new (); + assert (ctx); + // client socket pings proxy over tcp + void *req = zmq_socket (ctx, ZMQ_REQ); + assert (req); + int rc = zmq_connect (req, "tcp://127.0.0.1:5563"); + assert (rc == 0); + + // Control socket receives terminate command from main over inproc + void *control = zmq_socket (ctx, ZMQ_PUB); + assert (control); + rc = zmq_bind (control, "inproc://control"); + assert (rc == 0); + + void *server_thread = zmq_threadstart(&server_task, ctx); + + char buf[255]; + rc = zmq_send(req, "msg1", 4, 0); + assert (rc == 4); + rc = zmq_recv(req, buf, 255, 0); + assert (rc == 4); + assert (memcmp (buf, "msg1", 4) == 0); + + rc = zmq_send(req, "msg22", 5, 0); + assert (rc == 5); + rc = zmq_recv(req, buf, 255, 0); + assert (rc == 5); + assert (memcmp (buf, "msg22", 5) == 0); + + rc = zmq_send (control, "TERMINATE", 9, 0); + assert (rc == 9); + + rc = zmq_close (control); + assert (rc == 0); + rc = zmq_close (req); + assert (rc == 0); + + zmq_threadclose (server_thread); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); + return 0; +}