0
0
mirror of https://github.com/zeromq/libzmq.git synced 2025-01-14 01:37:56 +08:00

test_proxy_steerable: reduce cpu and thread count

Problem: flakiness on slow CI VMs

Solution: fewer worker threads, poll rather than thrash,
consistent timings. user cpu ~300% -> 15%.
This commit is contained in:
George Cockshull 2023-10-13 06:50:43 -04:00
parent 6b80df14f4
commit 77303a1926

View File

@ -11,9 +11,10 @@
#define CONTENT_SIZE_MAX 32 #define CONTENT_SIZE_MAX 32
#define ROUTING_ID_SIZE 10 #define ROUTING_ID_SIZE 10
#define ROUTING_ID_SIZE_MAX 32 #define ROUTING_ID_SIZE_MAX 32
#define QT_WORKERS 5 #define QT_WORKERS 3
#define QT_CLIENTS 3 #define QT_CLIENTS 3
#define is_verbose 0 #define is_verbose 0
#define TEST_SLEEP_MS 500
const char *proxy_control_address = "inproc://proxy_control"; const char *proxy_control_address = "inproc://proxy_control";
@ -127,15 +128,12 @@ static void client_task (void *db_)
routing_id, content); routing_id, content);
if (memcmp (content, "TERMINATE", 9) == 0) { if (memcmp (content, "TERMINATE", 9) == 0) {
run = false; run = false;
break; } else if (memcmp (content, "STOP", 4) == 0) {
}
if (memcmp (content, "STOP", 4) == 0) {
enable_send = false; enable_send = false;
break; } else if (memcmp (content, "START", 5) == 0) {
}
if (memcmp (content, "START", 5) == 0) {
enable_send = true; enable_send = true;
} }
break;
} }
} }
} }
@ -187,7 +185,7 @@ void server_task (void * /*unused_*/)
// Launch pool of worker threads, precise number is not critical // Launch pool of worker threads, precise number is not critical
int thread_nbr; int thread_nbr;
void *threads[5]; void *threads[QT_WORKERS];
for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++) for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++)
threads[thread_nbr] = zmq_threadstart (&server_worker, NULL); threads[thread_nbr] = zmq_threadstart (&server_worker, NULL);
@ -261,26 +259,34 @@ static void server_worker (void * /*unused_*/)
char routing_id[ROUTING_ID_SIZE_MAX] = char routing_id[ROUTING_ID_SIZE_MAX] =
{}; // the size received is the size sent {}; // the size received is the size sent
bool run = true; zmq_pollitem_t items[] = {{control, 0, ZMQ_POLLIN, 0},
{worker, 0, ZMQ_POLLIN, 0}};
bool keep_sending = true; bool keep_sending = true;
while (run) { while (true) {
int rc = zmq_recv (control, content, CONTENT_SIZE_MAX, zmq_poll (items, 2, 100);
ZMQ_DONTWAIT); // usually, rc == -1 (no message) if (items[0].revents & ZMQ_POLLIN) {
if (rc > 0) { //Commands over the worker control socket
content[rc] = 0; // NULL-terminate the command string int rc = zmq_recv (control, content, CONTENT_SIZE_MAX, 0);
if (is_verbose) if (rc > 0) {
printf ("server_worker receives command = %s\n", content); content[rc] = 0; // NULL-terminate the command string
if (memcmp (content, "TERMINATE", 9) == 0) if (is_verbose)
run = false; printf ("server_worker receives command = %s\n", content);
if (memcmp (content, "STOP", 4) == 0) if (memcmp (content, "TERMINATE", 9) == 0)
keep_sending = false; break;
if (memcmp (content, "STOP", 4) == 0)
keep_sending = false;
}
} }
// The DEALER socket gives us the reply envelope and message if (items[1].revents & ZMQ_POLLIN) {
// if we don't poll, we have to use ZMQ_DONTWAIT, if we poll, we can block-receive with 0 // The DEALER socket gives us the reply envelope and message
rc = zmq_recv (worker, routing_id, ROUTING_ID_SIZE_MAX, ZMQ_DONTWAIT); int rc = zmq_recv (worker, routing_id, ROUTING_ID_SIZE_MAX, 0);
if (rc == ROUTING_ID_SIZE) { if (rc != ROUTING_ID_SIZE) {
continue;
}
routing_id[rc] = 0; //null terminate
rc = zmq_recv (worker, content, CONTENT_SIZE_MAX, 0); rc = zmq_recv (worker, content, CONTENT_SIZE_MAX, 0);
TEST_ASSERT_EQUAL_INT (CONTENT_SIZE, rc); TEST_ASSERT_EQUAL_INT (CONTENT_SIZE, rc);
content[rc] = 0; //null terminate
if (is_verbose) if (is_verbose)
printf ("server receive - routing_id = %s content = %s\n", printf ("server receive - routing_id = %s content = %s\n",
routing_id, content); routing_id, content);
@ -360,7 +366,7 @@ uint64_t statistics (void *proxy_control, const char *runctx)
zmq_atomic_counter_value (g_clients_pkts_out), zmq_atomic_counter_value (g_clients_pkts_out),
zmq_atomic_counter_value (g_workers_pkts_out)); zmq_atomic_counter_value (g_workers_pkts_out));
} }
printf ("%" PRIu64, val); printf ("%" PRIu64 " ", val);
if (count == 7) { if (count == 7) {
printf ("}\n"); printf ("}\n");
} }
@ -426,7 +432,7 @@ void test_proxy_steerable ()
threads[i] = zmq_threadstart (&client_task, &databags[i]); threads[i] = zmq_threadstart (&client_task, &databags[i]);
} }
threads[QT_CLIENTS] = zmq_threadstart (&server_task, NULL); threads[QT_CLIENTS] = zmq_threadstart (&server_task, NULL);
msleep (500); // Run for 500 ms then quit msleep (TEST_SLEEP_MS); // setup time
// Proxy control socket // Proxy control socket
int control_socktype = ZMQ_PAIR; int control_socktype = ZMQ_PAIR;
@ -453,19 +459,20 @@ void test_proxy_steerable ()
send_string_expect_success (control, "START", 0); send_string_expect_success (control, "START", 0);
msleep (500); // Run for 500 ms then quit msleep (TEST_SLEEP_MS); // Run for some time
TEST_ASSERT (statistics (proxy_control, "started clients") > 0); TEST_ASSERT (statistics (proxy_control, "started clients") > 0);
steer (proxy_control, "PAUSE", "pausing proxying after 500ms"); steer (proxy_control, "PAUSE", "pausing proxying after 500ms");
uint64_t bytes = statistics (proxy_control, "post-pause"); uint64_t bytes = statistics (proxy_control, "post-pause");
msleep (500); // Run for 500 ms then quit msleep (TEST_SLEEP_MS); // Paused for some time
//check no more bytes have been proxied while paused
TEST_ASSERT (statistics (proxy_control, "post-pause") == bytes); TEST_ASSERT (statistics (proxy_control, "post-pause") == bytes);
steer (proxy_control, "RESUME", "resuming proxying after another 500ms"); steer (proxy_control, "RESUME", "resuming proxying after another 500ms");
msleep (500); // Run for 500 ms then quit msleep (TEST_SLEEP_MS); // Resumed for a while
TEST_ASSERT (statistics (proxy_control, "ran for a while") > bytes); TEST_ASSERT (statistics (proxy_control, "ran for a while") > bytes);
@ -475,16 +482,16 @@ void test_proxy_steerable ()
statistics (proxy_control, "stopped clients and workers"); statistics (proxy_control, "stopped clients and workers");
msleep (500); // Wait for all clients and workers to STOP msleep (TEST_SLEEP_MS); // Wait for all clients and workers to STOP
if (is_verbose) if (is_verbose)
printf ("shutting down all clients and server workers\n"); printf ("shutting down all clients and server workers\n");
send_string_expect_success (control, "TERMINATE", 0); send_string_expect_success (control, "TERMINATE", 0);
msleep (500); msleep (TEST_SLEEP_MS);
statistics (proxy_control, "terminate clients and server workers"); statistics (proxy_control, "terminate clients and server workers");
msleep (500); // Wait for all clients and workers to terminate msleep (TEST_SLEEP_MS); // Wait for all clients and workers to terminate
steer (proxy_control, "TERMINATE", "terminate proxy"); steer (proxy_control, "TERMINATE", "terminate proxy");
for (int i = 0; i < QT_CLIENTS + 1; i++) for (int i = 0; i < QT_CLIENTS + 1; i++)