mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-14 09:47:56 +08:00
Problem: test does not trigger HWM
Solution: modify order of operations, add diagnostic output
This commit is contained in:
parent
48a1e637b6
commit
f70097c1cf
@ -32,34 +32,44 @@
|
|||||||
void test_get_peer_state ()
|
void test_get_peer_state ()
|
||||||
{
|
{
|
||||||
#ifdef ZMQ_BUILD_DRAFT_API
|
#ifdef ZMQ_BUILD_DRAFT_API
|
||||||
size_t len = MAX_SOCKET_STRING;
|
|
||||||
char my_endpoint[MAX_SOCKET_STRING];
|
|
||||||
void *ctx = zmq_ctx_new ();
|
void *ctx = zmq_ctx_new ();
|
||||||
assert (ctx);
|
assert (ctx);
|
||||||
void *router = zmq_socket (ctx, ZMQ_ROUTER);
|
void *router = zmq_socket (ctx, ZMQ_ROUTER);
|
||||||
assert (router);
|
assert (router);
|
||||||
|
|
||||||
int rc = zmq_bind (router, "tcp://127.0.0.1:*");
|
int rc;
|
||||||
assert (rc == 0);
|
|
||||||
|
|
||||||
rc = zmq_getsockopt (router, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
|
|
||||||
assert (rc == 0);
|
|
||||||
|
|
||||||
int mandatory = 1;
|
int mandatory = 1;
|
||||||
rc = zmq_setsockopt (router, ZMQ_ROUTER_MANDATORY, &mandatory,
|
rc = zmq_setsockopt (router, ZMQ_ROUTER_MANDATORY, &mandatory,
|
||||||
sizeof (mandatory));
|
sizeof (mandatory));
|
||||||
|
|
||||||
// Create dealer called "X" and connect it to our router
|
rc = zmq_bind (router, "tcp://127.0.0.1:*");
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
|
size_t my_endpoint_len = MAX_SOCKET_STRING;
|
||||||
|
char my_endpoint[MAX_SOCKET_STRING];
|
||||||
|
rc =
|
||||||
|
zmq_getsockopt (router, ZMQ_LAST_ENDPOINT, my_endpoint, &my_endpoint_len);
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
void *dealer1 = zmq_socket (ctx, ZMQ_DEALER);
|
void *dealer1 = zmq_socket (ctx, ZMQ_DEALER);
|
||||||
assert (dealer1);
|
assert (dealer1);
|
||||||
|
|
||||||
|
void *dealer2 = zmq_socket (ctx, ZMQ_DEALER);
|
||||||
|
assert (dealer2);
|
||||||
|
|
||||||
|
// Lower HWMs to allow doing the test with fewer messages
|
||||||
|
int hwm = 100;
|
||||||
|
zmq_setsockopt (router, ZMQ_SNDHWM, &hwm, sizeof (int));
|
||||||
|
zmq_setsockopt (dealer1, ZMQ_RCVHWM, &hwm, sizeof (int));
|
||||||
|
zmq_setsockopt (dealer2, ZMQ_RCVHWM, &hwm, sizeof (int));
|
||||||
|
|
||||||
|
// Name dealer1 "X" and connect it to our router
|
||||||
rc = zmq_setsockopt (dealer1, ZMQ_IDENTITY, "X", 1);
|
rc = zmq_setsockopt (dealer1, ZMQ_IDENTITY, "X", 1);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
rc = zmq_connect (dealer1, my_endpoint);
|
rc = zmq_connect (dealer1, my_endpoint);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
// Create dealer called "Y" and connect it to our router
|
// Name dealer2 "Y" and connect it to our router
|
||||||
void *dealer2 = zmq_socket (ctx, ZMQ_DEALER);
|
|
||||||
assert (dealer2);
|
|
||||||
rc = zmq_setsockopt (dealer2, ZMQ_IDENTITY, "Y", 1);
|
rc = zmq_setsockopt (dealer2, ZMQ_IDENTITY, "Y", 1);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
rc = zmq_connect (dealer2, my_endpoint);
|
rc = zmq_connect (dealer2, my_endpoint);
|
||||||
@ -83,7 +93,6 @@ void test_get_peer_state ()
|
|||||||
rc = zmq_recv (router, buffer, 255, 0);
|
rc = zmq_recv (router, buffer, 255, 0);
|
||||||
assert (rc == 5);
|
assert (rc == 5);
|
||||||
|
|
||||||
|
|
||||||
void *poller = zmq_poller_new ();
|
void *poller = zmq_poller_new ();
|
||||||
assert (poller);
|
assert (poller);
|
||||||
|
|
||||||
@ -95,41 +104,53 @@ void test_get_peer_state ()
|
|||||||
|
|
||||||
const size_t count = 10000;
|
const size_t count = 10000;
|
||||||
const size_t event_size = 2;
|
const size_t event_size = 2;
|
||||||
|
bool dealer2_blocked = false;
|
||||||
|
size_t dealer1_sent = 0, dealer2_sent = 0, dealer1_received = 0;
|
||||||
zmq_poller_event_t events[event_size];
|
zmq_poller_event_t events[event_size];
|
||||||
for (size_t iterations = 0;
|
for (size_t iterations = 0; iterations < count; ++iterations) {
|
||||||
iterations < count
|
rc = zmq_poller_wait_all (poller, events, event_size, -1);
|
||||||
&& zmq_poller_wait_all (poller, events, event_size, -1) != -1;
|
assert (rc != -1);
|
||||||
++iterations) {
|
|
||||||
for (size_t i = 0; i < event_size; ++i) {
|
for (size_t i = 0; i < event_size; ++i) {
|
||||||
if (events[i].socket == router) {
|
if (events[i].socket == router && events[i].events & ZMQ_POLLOUT) {
|
||||||
rc = zmq_socket_get_peer_state (router, "X", 1);
|
rc = zmq_socket_get_peer_state (router, "X", 1);
|
||||||
if (rc == -1)
|
if (rc == -1)
|
||||||
printf ("zmq_socket_get_peer_state failed: %i\n", errno);
|
printf ("zmq_socket_get_peer_state failed: %i\n", errno);
|
||||||
assert (rc != -1);
|
assert (rc != -1);
|
||||||
if (rc & ZMQ_POLLOUT) {
|
if (rc & ZMQ_POLLOUT) {
|
||||||
rc = zmq_send (router, "X", 1, ZMQ_SNDMORE);
|
rc = zmq_send (router, "X", 1, ZMQ_SNDMORE | ZMQ_DONTWAIT);
|
||||||
assert (rc == 1);
|
assert (rc == 1);
|
||||||
rc = zmq_send (router, "Hello", 5, 0);
|
rc = zmq_send (router, "Hello", 5, ZMQ_DONTWAIT);
|
||||||
assert (rc == 5);
|
assert (rc == 5);
|
||||||
|
|
||||||
|
++dealer1_sent;
|
||||||
}
|
}
|
||||||
|
|
||||||
rc = zmq_socket_get_peer_state (router, "Y", 1);
|
rc = zmq_socket_get_peer_state (router, "Y", 1);
|
||||||
if (rc == -1)
|
if (rc == -1)
|
||||||
printf ("zmq_socket_get_peer_state failed: %i\n", errno);
|
printf ("zmq_socket_get_peer_state failed: %i\n", errno);
|
||||||
assert (rc != -1);
|
assert (rc != -1);
|
||||||
if (rc & ZMQ_POLLOUT) {
|
if (rc & ZMQ_POLLOUT) {
|
||||||
rc = zmq_send (router, "Y", 1, ZMQ_SNDMORE);
|
rc = zmq_send (router, "Y", 1, ZMQ_SNDMORE | ZMQ_DONTWAIT);
|
||||||
assert (rc == 1);
|
assert (rc == 1);
|
||||||
rc = zmq_send (router, "Hello", 5, 0);
|
rc = zmq_send (router, "Hello", 5, ZMQ_DONTWAIT);
|
||||||
assert (rc == 5);
|
assert (rc == 5);
|
||||||
|
++dealer2_sent;
|
||||||
|
} else {
|
||||||
|
dealer2_blocked = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (events[i].socket == dealer1) {
|
if (events[i].socket == dealer1 && events[i].events & ZMQ_POLLIN) {
|
||||||
rc = zmq_recv (dealer1, buffer, 255, ZMQ_DONTWAIT);
|
rc = zmq_recv (dealer1, buffer, 255, ZMQ_DONTWAIT);
|
||||||
assert (rc == 5);
|
assert (rc == 5);
|
||||||
|
|
||||||
|
++dealer1_received;
|
||||||
}
|
}
|
||||||
// never read from dealer2, so its pipe becomes full eventually
|
// never read from dealer2, so its pipe becomes full eventually
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
printf ("dealer1_sent = %zu, dealer2_sent = %zu, dealer1_received = %zu\n",
|
||||||
|
dealer1_sent, dealer2_sent, dealer1_received);
|
||||||
|
assert (dealer2_blocked);
|
||||||
zmq_poller_destroy (&poller);
|
zmq_poller_destroy (&poller);
|
||||||
|
|
||||||
rc = zmq_close (router);
|
rc = zmq_close (router);
|
||||||
@ -217,5 +238,5 @@ int main (void)
|
|||||||
test_basic ();
|
test_basic ();
|
||||||
test_get_peer_state ();
|
test_get_peer_state ();
|
||||||
|
|
||||||
return 0 ;
|
return 0;
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user