mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-14 09:47:56 +08:00
Merge pull request #3257 from f18m/hwm_test_fix
Remove race condition from XPUB/SUB test in test_blocking()
This commit is contained in:
commit
95beabe7cc
@ -95,12 +95,21 @@ int test_defaults (int send_hwm_, int msg_cnt_, const char *endpoint)
|
||||
return recv_count;
|
||||
}
|
||||
|
||||
int receive (void *socket_)
|
||||
int receive (void *socket_, int *is_termination)
|
||||
{
|
||||
int recv_count = 0;
|
||||
*is_termination = 0;
|
||||
|
||||
// Now receive all sent messages
|
||||
while (0 == zmq_recv (socket_, NULL, 0, 0)) {
|
||||
char buffer[255];
|
||||
int len;
|
||||
while ((len = zmq_recv (socket_, buffer, sizeof (buffer), 0)) >= 0) {
|
||||
++recv_count;
|
||||
|
||||
if (len == 3 && strncmp (buffer, "end", len) == 0) {
|
||||
*is_termination = 1;
|
||||
return recv_count;
|
||||
}
|
||||
}
|
||||
|
||||
return recv_count;
|
||||
@ -141,19 +150,36 @@ int test_blocking (int send_hwm_, int msg_cnt_, const char *endpoint)
|
||||
// Send until we block
|
||||
int send_count = 0;
|
||||
int recv_count = 0;
|
||||
int blocked_count = 0;
|
||||
int is_termination = 0;
|
||||
while (send_count < msg_cnt_) {
|
||||
const int rc = zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT);
|
||||
if (rc == 0) {
|
||||
++send_count;
|
||||
} else if (-1 == rc) {
|
||||
// if the PUB socket blocks due to HWM, errno should be EAGAIN:
|
||||
blocked_count++;
|
||||
TEST_ASSERT_EQUAL_INT (EAGAIN, errno);
|
||||
recv_count += receive (sub_socket);
|
||||
recv_count += receive (sub_socket, &is_termination);
|
||||
}
|
||||
}
|
||||
|
||||
msleep (2 * SETTLE_TIME); // required for TCP transport
|
||||
recv_count += receive (sub_socket);
|
||||
// if send_hwm_ < msg_cnt_, we should block at least once:
|
||||
TEST_ASSERT (blocked_count > 0);
|
||||
|
||||
// dequeue SUB socket again, to make sure XPUB has space to send the termination message
|
||||
recv_count += receive (sub_socket, &is_termination);
|
||||
|
||||
// send termination message
|
||||
send_string_expect_success (pub_socket, "end", 0);
|
||||
|
||||
// now block on the SUB side till we get the termination message
|
||||
while (is_termination == 0)
|
||||
recv_count += receive (sub_socket, &is_termination);
|
||||
|
||||
// remove termination message from the count:
|
||||
recv_count--;
|
||||
|
||||
TEST_ASSERT_EQUAL_INT (send_count, recv_count);
|
||||
|
||||
// Clean up
|
||||
|
Loading…
x
Reference in New Issue
Block a user