mirror of https://github.com/zeromq/libzmq.git

Fix zmq_proxy_steerable PAUSE/RESUME

Problem: the new reimplementation of zmq_proxy_steerable had PAUSE/RESUME
handling that didn't follow the expected behaviour; the two commands were
effectively mixed up. The test didn't properly cover the issue.

Solution: improve the test coverage and fix the proxy's command parsing.

I had no knowledge of the pre-MPL-2.0 implementation. This fix is based
on documented semantics and prior API usage. I contribute this
under MPL-2.0.
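
For context, these are the documented semantics the fix restores: zmq_proxy_steerable (frontend, backend, capture, control) forwards messages between frontend and backend while obeying single-frame commands received on the control socket. PAUSE suspends forwarding, RESUME restarts it, TERMINATE makes the call return, and STATISTICS asks for the proxy's counters. The snippet below is a minimal sketch of that contract, not part of this patch; the ROUTER/DEALER pair, the PAIR control sockets, the inproc endpoint and the TCP ports are illustrative choices, and error handling is omitted.

    /* Minimal sketch (not from the patch): drive a steerable proxy with the
       commands this fix validates.  Endpoints and socket types are
       illustrative; error handling is omitted for brevity. */
    #include <zmq.h>
    #include <pthread.h>
    #include <unistd.h>

    static void *proxy_thread (void *ctx)
    {
        void *frontend = zmq_socket (ctx, ZMQ_ROUTER);
        void *backend = zmq_socket (ctx, ZMQ_DEALER);
        void *control = zmq_socket (ctx, ZMQ_PAIR);
        zmq_bind (frontend, "tcp://127.0.0.1:5559");
        zmq_bind (backend, "tcp://127.0.0.1:5560");
        zmq_connect (control, "inproc://proxy-control");
        /* Blocks, forwarding frontend<->backend, until TERMINATE arrives. */
        zmq_proxy_steerable (frontend, backend, NULL, control);
        zmq_close (frontend);
        zmq_close (backend);
        zmq_close (control);
        return NULL;
    }

    int main (void)
    {
        void *ctx = zmq_ctx_new ();
        void *control = zmq_socket (ctx, ZMQ_PAIR);
        /* inproc endpoints must be bound before they are connected */
        zmq_bind (control, "inproc://proxy-control");

        pthread_t thread;
        pthread_create (&thread, NULL, proxy_thread, ctx);

        sleep (1);
        zmq_send (control, "PAUSE", 5, 0);     /* proxy stops forwarding */
        sleep (1);
        zmq_send (control, "RESUME", 6, 0);    /* forwarding starts again */
        sleep (1);
        zmq_send (control, "TERMINATE", 9, 0); /* zmq_proxy_steerable returns */

        pthread_join (thread, NULL);
        zmq_close (control);
        zmq_ctx_term (ctx);
        return 0;
    }

Build with, for example, cc demo.c -lzmq -lpthread.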
George Cockshull 2023-10-12 11:16:48 -04:00
parent 9d31965548
commit f8b3cc8108
2 changed files with 101 additions and 34 deletions


@@ -189,10 +189,10 @@ static int handle_control (class zmq::socket_base_t *control_,
         return 0;
     }
-    if (msiz == 5 && memcmp (command, "\x05PAUSE", 6)) {
-        state = active;
-    } else if (msiz == 6 && 0 == memcmp (command, "RESUME", 6)) {
+    if (msiz == 5 && 0 == memcmp (command, "PAUSE", 5)) {
         state = paused;
+    } else if (msiz == 6 && 0 == memcmp (command, "RESUME", 6)) {
+        state = active;
     } else if (msiz == 9 && 0 == memcmp (command, "TERMINATE", 9)) {
         state = terminated;
     }
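
Note the three removed lines: the old PAUSE check was missing the 0 == on memcmp, so the branch fired whenever the bytes did not match; it also compared 6 bytes of "\x05PAUSE" (a stray length-prefix byte included) against a message whose size was checked to be 5; and the resulting states were inverted, with PAUSE selecting active and RESUME selecting paused. The replacement compares exactly the five bytes of "PAUSE" and assigns the states the way the documentation describes.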


@@ -5,6 +5,7 @@
 #include <stdlib.h>
 #include <string.h>
+#include <inttypes.h>
 #define CONTENT_SIZE 13
 #define CONTENT_SIZE_MAX 32
@@ -25,6 +26,9 @@ void *g_clients_pkts_out = NULL;
 void *g_workers_pkts_out = NULL;
 void *control_context = NULL; // worker control, not proxy control
+int g_proxy_control_socktype =
+  ZMQ_PAIR; //or ZMQ_REP, ZMQ_SUB (without statistics)
+
 void setUp ()
 {
     setup_test_context ();
@@ -90,7 +94,7 @@ static void client_task (void *db_)
     int request_nbr = 0;
     bool run = true;
-    bool keep_sending = true;
+    bool enable_send = false;
     while (run) {
         // Tick once per 200 ms, pulling in arriving messages
         int centitick;
@@ -126,14 +130,17 @@ static void client_task (void *db_)
                         break;
                     }
                     if (memcmp (content, "STOP", 4) == 0) {
-                        keep_sending = false;
+                        enable_send = false;
                         break;
                     }
+                    if (memcmp (content, "START", 5) == 0) {
+                        enable_send = true;
+                    }
                 }
             }
         }
-        if (keep_sending) {
+        if (enable_send) {
             snprintf (content, CONTENT_SIZE_MAX * sizeof (char),
                       "request #%03d", ++request_nbr); // CONTENT_SIZE
             if (is_verbose)
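
With this change the clients come up quiescent: enable_send starts out false and is flipped by START (and cleared again by STOP) on the worker-control socket. That is what lets the rewritten test assert that the proxy's byte counters are still zero before any traffic begins.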
@@ -203,9 +210,14 @@ void server_task (void * /*unused_*/)
     }
     // Proxy control socket
-    void *proxy_control = zmq_socket (get_test_context (), ZMQ_REP);
+    void *proxy_control =
+      zmq_socket (get_test_context (), g_proxy_control_socktype);
     TEST_ASSERT_NOT_NULL (proxy_control);
     TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (proxy_control, proxy_control_address));
+    if (g_proxy_control_socktype == ZMQ_SUB) {
+        TEST_ASSERT_SUCCESS_ERRNO (
+          zmq_setsockopt (proxy_control, ZMQ_SUBSCRIBE, "", 0));
+    }
     // Connect backend to frontend via a steerable proxy
     int rc = zmq_proxy_steerable (frontend, backend, NULL, proxy_control);
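
The proxy-side control socket is now created from g_proxy_control_socktype, so the same test can exercise PAIR, REP or SUB control sockets. A SUB control socket additionally needs the empty ZMQ_SUBSCRIBE subscription to receive commands at all, and, as the comment on the global notes, it cannot carry STATISTICS replies back, since SUB is receive-only.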
@@ -319,6 +331,55 @@ static void server_worker (void * /*unused_*/)
 //
 // - 7/bsb: number of bytes sent out the backend socket
+
+uint64_t read_stat_value (void *proxy_control)
+{
+    zmq_msg_t stats_msg;
+    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&stats_msg));
+    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&stats_msg, proxy_control, 0));
+    TEST_ASSERT_EQUAL_INT (sizeof (uint64_t), zmq_msg_size (&stats_msg));
+    uint64_t val = *(uint64_t *) zmq_msg_data (&stats_msg);
+    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&stats_msg));
+    return val;
+}
+
+//return total bytes proxied, so we can test PAUSE/RESUME
+uint64_t statistics (void *proxy_control, const char *runctx)
+{
+    if (is_verbose) {
+        printf ("steer: sending STATISTICS - %s\n", runctx);
+    }
+    TEST_ASSERT_SUCCESS_ERRNO (zmq_send (proxy_control, "STATISTICS", 10, 0));
+    uint64_t total_bytes_proxied = 0;
+    for (int count = 0; count < 8; ++count) {
+        uint64_t val = read_stat_value (proxy_control);
+        if (is_verbose) {
+            if (count == 0) {
+                printf ("stats: client pkts out: %d worker pkts out: %d { ",
+                        zmq_atomic_counter_value (g_clients_pkts_out),
+                        zmq_atomic_counter_value (g_workers_pkts_out));
+            }
+            printf ("%" PRIu64, val);
+            if (count == 7) {
+                printf ("}\n");
+            }
+        }
+        switch (count) {
+            case 3: //bytes sent on frontend
+            case 7: //bytes sent on backend
+                total_bytes_proxied += val;
+        }
+    }
+    int rcvmore;
+    size_t sz = sizeof (rcvmore);
+    zmq_getsockopt (proxy_control, ZMQ_RCVMORE, &rcvmore, &sz);
+    TEST_ASSERT_EQUAL_INT (rcvmore, 0);
+    return total_bytes_proxied;
+}
+
 // The main thread simply starts several clients and a server, and then
 // waits for the server to finish.
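
The new statistics helper pairs with the eight-field layout documented above: it sends STATISTICS, reads eight uint64_t frames via read_stat_value, and sums fields 3 and 7 (bytes sent out the frontend and backend sockets) into a single total-bytes-proxied figure, finally asserting via ZMQ_RCVMORE that no frames remain. That one number is what makes PAUSE/RESUME observable: it must hold still across a pause and grow after a resume.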
@@ -328,32 +389,18 @@ void steer (void *proxy_control, const char *command, const char *runctx)
         printf ("steer: sending %s - %s\n", command, runctx);
     }
-    // Start with proxy paused
     TEST_ASSERT_SUCCESS_ERRNO (
       zmq_send (proxy_control, command, strlen (command), 0));
-    zmq_msg_t stats_msg;
-    int count = -1;
-    while (1) {
-        count = count + 1;
+    if (g_proxy_control_socktype == ZMQ_REP) {
+        //expect an empty reply from REP for commands that need no response
+        zmq_msg_t stats_msg;
         TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&stats_msg));
         TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&stats_msg, proxy_control, 0));
-        if (is_verbose && zmq_msg_size (&stats_msg)) {
-            if (count == 0) {
-                printf ("steer:");
-            }
-            printf (" %lu", *(unsigned long int *) zmq_msg_data (&stats_msg));
-            if (count == 7) {
-                printf ("\n");
-            }
-        }
-        if (!zmq_msg_get (&stats_msg, ZMQ_MORE))
-            break;
+        TEST_ASSERT_EQUAL_INT (zmq_msg_size (&stats_msg), 0);
+        TEST_ASSERT (!zmq_msg_get (&stats_msg, ZMQ_MORE));
+        TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&stats_msg));
     }
-    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&stats_msg));
 }

 void test_proxy_steerable ()
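
steer itself no longer parses statistics: it just sends the command and, only when the control socket is REP-based, drains the single empty reply, asserting that it carries no payload and no ZMQ_MORE flag. Reading the multi-frame STATISTICS reply is left entirely to the statistics helper.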
@@ -382,31 +429,51 @@ void test_proxy_steerable ()
-    msleep (500); // Run for 500 ms then quit
     // Proxy control socket
-    void *proxy_control = zmq_socket (get_test_context (), ZMQ_REQ);
+    int control_socktype = ZMQ_PAIR;
+    switch (g_proxy_control_socktype) {
+        case ZMQ_REP:
+            control_socktype = ZMQ_REQ;
+            break;
+        case ZMQ_SUB:
+            control_socktype = ZMQ_PUB;
+            break;
+        default:
+            break;
+    }
+    void *proxy_control = zmq_socket (get_test_context (), control_socktype);
     TEST_ASSERT_NOT_NULL (proxy_control);
     linger = 0;
     TEST_ASSERT_SUCCESS_ERRNO (
       zmq_setsockopt (proxy_control, ZMQ_LINGER, &linger, sizeof (linger)));
     TEST_ASSERT_SUCCESS_ERRNO (
       zmq_connect (proxy_control, proxy_control_address));
     msleep (500); // Run for 500 ms then quit
-    steer (proxy_control, "STATISTICS", "started clients");
     steer (proxy_control, "PAUSE", "started server");
+    TEST_ASSERT (
+      statistics (proxy_control, "should be all 0s before clients start") == 0);
+    send_string_expect_success (control, "START", 0);
     msleep (500); // Run for 500 ms then quit
     steer (proxy_control, "RESUME", "started clients");
+    TEST_ASSERT (statistics (proxy_control, "started clients") > 0);
+    steer (proxy_control, "PAUSE", "pausing proxying after 500ms");
+    uint64_t bytes = statistics (proxy_control, "post-pause");
     msleep (500); // Run for 500 ms then quit
-    steer (proxy_control, "STATISTICS", "ran for a while");
+    TEST_ASSERT (statistics (proxy_control, "post-pause") == bytes);
+    steer (proxy_control, "RESUME", "resuming proxying after another 500ms");
+    msleep (500); // Run for 500 ms then quit
+    TEST_ASSERT (statistics (proxy_control, "ran for a while") > bytes);
     if (is_verbose)
         printf ("stopping all clients and server workers\n");
     send_string_expect_success (control, "STOP", 0);
-    steer (proxy_control, "STATISTICS", "stopped clients and workers");
+    statistics (proxy_control, "stopped clients and workers");
     msleep (500); // Wait for all clients and workers to STOP
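
Taken together, the sequence pins down both directions of the original bug: the byte total must be zero while the proxy starts out paused, must grow once RESUME lets the queued client traffic through, must hold exactly constant across the second PAUSE, and must grow again after the final RESUME.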
@@ -415,7 +482,7 @@ void test_proxy_steerable ()
     send_string_expect_success (control, "TERMINATE", 0);
     msleep (500);
-    steer (proxy_control, "STATISTICS", "terminate clients and server workers");
+    statistics (proxy_control, "terminate clients and server workers");
     msleep (500); // Wait for all clients and workers to terminate
     steer (proxy_control, "TERMINATE", "terminate proxy");