diff --git a/.gitignore b/.gitignore index 03284747..40ef8c05 100644 --- a/.gitignore +++ b/.gitignore @@ -148,6 +148,7 @@ test_security_zap test_socket_null test_xpub_verbose test_mock_pub_sub +test_proxy_hwm unittest_ip_resolver unittest_mtrie unittest_poller diff --git a/Makefile.am b/Makefile.am index 953a3168..18f8ab9c 100644 --- a/Makefile.am +++ b/Makefile.am @@ -422,6 +422,7 @@ test_apps = \ tests/test_inproc_connect \ tests/test_issue_566 \ tests/test_proxy \ + tests/test_proxy_hwm \ tests/test_proxy_single_socket \ tests/test_proxy_terminate \ tests/test_getsockopt_memset \ @@ -628,6 +629,10 @@ tests_test_issue_566_LDADD = src/libzmq.la tests_test_proxy_SOURCES = tests/test_proxy.cpp tests_test_proxy_LDADD = src/libzmq.la +tests_test_proxy_hwm_SOURCES = tests/test_proxy_hwm.cpp +tests_test_proxy_hwm_LDADD = src/libzmq.la ${UNITY_LIBS} +tests_test_proxy_hwm_CPPFLAGS = ${UNITY_CPPFLAGS} + tests_test_proxy_single_socket_SOURCES = tests/test_proxy_single_socket.cpp tests_test_proxy_single_socket_LDADD = src/libzmq.la diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index b866b41b..63239002 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -640,6 +640,10 @@ blocking or dropping sent messages. Refer to the individual socket descriptions in linkzmq:zmq_socket[3] for details on the exact action taken for each socket type. +NOTE: 0MQ does not guarantee that the socket will be able to queue as many as ZMQ_RCVHWM +messages, and the actual limit may be lower or higher, depending on socket transport. +A notable example is for sockets using TCP transport; see linkzmq:zmq_tcp[7]. + [horizontal] Option value type:: int Option value unit:: messages @@ -858,7 +862,9 @@ type. NOTE: 0MQ does not guarantee that the socket will accept as many as ZMQ_SNDHWM messages, and the actual limit may be as much as 90% lower depending on the -flow of messages on the socket. +flow of messages on the socket. The socket may even be able to accept more messages +than the ZMQ_SNDHWM threshold; a notable example is for sockets using TCP transport; +see linkzmq:zmq_tcp[7]. [horizontal] Option value type:: int diff --git a/doc/zmq_tcp.txt b/doc/zmq_tcp.txt index 79ea508d..11088a72 100644 --- a/doc/zmq_tcp.txt +++ b/doc/zmq_tcp.txt @@ -69,6 +69,30 @@ A 'peer address' may be specified by either of the following: Note: A description of the ZeroMQ Message Transport Protocol (ZMTP) which is used by the TCP transport can be found at + +HWM +--- + +For the TCP transport, the high water mark (HWM) mechanism works in conjunction +with the TCP socket buffers handled at OS level. +Depending on the OS and several other factors the size of such TCP buffers will +be different. Moreover TCP buffers provided by the OS will accomodate a varying +number of messages depending on the size of messages (unlike ZMQ HWM settings +the TCP socket buffers are measured in bytes and not messages). + +This may result in apparently inexplicable behaviors: e.g., you may expect that +setting ZMQ_SNDHWM to 100 on a socket using TCP transport will have the effect +of blocking the transmission of the 101-th message if the receiver is slow. +This is very unlikely when using TCP transport since OS TCP buffers will typically +provide enough buffering to allow you sending much more than 100 messages. + +Of course if the receiver is slow, transmitting on a TCP ZMQ socket will eventually trigger +the "mute state" of the socket; simply don't rely on the exact HWM value. + +Obviously the same considerations apply for the receive HWM (see ZMQ_RCVHWM). + + + EXAMPLES -------- .Assigning a local address to a socket diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 3469a648..2ba116cc 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -86,6 +86,7 @@ if(NOT WIN32) test_rebind_ipc test_reqrep_ipc test_proxy + test_proxy_hwm test_proxy_single_socket test_proxy_terminate test_getsockopt_memset diff --git a/tests/test_hwm_pubsub.cpp b/tests/test_hwm_pubsub.cpp index 0283f9c9..03b4d983 100644 --- a/tests/test_hwm_pubsub.cpp +++ b/tests/test_hwm_pubsub.cpp @@ -30,6 +30,8 @@ #include "testutil.hpp" #include "testutil_unity.hpp" +#define SOCKET_STRING_LEN (MAX_SOCKET_STRING * 4) + void setUp () { setup_test_context (); @@ -40,17 +42,20 @@ void tearDown () teardown_test_context (); } -// const int MAX_SENDS = 10000; - -int test_defaults (int send_hwm_, int msg_cnt_) +int test_defaults (int send_hwm_, int msg_cnt_, const char *endpoint) { - // Set up bind socket - void *pub_socket = test_context_socket (ZMQ_PUB); - TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub_socket, "inproc://a")); + size_t len = SOCKET_STRING_LEN; + char pub_endpoint[SOCKET_STRING_LEN]; - // Set up connect socket + // Set up and bind PUB socket + void *pub_socket = test_context_socket (ZMQ_PUB); + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub_socket, endpoint)); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_getsockopt (pub_socket, ZMQ_LAST_ENDPOINT, pub_endpoint, &len)); + + // Set up and connect SUB socket void *sub_socket = test_context_socket (ZMQ_SUB); - TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub_socket, "inproc://a")); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub_socket, pub_endpoint)); //set a hwm on publisher TEST_ASSERT_SUCCESS_ERRNO ( @@ -58,17 +63,22 @@ int test_defaults (int send_hwm_, int msg_cnt_) TEST_ASSERT_SUCCESS_ERRNO ( zmq_setsockopt (sub_socket, ZMQ_SUBSCRIBE, 0, 0)); - // Send until we block + msleep ( + SETTLE_TIME); // give some time to background threads to perform PUB-SUB connection + + // Send until we reach "mute" state int send_count = 0; while (send_count < msg_cnt_ - && zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT) == 0) + && zmq_send (pub_socket, "test message", 13, ZMQ_DONTWAIT) == 13) ++send_count; + TEST_ASSERT_EQUAL_INT (send_hwm_, send_count); msleep (SETTLE_TIME); // Now receive all sent messages int recv_count = 0; - while (0 == zmq_recv (sub_socket, NULL, 0, ZMQ_DONTWAIT)) { + char dummybuff[64]; + while (13 == zmq_recv (sub_socket, &dummybuff, 64, ZMQ_DONTWAIT)) { ++recv_count; } @@ -85,23 +95,27 @@ int receive (void *socket_) { int recv_count = 0; // Now receive all sent messages - while (0 == zmq_recv (socket_, NULL, 0, ZMQ_DONTWAIT)) { + while (0 == zmq_recv (socket_, NULL, 0, 0)) { ++recv_count; } return recv_count; } - -int test_blocking (int send_hwm_, int msg_cnt_) +int test_blocking (int send_hwm_, int msg_cnt_, const char *endpoint) { + size_t len = SOCKET_STRING_LEN; + char pub_endpoint[SOCKET_STRING_LEN]; + // Set up bind socket void *pub_socket = test_context_socket (ZMQ_PUB); - TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub_socket, "inproc://a")); + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub_socket, endpoint)); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_getsockopt (pub_socket, ZMQ_LAST_ENDPOINT, pub_endpoint, &len)); // Set up connect socket void *sub_socket = test_context_socket (ZMQ_SUB); - TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub_socket, "inproc://a")); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub_socket, pub_endpoint)); //set a hwm on publisher TEST_ASSERT_SUCCESS_ERRNO ( @@ -109,9 +123,14 @@ int test_blocking (int send_hwm_, int msg_cnt_) int wait = 1; TEST_ASSERT_SUCCESS_ERRNO ( zmq_setsockopt (pub_socket, ZMQ_XPUB_NODROP, &wait, sizeof (wait))); + int timeout_ms = 10; + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( + sub_socket, ZMQ_RCVTIMEO, &timeout_ms, sizeof (timeout_ms))); TEST_ASSERT_SUCCESS_ERRNO ( zmq_setsockopt (sub_socket, ZMQ_SUBSCRIBE, 0, 0)); + msleep (SETTLE_TIME); + // Send until we block int send_count = 0; int recv_count = 0; @@ -120,13 +139,15 @@ int test_blocking (int send_hwm_, int msg_cnt_) if (rc == 0) { ++send_count; } else if (-1 == rc) { + // if the PUB socket blocks due to HWM, errno should be EAGAIN: TEST_ASSERT_EQUAL_INT (EAGAIN, errno); recv_count += receive (sub_socket); - TEST_ASSERT_EQUAL_INT (send_count, recv_count); } } + msleep (2 * SETTLE_TIME); // required for TCP transport recv_count += receive (sub_socket); + TEST_ASSERT_EQUAL_INT (send_count, recv_count); // Clean up test_context_socket_close (sub_socket); @@ -142,7 +163,7 @@ void test_reset_hwm () const int first_count = 9999; const int second_count = 1100; int hwm = 11024; - char my_endpoint[MAX_SOCKET_STRING]; + char my_endpoint[SOCKET_STRING_LEN]; // Set up bind socket void *pub_socket = test_context_socket (ZMQ_PUB); @@ -199,25 +220,51 @@ void test_reset_hwm () test_context_socket_close (pub_socket); } -void test_defaults_1000 () +void test_tcp () { - // send 1000 msg on hwm 1000, receive 1000 - TEST_ASSERT_EQUAL_INT (1000, test_defaults (1000, 1000)); + // send 1000 msg on hwm 1000, receive 1000, on TCP transport + TEST_ASSERT_EQUAL_INT (1000, + test_defaults (1000, 1000, "tcp://127.0.0.1:*")); + + // send 100 msg on hwm 100, receive 100 + TEST_ASSERT_EQUAL_INT (100, test_defaults (100, 100, "tcp://127.0.0.1:*")); + + // send 6000 msg on hwm 2000, drops above hwm, only receive hwm: + TEST_ASSERT_EQUAL_INT (6000, + test_blocking (2000, 6000, "tcp://127.0.0.1:*")); } -void test_blocking_2000 () +void test_inproc () { - // send 6000 msg on hwm 2000, drops above hwm, only receive hwm - TEST_ASSERT_EQUAL_INT (6000, test_blocking (2000, 6000)); + TEST_ASSERT_EQUAL_INT (1000, test_defaults (1000, 1000, "inproc://a")); + TEST_ASSERT_EQUAL_INT (100, test_defaults (100, 100, "inproc://b")); + TEST_ASSERT_EQUAL_INT (6000, test_blocking (2000, 6000, "inproc://c")); } +#ifndef ZMQ_HAVE_WINDOWS + +void test_ipc () +{ + TEST_ASSERT_EQUAL_INT (1000, test_defaults (1000, 1000, "ipc://*")); + TEST_ASSERT_EQUAL_INT (100, test_defaults (100, 100, "ipc://*")); + TEST_ASSERT_EQUAL_INT (6000, test_blocking (2000, 6000, "ipc://*")); +} + +#endif + int main () { setup_test_environment (); UNITY_BEGIN (); - RUN_TEST (test_defaults_1000); - RUN_TEST (test_blocking_2000); + + // repeat the test for both TCP, INPROC and IPC transports: + + RUN_TEST (test_tcp); + RUN_TEST (test_inproc); +#ifndef ZMQ_HAVE_WINDOWS + RUN_TEST (test_ipc); +#endif RUN_TEST (test_reset_hwm); return UNITY_END (); } diff --git a/tests/test_proxy_hwm.cpp b/tests/test_proxy_hwm.cpp new file mode 100644 index 00000000..60533c86 --- /dev/null +++ b/tests/test_proxy_hwm.cpp @@ -0,0 +1,426 @@ +/* + Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "testutil.hpp" +#include "testutil_unity.hpp" +#include + +// +// Asynchronous proxy test using ZMQ_XPUB_NODROP and HWM: +// +// Topology: +// +// PUB SUB +// | | +// \-----> XSUB -> XPUB -----/ +// ^^^^^^^^^^^^^^ +// ZMQ proxy +// +// All connections use "inproc" transport and have artificially-low HWMs set. +// Then the PUB socket starts flooding the Proxy. The SUB is artificially slow +// at receiving messages. +// This scenario simulates what happens when a SUB is slower than +// its PUB: since ZMQ_XPUB_NODROP=1, the XPUB will block and then +// also the PUB socket will block. +// The result is that 2*HWM messages will be sent before the PUB blocks. +// +// In the meanwhile asking statistics to the Proxy must NOT be blocking. +// + + +#define HWM 10 +#define NUM_BYTES_PER_MSG 50000 + +typedef struct +{ + void *context; + const char *frontend_endpoint; + const char *backend_endpoint; + const char *control_endpoint; + + bool subscriber_received_all; +} proxy_hwm_cfg_t; + +static void lower_tcp_buff (void *sock_) +{ + int sndBuff; + size_t sndBuffSz = sizeof sndBuff; + int rc = zmq_getsockopt (sock_, ZMQ_SNDBUF, &sndBuff, &sndBuffSz); + assert (rc == 0); + + int newBuff = 1000; + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (sock_, ZMQ_SNDBUF, &newBuff, sizeof (newBuff))); + + rc = zmq_getsockopt (sock_, ZMQ_SNDBUF, &sndBuff, &sndBuffSz); + assert (rc == 0); +} + +static void lower_hwm (void *skt) +{ + int send_hwm_ = HWM; + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (skt, ZMQ_SNDHWM, &send_hwm_, sizeof (send_hwm_))); + + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (skt, ZMQ_RCVHWM, &send_hwm_, sizeof (send_hwm_))); +} + +static void publisher_thread_main (void *pvoid) +{ + proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid; + + void *pubsocket = zmq_socket (cfg->context, ZMQ_PUB); + assert (pubsocket); + + lower_hwm (pubsocket); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pubsocket, cfg->frontend_endpoint)); + + int optval = 1; + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (pubsocket, ZMQ_XPUB_NODROP, &optval, sizeof (optval))); + + msleep (SETTLE_TIME); + + uint64_t send_count = 0; + while (true) { + zmq_msg_t msg; + int rc = zmq_msg_init_size (&msg, NUM_BYTES_PER_MSG); + assert (rc == 0); + + /* Fill in message content with 'AAAAAA' */ + memset (zmq_msg_data (&msg), 'A', NUM_BYTES_PER_MSG); + + /* Send the message to the socket */ + rc = zmq_msg_send (&msg, pubsocket, ZMQ_DONTWAIT); + if (rc != -1) { + send_count++; + } else { + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg)); + break; + } + } + + // VERIFY EXPECTED RESULTS + + TEST_ASSERT (4 * HWM == send_count || 2 * HWM == send_count); + + + // CLEANUP + + zmq_close (pubsocket); +} + +static void subscriber_thread_main (void *pvoid) +{ + proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid; + + void *subsocket = zmq_socket (cfg->context, ZMQ_SUB); + assert (subsocket); + + lower_hwm (subsocket); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (subsocket, ZMQ_SUBSCRIBE, 0, 0)); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (subsocket, cfg->backend_endpoint)); + + lower_tcp_buff (subsocket); + + // receive all sent messages + uint64_t rxsuccess = 0; + bool success = true; + while (success) { + zmq_msg_t msg; + int rc = zmq_msg_init (&msg); + assert (rc == 0); + + rc = zmq_msg_recv (&msg, subsocket, 0); + if (rc != -1) { + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg)); + rxsuccess++; + + // after receiving 1st message, set a finite timeout (default is infinite) + int timeout_ms = 100; + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( + subsocket, ZMQ_RCVTIMEO, &timeout_ms, sizeof (timeout_ms))); + } else { + break; + } + + msleep (100); + } + + + // VERIFY EXPECTED RESULTS + + TEST_ASSERT (4 * HWM == rxsuccess || 2 * HWM == rxsuccess); + + // INFORM THAT WE COMPLETED: + + cfg->subscriber_received_all = true; + + + // CLEANUP + + zmq_close (subsocket); +} + +bool recv_stat (void *sock_, bool last_, uint64_t *res) +{ + zmq_msg_t stats_msg; + + int rc = zmq_msg_init (&stats_msg); + assert (rc == 0); + + rc = zmq_msg_recv (&stats_msg, sock_, 0); //ZMQ_DONTWAIT); + if (rc == -1 && errno == EAGAIN) { + rc = zmq_msg_close (&stats_msg); + assert (rc == 0); + return false; // cannot retrieve the stat + } + + assert (rc == sizeof (uint64_t)); + memcpy (res, zmq_msg_data (&stats_msg), zmq_msg_size (&stats_msg)); + + rc = zmq_msg_close (&stats_msg); + assert (rc == 0); + + int more; + size_t moresz = sizeof more; + rc = zmq_getsockopt (sock_, ZMQ_RCVMORE, &more, &moresz); + assert (rc == 0); + assert ((last_ && !more) || (!last_ && more)); + + return true; +} + +// Utility function to interrogate the proxy: + +typedef struct +{ + uint64_t msg_in; + uint64_t bytes_in; + uint64_t msg_out; + uint64_t bytes_out; +} zmq_socket_stats_t; + +typedef struct +{ + zmq_socket_stats_t frontend; + zmq_socket_stats_t backend; +} zmq_proxy_stats_t; + +bool check_proxy_stats (void *control_proxy_) +{ + zmq_proxy_stats_t total_stats; + int rc; + + rc = zmq_send (control_proxy_, "STATISTICS", 10, ZMQ_DONTWAIT); + assert (rc == 10 || (rc == -1 && errno == EAGAIN)); + if (rc == -1 && errno == EAGAIN) { + return false; + } + + // first frame of the reply contains FRONTEND stats: + if (!recv_stat (control_proxy_, false, &total_stats.frontend.msg_in)) { + return false; + } + + recv_stat (control_proxy_, false, &total_stats.frontend.bytes_in); + recv_stat (control_proxy_, false, &total_stats.frontend.msg_out); + recv_stat (control_proxy_, false, &total_stats.frontend.bytes_out); + + // second frame of the reply contains BACKEND stats: + recv_stat (control_proxy_, false, &total_stats.backend.msg_in); + recv_stat (control_proxy_, false, &total_stats.backend.bytes_in); + recv_stat (control_proxy_, false, &total_stats.backend.msg_out); + recv_stat (control_proxy_, true, &total_stats.backend.bytes_out); + + return true; +} + +static void proxy_stats_asker_thread_main (void *pvoid) +{ + proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid; + + + // CONTROL REQ + + void *control_req = + zmq_socket (cfg->context, + ZMQ_REQ); // this one can be used to send command to the proxy + assert (control_req); + + // connect CONTROL-REQ: a socket to which send commands + int rc = zmq_connect (control_req, cfg->control_endpoint); + assert (rc == 0); + + + // IMPORTANT: by setting the tx/rx timeouts, we avoid getting blocked when interrogating a proxy which is + // itself blocked in a zmq_msg_send() on its XPUB socket having ZMQ_XPUB_NODROP=1! + + int optval = 10; + rc = zmq_setsockopt (control_req, ZMQ_SNDTIMEO, &optval, sizeof (optval)); + assert (rc == 0); + rc = zmq_setsockopt (control_req, ZMQ_RCVTIMEO, &optval, sizeof (optval)); + assert (rc == 0); + + optval = 10; + rc = + zmq_setsockopt (control_req, ZMQ_REQ_CORRELATE, &optval, sizeof (optval)); + assert (rc == 0); + + rc = + zmq_setsockopt (control_req, ZMQ_REQ_RELAXED, &optval, sizeof (optval)); + assert (rc == 0); + + + // Start! + + while (!cfg->subscriber_received_all) { +#ifdef ZMQ_BUILD_DRAFT_API + check_proxy_stats (control_req); +#endif + usleep (1000); // 1ms -> in best case we will get 1000updates/second + } + + + // Ask the proxy to exit: the subscriber has received all messages + + rc = zmq_send (control_req, "TERMINATE", 9, 0); + assert (rc == 9); + + zmq_close (control_req); +} + +static void proxy_thread_main (void *pvoid) +{ + proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid; + int rc; + + + // FRONTEND SUB + + void *frontend_xsub = zmq_socket ( + cfg->context, + ZMQ_XSUB); // the frontend is the one exposed to internal threads (INPROC) + assert (frontend_xsub); + + lower_hwm (frontend_xsub); + + // bind FRONTEND + rc = zmq_bind (frontend_xsub, cfg->frontend_endpoint); + assert (rc == 0); + + + // BACKEND PUB + + void *backend_xpub = zmq_socket ( + cfg->context, + ZMQ_XPUB); // the backend is the one exposed to the external world (TCP) + assert (backend_xpub); + + int optval = 1; + rc = + zmq_setsockopt (backend_xpub, ZMQ_XPUB_NODROP, &optval, sizeof (optval)); + assert (rc == 0); + + lower_hwm (backend_xpub); + + // bind BACKEND + rc = zmq_bind (backend_xpub, cfg->backend_endpoint); + assert (rc == 0); + + + // CONTROL REP + + void *control_rep = zmq_socket ( + cfg->context, + ZMQ_REP); // this one is used by the proxy to receive&reply to commands + assert (control_rep); + + // bind CONTROL + rc = zmq_bind (control_rep, cfg->control_endpoint); + assert (rc == 0); + + + // start proxying! + + zmq_proxy_steerable (frontend_xsub, backend_xpub, NULL, control_rep); + + zmq_close (frontend_xsub); + zmq_close (backend_xpub); + zmq_close (control_rep); +} + + +// The main thread simply starts several clients and a server, and then +// waits for the server to finish. + +int main (void) +{ + setup_test_environment (); + + void *context = zmq_ctx_new (); + assert (context); + + + // START ALL SECONDARY THREADS + + proxy_hwm_cfg_t cfg; + cfg.context = context; + cfg.frontend_endpoint = "inproc://frontend"; + cfg.backend_endpoint = "inproc://backend"; + cfg.control_endpoint = "inproc://ctrl"; + cfg.subscriber_received_all = false; + + void *proxy = zmq_threadstart (&proxy_thread_main, (void *) &cfg); + assert (proxy != 0); + void *publisher = zmq_threadstart (&publisher_thread_main, (void *) &cfg); + assert (publisher != 0); + void *subscriber = zmq_threadstart (&subscriber_thread_main, (void *) &cfg); + assert (subscriber != 0); + void *asker = + zmq_threadstart (&proxy_stats_asker_thread_main, (void *) &cfg); + assert (asker != 0); + + + // CLEANUP + + zmq_threadclose (publisher); + zmq_threadclose (subscriber); + zmq_threadclose (asker); + zmq_threadclose (proxy); + + int rc = zmq_ctx_term (context); + assert (rc == 0); + + return 0; +}