mirror of https://github.com/zeromq/libzmq.git synced 2025-01-14 01:37:56 +08:00

A clean-room implementation of zmq_proxy_steerable().

It is contriubted under the MPL-2.0.

I had no knowledge of the previous implementation of zmq_proxy_steerable().

This version was developed based on expectations set in the old man page with one exception.  This version uses a REP/REQ for the proxy control protocol sockets.  The old man page example used PUB/SUB which is nonsensical given the STATISTICS command requires two way communication.
This commit is contained in:
Brett Viren 2023-10-08 19:56:03 -04:00
parent 8cdc4ed71a
commit 5712ad5138
8 changed files with 724 additions and 41 deletions

View File

@ -27,6 +27,7 @@ Bernd Prager
Bob Beaty
Brandon Carpenter
Brett Cameron
Brett Viren
Brian Buchanan
Burak Arslan
Carl Clemens

View File

@ -483,6 +483,7 @@ test_apps = \
tests/test_issue_566 \
tests/test_proxy_hwm \
tests/test_proxy_single_socket \
tests/test_proxy_steerable \
tests/test_proxy_terminate \
tests/test_getsockopt_memset \
tests/test_setsockopt \
@ -731,6 +732,10 @@ tests_test_proxy_single_socket_SOURCES = tests/test_proxy_single_socket.cpp
tests_test_proxy_single_socket_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
tests_test_proxy_single_socket_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
tests_test_proxy_steerable_SOURCES = tests/test_proxy_steerable.cpp
tests_test_proxy_steerable_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
tests_test_proxy_steerable_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
tests_test_proxy_terminate_SOURCES = tests/test_proxy_terminate.cpp
tests_test_proxy_terminate_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
tests_test_proxy_terminate_CPPFLAGS = ${TESTUTIL_CPPFLAGS}

View File

@ -3,7 +3,7 @@ zmq_proxy_steerable(3)
zmq_proxy_steerable - DEPRECATED
zmq_proxy_steerable - built-in 0MQ proxy with control flow
@ -14,9 +14,102 @@ SYNOPSIS
The _zmq_proxy_steerable()_ function is an empty stub that only returns an
*EOPNOTSUPP* error, as the author did not provide a relicense agreement for
the Mozilla Public License v2 relicense of libzmq.
The _zmq_proxy_steerable()_ function is a variant of the _zmq_proxy()_ function.
It accepts a fourth _control_ socket. When the _control_ socket is _NULL_ the
two functions operate identically.
When a _control_ socket of type _REP_ is provided to the proxy function the
application may send commands to the proxy. The following commands are
The proxy will cease transferring messages between its endpoints.
The proxy will resume transferring messages between its endpoints.
The proxy function will exit with a return value of 0.
The proxy behavior will remain unchanged and reply with a set of simple summary values of the messages that have been sent through the proxy as described next.
There are eight statistics values, each of size _uint64_t_ in the multi-part
message reply to the _STATISTICS_ command. These are:
- number of messages received by the frontend socket
- number of bytes received by the frontend socket
- number of messages sent by the frontend socket
- number of bytes sent by the frontend socket
- number of messages received by the backend socket
- number of bytes received by the backend socket
- number of messages sent by the backend socket
- number of bytes sent by the backend socket
The _zmq_proxy_steerable()_ function returns 0 if TERMINATE is received on its
control socket. Otherwise, it returns -1 and errno set to ETERM or EINTR (the
0MQ context associated with either of the specified sockets was terminated) or
EFAULT (the provided frontend or backend was invalid).
.Create a function to run the proxy
// Create the frontend and backend sockets to be proxied
void *frontend = zmq_socket (context, ZMQ_ROUTER);
void *backend = zmq_socket (context, ZMQ_DEALER);
// Create the proxy control socket
void *control = zmq_socket (context, ZMQ_REP);
// Bind the sockets.
zmq_bind (frontend, "tcp://*:5555");
zmq_bind (backend, "tcp://*:5556");
zmq_bind (control, "tcp://*:5557");
zmq_proxy_steerable(frontend, backend, NULL, control);
.Code in another thread/process to steer the proxy.
void *control = zmq_socket (context, ZMQ_REQ);
zmq_connect (control, "tcp://*:5557");
zmq_msg_t msg;
zmq_send (control, "PAUSE", 5, 0);
zmq_msg_recv (&msg, control, 0));
zmq_send (control, "RESUME", 6, 0);
zmq_msg_recv (&msg, control, 0));
zmq_send (control, "STATISTICS", 10, 0);
while (1) {
zmq_msg_recv (&msg, control, 0));
printf(" %lu", *(uint64_t *)zmq_msg_data (&msg));
if (!zmq_msg_get (&msg, ZMQ_MORE))
zmq_send (control, "TERMINATE", 9, 0);
zmq_msg_recv (&msg, control, 0));

