mirror of
https://github.com/zeromq/libzmq.git
synced 2025-03-17 16:43:58 +00:00
Improve test reliability by: a) using XPUB in place of PUB to ensure we start publishing only after 1st subscriber has joined; b) accept both 2*HWM, 3*HWM and 4*HWM as TX/RX count of messages
This commit is contained in:
parent
b933cb9d6a
commit
72418e9dd0
@ -36,7 +36,7 @@
|
|||||||
//
|
//
|
||||||
// Topology:
|
// Topology:
|
||||||
//
|
//
|
||||||
// PUB SUB
|
// XPUB SUB
|
||||||
// | |
|
// | |
|
||||||
// \-----> XSUB -> XPUB -----/
|
// \-----> XSUB -> XPUB -----/
|
||||||
// ^^^^^^^^^^^^^^
|
// ^^^^^^^^^^^^^^
|
||||||
@ -46,10 +46,10 @@
|
|||||||
// Then the PUB socket starts flooding the Proxy. The SUB is artificially slow
|
// Then the PUB socket starts flooding the Proxy. The SUB is artificially slow
|
||||||
// at receiving messages.
|
// at receiving messages.
|
||||||
// This scenario simulates what happens when a SUB is slower than
|
// This scenario simulates what happens when a SUB is slower than
|
||||||
// its PUB: since ZMQ_XPUB_NODROP=1, the XPUB will block and then
|
// its (X)PUB: since ZMQ_XPUB_NODROP=1, the XPUB will block and then
|
||||||
// also the PUB socket will block.
|
// also the (X)PUB socket will block.
|
||||||
// The result is that 2*HWM messages will be sent before the PUB blocks.
|
// The exact number of the messages that go through before (X)PUB blocks depends
|
||||||
//
|
// on ZeroMQ internals and how the OS will schedule the different threads.
|
||||||
// In the meanwhile asking statistics to the Proxy must NOT be blocking.
|
// In the meanwhile asking statistics to the Proxy must NOT be blocking.
|
||||||
//
|
//
|
||||||
|
|
||||||
@ -57,6 +57,7 @@
|
|||||||
#define HWM 10
|
#define HWM 10
|
||||||
#define NUM_BYTES_PER_MSG 50000
|
#define NUM_BYTES_PER_MSG 50000
|
||||||
|
|
||||||
|
|
||||||
typedef struct
|
typedef struct
|
||||||
{
|
{
|
||||||
void *context;
|
void *context;
|
||||||
@ -67,21 +68,6 @@ typedef struct
|
|||||||
bool subscriber_received_all;
|
bool subscriber_received_all;
|
||||||
} proxy_hwm_cfg_t;
|
} proxy_hwm_cfg_t;
|
||||||
|
|
||||||
static void lower_tcp_buff (void *sock_)
|
|
||||||
{
|
|
||||||
int sndBuff;
|
|
||||||
size_t sndBuffSz = sizeof sndBuff;
|
|
||||||
int rc = zmq_getsockopt (sock_, ZMQ_SNDBUF, &sndBuff, &sndBuffSz);
|
|
||||||
assert (rc == 0);
|
|
||||||
|
|
||||||
int newBuff = 1000;
|
|
||||||
TEST_ASSERT_SUCCESS_ERRNO (
|
|
||||||
zmq_setsockopt (sock_, ZMQ_SNDBUF, &newBuff, sizeof (newBuff)));
|
|
||||||
|
|
||||||
rc = zmq_getsockopt (sock_, ZMQ_SNDBUF, &sndBuff, &sndBuffSz);
|
|
||||||
assert (rc == 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void lower_hwm (void *skt)
|
static void lower_hwm (void *skt)
|
||||||
{
|
{
|
||||||
int send_hwm_ = HWM;
|
int send_hwm_ = HWM;
|
||||||
@ -96,7 +82,7 @@ static void publisher_thread_main (void *pvoid)
|
|||||||
{
|
{
|
||||||
proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid;
|
proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid;
|
||||||
|
|
||||||
void *pubsocket = zmq_socket (cfg->context, ZMQ_PUB);
|
void *pubsocket = zmq_socket (cfg->context, ZMQ_XPUB);
|
||||||
assert (pubsocket);
|
assert (pubsocket);
|
||||||
|
|
||||||
lower_hwm (pubsocket);
|
lower_hwm (pubsocket);
|
||||||
@ -107,7 +93,10 @@ static void publisher_thread_main (void *pvoid)
|
|||||||
TEST_ASSERT_SUCCESS_ERRNO (
|
TEST_ASSERT_SUCCESS_ERRNO (
|
||||||
zmq_setsockopt (pubsocket, ZMQ_XPUB_NODROP, &optval, sizeof (optval)));
|
zmq_setsockopt (pubsocket, ZMQ_XPUB_NODROP, &optval, sizeof (optval)));
|
||||||
|
|
||||||
msleep (SETTLE_TIME);
|
// Wait before starting TX operations till 1 subscriber has subscribed
|
||||||
|
// (in this test there's 1 subscriber only)
|
||||||
|
const char subscription_to_all_topics[] = {1, 0};
|
||||||
|
recv_string_expect_success (pubsocket, subscription_to_all_topics, 0);
|
||||||
|
|
||||||
uint64_t send_count = 0;
|
uint64_t send_count = 0;
|
||||||
while (true) {
|
while (true) {
|
||||||
@ -129,8 +118,16 @@ static void publisher_thread_main (void *pvoid)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// VERIFY EXPECTED RESULTS
|
// VERIFY EXPECTED RESULTS
|
||||||
|
// EXPLANATION FOR TX TO BE CONSIDERED SUCCESSFUL:
|
||||||
TEST_ASSERT (4 * HWM == send_count || 2 * HWM == send_count);
|
// this test has 3 threads doing I/O across 2 queues. Depending on the scheduling,
|
||||||
|
// it might happen that 20, 30 or 40 messages go through before the pub blocks.
|
||||||
|
// That's because the receiver thread gets kicked once every (hwm_ + 1) / 2 sent
|
||||||
|
// messages (search for zeromq sources compute_lwm function).
|
||||||
|
// So depending on the scheduling of the second thread, the publisher might get one,
|
||||||
|
// two or three more batches in. The ceiling is 40 as there's 2 queues.
|
||||||
|
//
|
||||||
|
assert (4 * HWM == send_count || 3 * HWM == send_count
|
||||||
|
|| 2 * HWM == send_count);
|
||||||
|
|
||||||
|
|
||||||
// CLEANUP
|
// CLEANUP
|
||||||
@ -151,7 +148,6 @@ static void subscriber_thread_main (void *pvoid)
|
|||||||
|
|
||||||
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (subsocket, cfg->backend_endpoint));
|
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (subsocket, cfg->backend_endpoint));
|
||||||
|
|
||||||
lower_tcp_buff (subsocket);
|
|
||||||
|
|
||||||
// receive all sent messages
|
// receive all sent messages
|
||||||
uint64_t rxsuccess = 0;
|
uint64_t rxsuccess = 0;
|
||||||
@ -179,8 +175,12 @@ static void subscriber_thread_main (void *pvoid)
|
|||||||
|
|
||||||
|
|
||||||
// VERIFY EXPECTED RESULTS
|
// VERIFY EXPECTED RESULTS
|
||||||
|
// EXPLANATION FOR RX TO BE CONSIDERED SUCCESSFUL:
|
||||||
|
// see publisher thread why we have 3 possible outcomes as number of RX messages
|
||||||
|
|
||||||
|
assert (4 * HWM == rxsuccess || 3 * HWM == rxsuccess
|
||||||
|
|| 2 * HWM == rxsuccess);
|
||||||
|
|
||||||
TEST_ASSERT (4 * HWM == rxsuccess || 2 * HWM == rxsuccess);
|
|
||||||
|
|
||||||
// INFORM THAT WE COMPLETED:
|
// INFORM THAT WE COMPLETED:
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user