View File

@ -18,6 +18,13 @@
#include "socket_base.hpp"
#include "err.hpp"
int zmq::proxy (class socket_base_t *frontend_,
class socket_base_t *backend_,
class socket_base_t *capture_)
return zmq::proxy_steerable (frontend_, backend_, capture_, NULL);
#include "socket_poller.hpp"
@ -66,10 +73,25 @@ capture (class zmq::socket_base_t *capture_, zmq::msg_t *msg_, int more_ = 0)
return 0;
struct stats_socket
uint64_t count, bytes;
struct stats_endpoint
stats_socket send, recv;
struct stats_proxy
stats_endpoint frontend, backend;
static int forward (class zmq::socket_base_t *from_,
class zmq::socket_base_t *to_,
class zmq::socket_base_t *capture_,
zmq::msg_t *msg_)
zmq::msg_t *msg_,
stats_socket &recving,
stats_socket &sending)
// Forward a burst of messages
for (unsigned int i = 0; i < zmq::proxy_burst_size; i++) {
@ -86,6 +108,10 @@ static int forward (class zmq::socket_base_t *from_,
return -1;
size_t nbytes = msg_->size ();
recving.count += 1;
recving.bytes += nbytes;
moresz = sizeof more;
rc = from_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
if (unlikely (rc < 0))
@ -99,6 +125,8 @@ static int forward (class zmq::socket_base_t *from_,
rc = to_->send (msg_, more ? ZMQ_SNDMORE : 0);
if (unlikely (rc < 0))
return -1;
sending.count += 1;
sending.bytes += nbytes;
if (more == 0)
@ -108,10 +136,81 @@ static int forward (class zmq::socket_base_t *from_,
return 0;
enum proxy_state_t
// Handle control request [5]PAUSE, [6]RESUME, [9]TERMINATE,
// [10]STATISTICS. Only STATISTICS results in a send.
static int handle_control (class zmq::socket_base_t *control_,
proxy_state_t &state,
const stats_proxy &stats)
zmq::msg_t cmsg;
int rc = cmsg.init ();
if (rc != 0) {
return -1;
rc = control_->recv (&cmsg, ZMQ_DONTWAIT);
if (rc < 0) {
return -1;
uint8_t *const command = static_cast<uint8_t *> (cmsg.data ());
const size_t msiz = cmsg.size ();
if (msiz == 10 && 0 == memcmp (command, "STATISTICS", 10)) {
// The stats are a cross product:
// (Front,Back) X (Recv,Sent) X (Number,Bytes).
// that is flattened into sequence of 8 message parts according to the
// zmq_proxy_steerable(3) documentation as:
// (frn, frb, fsn, fsb, brn, brb, bsn, bsb)
// f=front/b=back, r=recv/s=send, n=number/b=bytes.
const uint64_t stat_vals[8] = {
stats.frontend.recv.count, stats.frontend.recv.bytes,
stats.frontend.send.count, stats.frontend.send.bytes,
stats.backend.recv.count, stats.backend.recv.bytes,
stats.backend.send.count, stats.backend.send.bytes};
for (size_t ind = 0; ind < 8; ++ind) {
cmsg.init_size (sizeof (uint64_t));
memcpy (cmsg.data (), stat_vals + ind, sizeof (uint64_t));
rc = control_->send (&cmsg, ind < 7 ? ZMQ_SNDMORE : 0);
if (unlikely (rc < 0)) {
return -1;
return 0;
if (msiz == 5 && memcmp (command, "\x05PAUSE", 6)) {
state = active;
} else if (msiz == 6 && 0 == memcmp (command, "RESUME", 6)) {
state = paused;
} else if (msiz == 9 && 0 == memcmp (command, "TERMINATE", 9)) {
state = terminated;
// satisfy REP duty and reply no matter what.
cmsg.init_size (0);
rc = control_->send (&cmsg, 0);
if (unlikely (rc < 0)) {
return -1;
return 0;
int zmq::proxy (class socket_base_t *frontend_,
class socket_base_t *backend_,
class socket_base_t *capture_)
int zmq::proxy_steerable (class socket_base_t *frontend_,
class socket_base_t *backend_,
class socket_base_t *capture_,
class socket_base_t *control_)
msg_t msg;
int rc = msg.init ();
@ -122,19 +221,17 @@ int zmq::proxy (class socket_base_t *frontend_,
// under full load to be 1:1.
// Proxy can be in these three states
} state = active;
proxy_state_t state = active;
bool frontend_equal_to_backend;
bool frontend_in = false;
bool frontend_out = false;
bool backend_in = false;
bool backend_out = false;
zmq::socket_poller_t::event_t events[3];
zmq::socket_poller_t::event_t events[4];
int nevents = 3; // increase to 4 if we have control_
stats_proxy stats = {{{0, 0}, {0, 0}}, {{0, 0}, {0, 0}}};
// Don't allocate these pollers from stack because they will take more than 900 kB of stack!
// On Windows this blows up default stack of 1 MB and aborts the program.
@ -232,25 +329,58 @@ int zmq::proxy (class socket_base_t *frontend_,
bool request_processed, reply_processed;
if (control_) {
// wherever you go, there you are.
rc = poller_all->add (control_, NULL, ZMQ_POLLIN);
rc = poller_in->add (control_, NULL, ZMQ_POLLIN);
rc = poller_receive_blocked->add (control_, NULL, ZMQ_POLLIN);
rc = poller_send_blocked->add (control_, NULL, ZMQ_POLLIN);
rc = poller_both_blocked->add (control_, NULL, ZMQ_POLLIN);
rc = poller_frontend_only->add (control_, NULL, ZMQ_POLLIN);
rc = poller_backend_only->add (control_, NULL, ZMQ_POLLIN);
bool request_processed = false, reply_processed = false;
while (state != terminated) {
// Blocking wait initially only for 'ZMQ_POLLIN' - 'poller_wait' points to 'poller_in'.
// If one of receiving end's queue is full ('ZMQ_POLLOUT' not available),
// 'poller_wait' is pointed to 'poller_receive_blocked', 'poller_send_blocked' or 'poller_both_blocked'.
rc = poller_wait->wait (events, 3, -1);
rc = poller_wait->wait (events, nevents, -1);
if (rc < 0 && errno == EAGAIN)
rc = 0;
// Some of events waited for by 'poller_wait' have arrived, now poll for everything without blocking.
rc = poller_all->wait (events, 3, 0);
rc = poller_all->wait (events, nevents, 0);
if (rc < 0 && errno == EAGAIN)
rc = 0;
// Process events.
for (int i = 0; i < rc; i++) {
if (control_ && events[i].socket == control_) {
rc = handle_control (control_, state, stats);
if (events[i].socket == frontend_) {
frontend_in = (events[i].events & ZMQ_POLLIN) != 0;
frontend_out = (events[i].events & ZMQ_POLLOUT) != 0;
@ -267,7 +397,8 @@ int zmq::proxy (class socket_base_t *frontend_,
// Process a request, 'ZMQ_POLLIN' on 'frontend_' and 'ZMQ_POLLOUT' on 'backend_'.
// In case of frontend_==backend_ there's no 'ZMQ_POLLOUT' event.
if (frontend_in && (backend_out || frontend_equal_to_backend)) {
rc = forward (frontend_, backend_, capture_, &msg);
rc = forward (frontend_, backend_, capture_, &msg,
stats.frontend.recv, stats.backend.send);
request_processed = true;
frontend_in = backend_out = false;
@ -279,7 +410,8 @@ int zmq::proxy (class socket_base_t *frontend_,
// covers all of the cases. 'backend_in' is always false if frontend_==backend_ due to
// design in 'for' event processing loop.
if (backend_in && frontend_out) {
rc = forward (backend_, frontend_, capture_, &msg);
rc = forward (backend_, frontend_, capture_, &msg,
stats.backend.recv, stats.frontend.send);
reply_processed = true;
backend_in = frontend_out = false;
@ -346,9 +478,10 @@ int zmq::proxy (class socket_base_t *frontend_,
int zmq::proxy (class socket_base_t *frontend_,
class socket_base_t *backend_,
class socket_base_t *capture_)
int zmq::proxy_steerable (class socket_base_t *frontend_,
class socket_base_t *backend_,
class socket_base_t *capture_,
class socket_base_t *control_)
msg_t msg;
int rc = msg.init ();
@ -359,18 +492,17 @@ int zmq::proxy (class socket_base_t *frontend_,
// under full load to be 1:1.
zmq_pollitem_t items[] = {{frontend_, 0, ZMQ_POLLIN, 0},
{backend_, 0, ZMQ_POLLIN, 0}};
int qt_poll_items = 2;
{backend_, 0, ZMQ_POLLIN, 0},
{control_, 0, ZMQ_POLLIN, 0}};
const int qt_poll_items = control_ ? 3 : 2;
zmq_pollitem_t itemsout[] = {{frontend_, 0, ZMQ_POLLOUT, 0},
{backend_, 0, ZMQ_POLLOUT, 0}};
stats_proxy stats = {0};
// Proxy can be in these three states
} state = active;
proxy_state_t state = active;
while (state != terminated) {
// Wait while there are either requests or replies to process.
@ -378,6 +510,12 @@ int zmq::proxy (class socket_base_t *frontend_,
if (unlikely (rc < 0))
return close_and_return (&msg, -1);
if (control_ && items[2].revents & ZMQ_POLLIN) {
rc = handle_control (control_, state, stats);
if (unlikely (rc < 0))
return close_and_return (&msg, -1);
// Get the pollout separately because when combining this with pollin it maxes the CPU
// because pollout shall most of the time return directly.
// POLLOUT is only checked when frontend and backend sockets are not the same.
@ -390,7 +528,8 @@ int zmq::proxy (class socket_base_t *frontend_,
if (state == active && items[0].revents & ZMQ_POLLIN
&& (frontend_ == backend_ || itemsout[1].revents & ZMQ_POLLOUT)) {
rc = forward (frontend_, backend_, capture_, &msg);
rc = forward (frontend_, backend_, capture_, &msg,
stats.frontend.recv, stats.backend.send);
if (unlikely (rc < 0))
return close_and_return (&msg, -1);
@ -398,7 +537,8 @@ int zmq::proxy (class socket_base_t *frontend_,
if (state == active && frontend_ != backend_
&& items[1].revents & ZMQ_POLLIN
&& itemsout[0].revents & ZMQ_POLLOUT) {
rc = forward (backend_, frontend_, capture_, &msg);
rc = forward (backend_, frontend_, capture_, &msg,
stats.backend.recv, stats.frontend.send);
if (unlikely (rc < 0))
return close_and_return (&msg, -1);

View File

@ -8,6 +8,11 @@ namespace zmq
int proxy (class socket_base_t *frontend_,
class socket_base_t *backend_,
class socket_base_t *capture_);
int proxy_steerable (class socket_base_t *frontend_,
class socket_base_t *backend_,
class socket_base_t *capture_,
class socket_base_t *control_);

View File

@ -1729,6 +1729,7 @@ int zmq_proxy (void *frontend_, void *backend_, void *capture_)
errno = EFAULT;
return -1;
// Runs zmq::proxy_steerable with a NULL control_.
return zmq::proxy (static_cast<zmq::socket_base_t *> (frontend_),
static_cast<zmq::socket_base_t *> (backend_),
static_cast<zmq::socket_base_t *> (capture_));
@ -1739,18 +1740,14 @@ int zmq_proxy_steerable (void *frontend_,
void *capture_,
void *control_)
LIBZMQ_UNUSED (capture_);
LIBZMQ_UNUSED (control_);
if (!frontend_ || !backend_) {
errno = EFAULT;
return -1;
return -1;
return zmq::proxy_steerable (static_cast<zmq::socket_base_t *> (frontend_),
static_cast<zmq::socket_base_t *> (backend_),
static_cast<zmq::socket_base_t *> (capture_),
static_cast<zmq::socket_base_t *> (control_));
// The deprecated device functionality

View File

@ -115,6 +115,7 @@ if(NOT WIN32)

View File

@ -0,0 +1,441 @@
/* SPDX-License-Identifier: MPL-2.0 */
#include "testutil.hpp"
#include "testutil_unity.hpp"
#include <stdlib.h>
#include <string.h>
#define CONTENT_SIZE 13
#define ROUTING_ID_SIZE 10
#define QT_WORKERS 5
#define QT_CLIENTS 3
#define is_verbose 0
const char *proxy_control_address = "inproc://proxy_control";
struct thread_data
int id;
void *g_clients_pkts_out = NULL;
void *g_workers_pkts_out = NULL;
void *control_context = NULL; // worker control, not proxy control
void setUp ()
setup_test_context ();
// Asynchronous client-to-server (DEALER to ROUTER) - pure libzmq
// While this example runs in a single process, that is to make
// it easier to start and stop the example. Each task may have its own
// context and conceptually acts as a separate process. To have this
// behaviour, it is necessary to replace the inproc transport of the
// control socket by a tcp transport.
// This is our client task
// It connects to the server, and then sends a request once per second
// It collects responses as they arrive, and it prints them out. We will
// run several client tasks in parallel, each with a different random ID.
static void client_task (void *db_)
const thread_data *const databag = static_cast<const thread_data *> (db_);
// Endpoint socket gets random port to avoid test failing when port in use
void *endpoint = zmq_socket (get_test_context (), ZMQ_PAIR);
int linger = 0;
zmq_setsockopt (endpoint, ZMQ_LINGER, &linger, sizeof (linger)));
char endpoint_source[256];
snprintf (endpoint_source, 256 * sizeof (char), "inproc://endpoint%d",
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (endpoint, endpoint_source));
char *my_endpoint = s_recv (endpoint);
TEST_ASSERT_NOT_NULL (my_endpoint);
void *client = zmq_socket (get_test_context (), ZMQ_DEALER);
// Control socket receives terminate command from main over inproc
void *control = zmq_socket (control_context, ZMQ_SUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0));
linger = 0;
zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger)));
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (control, "inproc://control"));
char content[CONTENT_SIZE_MAX] = {};
// Set random routing id to make tracing easier
char routing_id[ROUTING_ID_SIZE] = {};
snprintf (routing_id, ROUTING_ID_SIZE * sizeof (char), "%04X-%04X",
rand () % 0xFFFF, rand () % 0xFFFF);
client, ZMQ_ROUTING_ID, routing_id,
ROUTING_ID_SIZE)); // includes '\0' as an helper for printf
linger = 0;
zmq_setsockopt (client, ZMQ_LINGER, &linger, sizeof (linger)));
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (client, my_endpoint));
zmq_pollitem_t items[] = {{client, 0, ZMQ_POLLIN, 0},
{control, 0, ZMQ_POLLIN, 0}};
int request_nbr = 0;
bool run = true;
bool keep_sending = true;
while (run) {
// Tick once per 200 ms, pulling in arriving messages
int centitick;
for (centitick = 0; centitick < 20; centitick++) {
zmq_poll (items, 2, 10);
if (items[0].revents & ZMQ_POLLIN) {
int rcvmore;
size_t sz = sizeof (rcvmore);
zmq_recv (client, content, CONTENT_SIZE_MAX, 0));
if (is_verbose)
printf (
"client receive - routing_id = %s content = %s\n",
routing_id, content);
// Check that message is still the same
TEST_ASSERT_EQUAL_STRING_LEN ("request #", content, 9);
zmq_getsockopt (client, ZMQ_RCVMORE, &rcvmore, &sz));
if (items[1].revents & ZMQ_POLLIN) {
int rc = zmq_recv (control, content, CONTENT_SIZE_MAX, 0);
if (rc > 0) {
content[rc] = 0; // NULL-terminate the command string
if (is_verbose)
printf (
"client receive - routing_id = %s command = %s\n",
routing_id, content);
if (memcmp (content, "TERMINATE", 9) == 0) {
run = false;
if (memcmp (content, "STOP", 4) == 0) {
keep_sending = false;
if (keep_sending) {
snprintf (content, CONTENT_SIZE_MAX * sizeof (char),
"request #%03d", ++request_nbr); // CONTENT_SIZE
if (is_verbose)
printf ("client send - routing_id = %s request #%03d\n",
routing_id, request_nbr);
zmq_atomic_counter_inc (g_clients_pkts_out);
zmq_send (client, content, CONTENT_SIZE, 0));
TEST_ASSERT_SUCCESS_ERRNO (zmq_close (client));
TEST_ASSERT_SUCCESS_ERRNO (zmq_close (control));
TEST_ASSERT_SUCCESS_ERRNO (zmq_close (endpoint));
free (my_endpoint);
// This is our server task.
// It uses the multithreaded server model to deal requests out to a pool
// of workers and route replies back to clients. One worker can handle
// one request at a time but one client can talk to multiple workers at
// once.
static void server_worker (void * /*unused_*/);
void server_task (void * /*unused_*/)
// Frontend socket talks to clients over TCP
char my_endpoint[MAX_SOCKET_STRING];
void *frontend = zmq_socket (get_test_context (), ZMQ_ROUTER);
int linger = 0;
zmq_setsockopt (frontend, ZMQ_LINGER, &linger, sizeof (linger)));
bind_loopback_ipv4 (frontend, my_endpoint, sizeof my_endpoint);
// Backend socket talks to workers over inproc
void *backend = zmq_socket (get_test_context (), ZMQ_DEALER);
zmq_setsockopt (backend, ZMQ_LINGER, &linger, sizeof (linger)));
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (backend, "inproc://backend"));
// Launch pool of worker threads, precise number is not critical
int thread_nbr;
void *threads[5];
for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++)
threads[thread_nbr] = zmq_threadstart (&server_worker, NULL);
// Endpoint socket sends random port to avoid test failing when port in use
void *endpoint_receivers[QT_CLIENTS];
char endpoint_source[256];
for (int i = 0; i < QT_CLIENTS; ++i) {
endpoint_receivers[i] = zmq_socket (get_test_context (), ZMQ_PAIR);
TEST_ASSERT_NOT_NULL (endpoint_receivers[i]);
endpoint_receivers[i], ZMQ_LINGER, &linger, sizeof (linger)));
snprintf (endpoint_source, 256 * sizeof (char), "inproc://endpoint%d",
zmq_bind (endpoint_receivers[i], endpoint_source));
for (int i = 0; i < QT_CLIENTS; ++i) {
send_string_expect_success (endpoint_receivers[i], my_endpoint, 0);
// Proxy control socket
void *proxy_control = zmq_socket (get_test_context (), ZMQ_REP);
TEST_ASSERT_NOT_NULL (proxy_control);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (proxy_control, proxy_control_address));
// Connect backend to frontend via a steerable proxy
int rc = zmq_proxy_steerable (frontend, backend, NULL, proxy_control);
for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++) {
zmq_threadclose (threads[thread_nbr]);
TEST_ASSERT_SUCCESS_ERRNO (zmq_close (frontend));
TEST_ASSERT_SUCCESS_ERRNO (zmq_close (backend));
TEST_ASSERT_SUCCESS_ERRNO (zmq_close (proxy_control));
for (int i = 0; i < QT_CLIENTS; ++i) {
TEST_ASSERT_SUCCESS_ERRNO (zmq_close (endpoint_receivers[i]));
// Each worker task works on one request at a time and sends a random number
// of replies back, with random delays between replies:
// The comments in the first column, if suppressed, makes it a poller version
static void server_worker (void * /*unused_*/)
void *worker = zmq_socket (get_test_context (), ZMQ_DEALER);
int linger = 0;
zmq_setsockopt (worker, ZMQ_LINGER, &linger, sizeof (linger)));
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (worker, "inproc://backend"));
// Control socket receives terminate command from main over inproc
void *control = zmq_socket (control_context, ZMQ_SUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0));
zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger)));
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (control, "inproc://control"));
char content[CONTENT_SIZE_MAX] =
{}; // bigger than what we need to check that
char routing_id[ROUTING_ID_SIZE_MAX] =
{}; // the size received is the size sent
bool run = true;
bool keep_sending = true;
while (run) {
int rc = zmq_recv (control, content, CONTENT_SIZE_MAX,
ZMQ_DONTWAIT); // usually, rc == -1 (no message)
if (rc > 0) {
content[rc] = 0; // NULL-terminate the command string
if (is_verbose)
printf ("server_worker receives command = %s\n", content);
if (memcmp (content, "TERMINATE", 9) == 0)
run = false;
if (memcmp (content, "STOP", 4) == 0)
keep_sending = false;
// The DEALER socket gives us the reply envelope and message
// if we don't poll, we have to use ZMQ_DONTWAIT, if we poll, we can block-receive with 0
rc = zmq_recv (worker, routing_id, ROUTING_ID_SIZE_MAX, ZMQ_DONTWAIT);
if (rc == ROUTING_ID_SIZE) {
rc = zmq_recv (worker, content, CONTENT_SIZE_MAX, 0);
if (is_verbose)
printf ("server receive - routing_id = %s content = %s\n",
routing_id, content);
// Send 0..4 replies back
if (keep_sending) {
int reply, replies = rand () % 5;
for (reply = 0; reply < replies; reply++) {
// Sleep for some fraction of a second
msleep (rand () % 10 + 1);
// Send message from server to client
if (is_verbose)
printf ("server send - routing_id = %s reply\n",
zmq_atomic_counter_inc (g_workers_pkts_out);
rc = zmq_send (worker, routing_id, ROUTING_ID_SIZE,
rc = zmq_send (worker, content, CONTENT_SIZE, 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_close (worker));
TEST_ASSERT_SUCCESS_ERRNO (zmq_close (control));
// If STATISTICS is received, the proxy will reply on the control socket
// sending a multipart message with 8 frames, each with an unsigned integer
// 64-bit wide that provide in the following order:
// - 0/frn: number of messages received by the frontend socket
// - 1/frb: number of bytes received by the frontend socket
// - 2/fsn: number of messages sent out the frontend socket
// - 3/fsb: number of bytes sent out the frontend socket
// - 4/brn: number of messages received by the backend socket
// - 5/brb: number of bytes received by the backend socket
// - 6/bsn: number of messages sent out the backend socket
// - 7/bsb: number of bytes sent out the backend socket
// The main thread simply starts several clients and a server, and then
// waits for the server to finish.
void steer (void *proxy_control, const char *command, const char *runctx)
if (is_verbose) {
printf ("steer: sending %s - %s\n", command, runctx);
// Start with proxy paused
zmq_send (proxy_control, command, strlen (command), 0));
zmq_msg_t stats_msg;
int count = -1;
while (1) {
count = count + 1;
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&stats_msg));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&stats_msg, proxy_control, 0));
if (is_verbose && zmq_msg_size (&stats_msg)) {
if (count == 0) {
printf ("steer:");
printf (" %lu", *(unsigned long int *) zmq_msg_data (&stats_msg));
if (count == 7) {
printf ("\n");
if (!zmq_msg_get (&stats_msg, ZMQ_MORE))
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&stats_msg));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&stats_msg));
void test_proxy_steerable ()
int linger = 0;
void *threads[QT_CLIENTS + 1];
g_clients_pkts_out = zmq_atomic_counter_new ();
g_workers_pkts_out = zmq_atomic_counter_new ();
control_context = zmq_ctx_new ();
TEST_ASSERT_NOT_NULL (control_context);
// Worker control socket receives terminate command from main over inproc
void *control = zmq_socket (control_context, ZMQ_PUB);
linger = 0;
zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger)));
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (control, "inproc://control"));
struct thread_data databags[QT_CLIENTS + 1];
for (int i = 0; i < QT_CLIENTS; i++) {
databags[i].id = i;
threads[i] = zmq_threadstart (&client_task, &databags[i]);
threads[QT_CLIENTS] = zmq_threadstart (&server_task, NULL);
msleep (500); // Run for 500 ms then quit
// Proxy control socket
void *proxy_control = zmq_socket (get_test_context (), ZMQ_REQ);
TEST_ASSERT_NOT_NULL (proxy_control);
linger = 0;
zmq_setsockopt (proxy_control, ZMQ_LINGER, &linger, sizeof (linger)));
zmq_connect (proxy_control, proxy_control_address));
msleep (500); // Run for 500 ms then quit
steer (proxy_control, "STATISTICS", "started clients");
steer (proxy_control, "PAUSE", "started server");
msleep (500); // Run for 500 ms then quit
steer (proxy_control, "RESUME", "started clients");
msleep (500); // Run for 500 ms then quit
steer (proxy_control, "STATISTICS", "ran for a while");
if (is_verbose)
printf ("stopping all clients and server workers\n");
send_string_expect_success (control, "STOP", 0);
steer (proxy_control, "STATISTICS", "stopped clients and workers");
msleep (500); // Wait for all clients and workers to STOP
if (is_verbose)
printf ("shutting down all clients and server workers\n");
send_string_expect_success (control, "TERMINATE", 0);
msleep (500);
steer (proxy_control, "STATISTICS", "terminate clients and server workers");
msleep (500); // Wait for all clients and workers to terminate
steer (proxy_control, "TERMINATE", "terminate proxy");
for (int i = 0; i < QT_CLIENTS + 1; i++)
zmq_threadclose (threads[i]);
TEST_ASSERT_SUCCESS_ERRNO (zmq_close (control));
TEST_ASSERT_SUCCESS_ERRNO (zmq_ctx_destroy (control_context));
TEST_ASSERT_SUCCESS_ERRNO (zmq_close (proxy_control));
teardown_test_context ();
int main (void)
setup_test_environment (360);
RUN_TEST (test_proxy_steerable);
return UNITY_END ();