From 798b394087d27201e3d4a16594ea867bf5eecf08 Mon Sep 17 00:00:00 2001 From: Christian Kamm Date: Tue, 2 Jul 2013 15:04:31 +0200 Subject: [PATCH] Add tests for Request-Reply pattern sockets. * See http://rfc.zeromq.org/spec:28/REQREP * Not all testable statements are covered. * At this point, there are several failures: - test_spec_req: The REQ socket does not correctly discard messages from peers that are not currently being talked to. - test_spec_dealer/router: On disconnect, the queues seem to not be emptied. The DEALER can still receive a message the disconnected peer sent, the ROUTER can still send to the identity of the dis- connected peer. --- tests/Makefile.am | 12 +- tests/test_spec_dealer.cpp | 248 +++++++++++++++++++++++++++++++++++++ tests/test_spec_rep.cpp | 144 +++++++++++++++++++++ tests/test_spec_req.cpp | 230 ++++++++++++++++++++++++++++++++++ tests/test_spec_router.cpp | 180 +++++++++++++++++++++++++++ tests/testutil.hpp | 80 +++++++++++- 6 files changed, 891 insertions(+), 3 deletions(-) create mode 100644 tests/test_spec_dealer.cpp create mode 100644 tests/test_spec_rep.cpp create mode 100644 tests/test_spec_req.cpp create mode 100644 tests/test_spec_router.cpp diff --git a/tests/Makefile.am b/tests/Makefile.am index cfe3334f..27e15d4c 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -24,7 +24,11 @@ noinst_PROGRAMS = test_pair_inproc \ test_ctx_options \ test_security \ test_security_curve \ - test_iov + test_iov \ + test_spec_req \ + test_spec_rep \ + test_spec_dealer \ + test_spec_router if !ON_MINGW noinst_PROGRAMS += test_shutdown_stress \ @@ -55,6 +59,10 @@ test_ctx_options_SOURCES = test_ctx_options.cpp test_iov_SOURCES = test_iov.cpp test_security_SOURCES = test_security.cpp test_security_curve_SOURCES = test_security_curve.cpp +test_spec_req_SOURCES = test_spec_req.cpp +test_spec_rep_SOURCES = test_spec_rep.cpp +test_spec_dealer_SOURCES = test_spec_dealer.cpp +test_spec_router_SOURCES = test_spec_router.cpp if !ON_MINGW test_shutdown_stress_SOURCES = test_shutdown_stress.cpp test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp @@ -67,4 +75,4 @@ noinst_PROGRAMS += test_raw_sock test_raw_sock_SOURCES = test_raw_sock.cpp # Run the test cases -TESTS = $(noinst_PROGRAMS) \ No newline at end of file +TESTS = $(noinst_PROGRAMS) diff --git a/tests/test_spec_dealer.cpp b/tests/test_spec_dealer.cpp new file mode 100644 index 00000000..1235f26b --- /dev/null +++ b/tests/test_spec_dealer.cpp @@ -0,0 +1,248 @@ +/* + Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include +#include +#include "testutil.hpp" + +void test_round_robin_out (void *ctx) +{ + void *dealer = zmq_socket (ctx, ZMQ_DEALER); + assert (dealer); + + int rc = zmq_bind (dealer, "inproc://b"); + assert (rc == 0); + + const size_t N = 5; + void *rep[N]; + for (size_t i = 0; i < N; ++i) + { + rep[i] = zmq_socket (ctx, ZMQ_REP); + assert (rep[i]); + + int timeout = 100; + rc = zmq_setsockopt (rep[i], ZMQ_RCVTIMEO, &timeout, sizeof(int)); + assert (rc == 0); + + rc = zmq_connect (rep[i], "inproc://b"); + assert (rc == 0); + } + + // Send N requests + for (size_t i = 0; i < N; ++i) + { + s_send_seq (dealer, 0, "ABC", SEQ_END); + } + + // Expect every REP got one message + zmq_msg_t msg; + zmq_msg_init (&msg); + + for (size_t i = 0; i < N; ++i) + { + s_recv_seq (rep[i], "ABC", SEQ_END); + } + + rc = zmq_msg_close (&msg); + assert (rc == 0); + + rc = zmq_close (dealer); + assert (rc == 0); + + for (size_t i = 0; i < N; ++i) + { + rc = zmq_close (rep[i]); + assert (rc == 0); + } +} + +void test_fair_queue_in (void *ctx) +{ + void *receiver = zmq_socket (ctx, ZMQ_DEALER); + assert (receiver); + + int timeout = 100; + int rc = zmq_setsockopt (receiver, ZMQ_RCVTIMEO, &timeout, sizeof(int)); + assert (rc == 0); + + rc = zmq_bind (receiver, "inproc://a"); + assert (rc == 0); + + const size_t N = 5; + void *senders[N]; + for (size_t i = 0; i < N; ++i) + { + senders[i] = zmq_socket (ctx, ZMQ_DEALER); + assert (senders[i]); + + rc = zmq_setsockopt (senders[i], ZMQ_RCVTIMEO, &timeout, sizeof(int)); + assert (rc == 0); + + rc = zmq_connect (senders[i], "inproc://a"); + assert (rc == 0); + } + + zmq_msg_t msg; + rc = zmq_msg_init (&msg); + assert (rc == 0); + + s_send_seq (senders[0], "A", SEQ_END); + s_recv_seq (receiver, "A", SEQ_END); + + s_send_seq (senders[0], "A", SEQ_END); + s_recv_seq (receiver, "A", SEQ_END); + + // send N requests + for (size_t i = 0; i < N; ++i) + { + char *str = strdup("A"); + str[0] += i; + s_send_seq (senders[i], str, SEQ_END); + free (str); + } + + // handle N requests + for (size_t i = 0; i < N; ++i) + { + char *str = strdup("A"); + str[0] += i; + s_recv_seq (receiver, str, SEQ_END); + free (str); + } + + rc = zmq_msg_close (&msg); + assert (rc == 0); + + rc = zmq_close (receiver); + assert (rc == 0); + + for (size_t i = 0; i < N; ++i) + { + rc = zmq_close (senders[i]); + assert (rc == 0); + } +} + +void test_destroy_queue_on_disconnect (void *ctx) +{ + void *A = zmq_socket (ctx, ZMQ_DEALER); + assert (A); + + int rc = zmq_bind (A, "inproc://d"); + assert (rc == 0); + + void *B = zmq_socket (ctx, ZMQ_DEALER); + assert (B); + + rc = zmq_connect (B, "inproc://d"); + assert (rc == 0); + + // Send a message in both directions + s_send_seq (A, "ABC", SEQ_END); + s_send_seq (B, "DEF", SEQ_END); + + rc = zmq_disconnect (B, "inproc://d"); + assert (rc == 0); + + // Disconnect may take time and need command processing. + zmq_pollitem_t poller[2] = { { A, 0, 0, 0 }, { B, 0, 0, 0 } }; + rc = zmq_poll (poller, 2, 100); + assert (rc == 0); + + // No messages should be available, sending should fail. + zmq_msg_t msg; + zmq_msg_init (&msg); + + rc = zmq_send (A, 0, 0, ZMQ_DONTWAIT); + assert (rc == -1); + assert (errno == EAGAIN); + + rc = zmq_msg_recv (&msg, A, ZMQ_DONTWAIT); + assert (rc == -1); + assert (errno == EAGAIN); + + // After a reconnect of B, the messages should still be gone + rc = zmq_connect (B, "inproc://d"); + assert (rc == 0); + + rc = zmq_msg_recv (&msg, A, ZMQ_DONTWAIT); + assert (rc == -1); + assert (errno == EAGAIN); + + rc = zmq_msg_recv (&msg, B, ZMQ_DONTWAIT); + assert (rc == -1); + assert (errno == EAGAIN); + + rc = zmq_msg_close (&msg); + assert (rc == 0); + + rc = zmq_close (A); + assert (rc == 0); + + rc = zmq_close (B); + assert (rc == 0); +} + +void test_block_on_send_no_peers (void *ctx) +{ + void *sc = zmq_socket (ctx, ZMQ_DEALER); + assert (sc); + + int timeout = 100; + int rc = zmq_setsockopt (sc, ZMQ_SNDTIMEO, &timeout, sizeof(timeout)); + assert (rc == 0); + + rc = zmq_send (sc, 0, 0, ZMQ_DONTWAIT); + assert (rc == -1); + assert (errno == EAGAIN); + + rc = zmq_send (sc, 0, 0, 0); + assert (rc == -1); + assert (errno == EAGAIN); + + rc = zmq_close (sc); + assert (rc == 0); +} + +int main () +{ + void *ctx = zmq_ctx_new (); + assert (ctx); + + // SHALL route outgoing messages to available peers using a round-robin + // strategy. + test_round_robin_out (ctx); + + // SHALL receive incoming messages from its peers using a fair-queuing + // strategy. + test_fair_queue_in (ctx); + + // SHALL block on sending, or return a suitable error, when it has no connected peers. + test_block_on_send_no_peers (ctx); + + // SHALL create a double queue when a peer connects to it. If this peer + // disconnects, the DEALER socket SHALL destroy its double queue and SHALL + // discard any messages it contains. + test_destroy_queue_on_disconnect (ctx); + + int rc = zmq_ctx_term (ctx); + assert (rc == 0); + + return 0 ; +} diff --git a/tests/test_spec_rep.cpp b/tests/test_spec_rep.cpp new file mode 100644 index 00000000..2f1424aa --- /dev/null +++ b/tests/test_spec_rep.cpp @@ -0,0 +1,144 @@ +/* + Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include +#include +#include "testutil.hpp" + +void test_fair_queue_in (void *ctx) +{ + void *rep = zmq_socket (ctx, ZMQ_REP); + assert (rep); + + int timeout = 100; + int rc = zmq_setsockopt (rep, ZMQ_RCVTIMEO, &timeout, sizeof(int)); + assert (rc == 0); + + rc = zmq_bind (rep, "inproc://a"); + assert (rc == 0); + + const size_t N = 5; + void *reqs[N]; + for (size_t i = 0; i < N; ++i) + { + reqs[i] = zmq_socket (ctx, ZMQ_REQ); + assert (reqs[i]); + + rc = zmq_setsockopt (reqs[i], ZMQ_RCVTIMEO, &timeout, sizeof(int)); + assert (rc == 0); + + rc = zmq_connect (reqs[i], "inproc://a"); + assert (rc == 0); + } + + s_send_seq (reqs[0], "A", SEQ_END); + s_recv_seq (rep, "A", SEQ_END); + s_send_seq (rep, "A", SEQ_END); + s_recv_seq (reqs[0], "A", SEQ_END); + + s_send_seq (reqs[0], "A", SEQ_END); + s_recv_seq (rep, "A", SEQ_END); + s_send_seq (rep, "A", SEQ_END); + s_recv_seq (reqs[0], "A", SEQ_END); + + // send N requests + for (size_t i = 0; i < N; ++i) + { + char * str = strdup("A"); + str[0] += i; + s_send_seq (reqs[i], str, SEQ_END); + free (str); + } + + // handle N requests + for (size_t i = 0; i < N; ++i) + { + char * str = strdup("A"); + str[0] += i; + s_recv_seq (rep, str, SEQ_END); + s_send_seq (rep, str, SEQ_END); + s_recv_seq (reqs[i], str, SEQ_END); + free (str); + } + + rc = zmq_close (rep); + assert (rc == 0); + + for (size_t i = 0; i < N; ++i) + { + rc = zmq_close (reqs[i]); + assert (rc == 0); + } +} + +void test_envelope (void *ctx) +{ + void *rep = zmq_socket (ctx, ZMQ_REP); + assert (rep); + + int rc = zmq_bind (rep, "inproc://b"); + assert (rc == 0); + + void *dealer = zmq_socket (ctx, ZMQ_DEALER); + assert (dealer); + + rc = zmq_connect (dealer, "inproc://b"); + assert (rc == 0); + + // minimal envelope + s_send_seq (dealer, 0, "A", SEQ_END); + s_recv_seq (rep, "A", SEQ_END); + s_send_seq (rep, "A", SEQ_END); + s_recv_seq (dealer, 0, "A", SEQ_END); + + // big envelope + s_send_seq (dealer, "X", "Y", 0, "A", SEQ_END); + s_recv_seq (rep, "A", SEQ_END); + s_send_seq (rep, "A", SEQ_END); + s_recv_seq (dealer, "X", "Y", 0, "A", SEQ_END); + + rc = zmq_close (rep); + assert (rc == 0); + + rc = zmq_close (dealer); + assert (rc == 0); +} + +int main () +{ + void *ctx = zmq_ctx_new (); + assert (ctx); + + // SHALL receive incoming messages from its peers using a fair-queuing + // strategy. + test_fair_queue_in (ctx); + + // For an incoming message: + // SHALL remove and store the address envelope, including the delimiter. + // SHALL pass the remaining data frames to its calling application. + // SHALL wait for a single reply message from its calling application. + // SHALL prepend the address envelope and delimiter. + // SHALL deliver this message back to the originating peer. + test_envelope (ctx); + + int rc = zmq_ctx_term (ctx); + assert (rc == 0); + + return 0 ; +} diff --git a/tests/test_spec_req.cpp b/tests/test_spec_req.cpp new file mode 100644 index 00000000..e6f9d43f --- /dev/null +++ b/tests/test_spec_req.cpp @@ -0,0 +1,230 @@ +/* + Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include +#include "testutil.hpp" + +void test_round_robin_out (void *ctx) +{ + void *req = zmq_socket (ctx, ZMQ_REQ); + assert (req); + + int rc = zmq_bind (req, "inproc://b"); + assert (rc == 0); + + const size_t N = 5; + void *rep[N]; + for (size_t i = 0; i < N; ++i) + { + rep[i] = zmq_socket (ctx, ZMQ_REP); + assert (rep[i]); + + int timeout = 100; + rc = zmq_setsockopt (rep[i], ZMQ_RCVTIMEO, &timeout, sizeof(int)); + assert (rc == 0); + + rc = zmq_connect (rep[i], "inproc://b"); + assert (rc == 0); + } + + // Send N request-replies, and expect every REP it used once in order + for (size_t i = 0; i < N; ++i) + { + s_send_seq (req, "ABC", SEQ_END); + s_recv_seq (rep[i], "ABC", SEQ_END); + s_send_seq (rep[i], "DEF", SEQ_END); + s_recv_seq (req, "DEF", SEQ_END); + } + + rc = zmq_close (req); + assert (rc == 0); + + for (size_t i = 0; i < N; ++i) + { + rc = zmq_close (rep[i]); + assert (rc == 0); + } +} + +void test_req_only_listens_to_current_peer (void *ctx) +{ + void *req = zmq_socket (ctx, ZMQ_REQ); + assert (req); + + int rc = zmq_setsockopt(req, ZMQ_IDENTITY, "A", 2); + assert (rc == 0); + + rc = zmq_bind (req, "inproc://c"); + assert (rc == 0); + + const size_t N = 3; + void *router[N]; + for (size_t i = 0; i < N; ++i) + { + router[i] = zmq_socket (ctx, ZMQ_ROUTER); + assert (router[i]); + + int timeout = 100; + rc = zmq_setsockopt (router[i], ZMQ_RCVTIMEO, &timeout, sizeof(timeout)); + assert (rc == 0); + + int enabled = 1; + rc = zmq_setsockopt (router[i], ZMQ_ROUTER_MANDATORY, &enabled, sizeof(enabled)); + assert (rc == 0); + + rc = zmq_connect (router[i], "inproc://c"); + assert (rc == 0); + } + + for (size_t i = 0; i < N; ++i) + { + s_send_seq (req, "ABC", SEQ_END); + + // Receive on router i + s_recv_seq (router[i], "A", 0, "ABC", SEQ_END); + + // Send back replies on all routers + for (size_t j = 0; j < N; ++j) + { + const char *replies[] = { "WRONG", "GOOD" }; + const char *reply = replies[i == j ? 1 : 0]; + s_send_seq (router[j], "A", 0, reply, SEQ_END); + } + + // Recieve only the good relpy + s_recv_seq (req, "GOOD", SEQ_END); + } + + rc = zmq_close (req); + assert (rc == 0); + + for (size_t i = 0; i < N; ++i) + { + rc = zmq_close (router[i]); + assert (rc == 0); + } +} + +void test_req_message_format (void *ctx) +{ + void *req = zmq_socket (ctx, ZMQ_REQ); + assert (req); + + void *router = zmq_socket (ctx, ZMQ_ROUTER); + assert (router); + + int rc = zmq_bind (req, "inproc://a"); + assert (rc == 0); + + rc = zmq_connect (router, "inproc://a"); + assert (rc == 0); + + // Send a multi-part request. + s_send_seq (req, "ABC", "DEF", SEQ_END); + + zmq_msg_t msg; + zmq_msg_init (&msg); + + // Receive peer identity + rc = zmq_msg_recv (&msg, router, 0); + assert (rc != -1); + assert (zmq_msg_size (&msg) > 0); + zmq_msg_t peer_id_msg; + zmq_msg_init (&peer_id_msg); + zmq_msg_copy (&peer_id_msg, &msg); + + int more = 0; + size_t more_size = sizeof(more); + rc = zmq_getsockopt (router, ZMQ_RCVMORE, &more, &more_size); + assert (rc == 0); + assert (more); + + // Receive the rest. + s_recv_seq (router, 0, "ABC", "DEF", SEQ_END); + + // Send back a single-part reply. + rc = zmq_msg_send (&peer_id_msg, router, ZMQ_SNDMORE); + assert (rc != -1); + s_send_seq (router, 0, "GHI", SEQ_END); + + // Receive reply. + s_recv_seq (req, "GHI", SEQ_END); + + rc = zmq_msg_close (&msg); + assert (rc == 0); + + rc = zmq_msg_close (&peer_id_msg); + assert (rc == 0); + + rc = zmq_close (req); + assert (rc == 0); + + rc = zmq_close (router); + assert (rc == 0); +} + +void test_block_on_send_no_peers (void *ctx) +{ + void *sc = zmq_socket (ctx, ZMQ_REQ); + assert (sc); + + int timeout = 100; + int rc = zmq_setsockopt (sc, ZMQ_SNDTIMEO, &timeout, sizeof(timeout)); + assert (rc == 0); + + rc = zmq_send (sc, 0, 0, ZMQ_DONTWAIT); + assert (rc == -1); + assert (errno == EAGAIN); + + rc = zmq_send (sc, 0, 0, 0); + assert (rc == -1); + assert (errno == EAGAIN); + + rc = zmq_close (sc); + assert (rc == 0); +} + +int main () +{ + void *ctx = zmq_ctx_new (); + assert (ctx); + + // SHALL route outgoing messages to connected peers using a round-robin + // strategy. + test_round_robin_out (ctx); + + // The request and reply messages SHALL have this format on the wire: + // * A delimiter, consisting of an empty frame, added by the REQ socket. + // * One or more data frames, comprising the message visible to the + // application. + test_req_message_format (ctx); + + // SHALL block on sending, or return a suitable error, when it has no connected peers. + test_block_on_send_no_peers (ctx); + + // SHALL accept an incoming message only from the last peer that it sent a + // request to. + // SHALL discard silently any messages received from other peers. + test_req_only_listens_to_current_peer (ctx); + + int rc = zmq_ctx_term (ctx); + assert (rc == 0); + + return 0 ; +} diff --git a/tests/test_spec_router.cpp b/tests/test_spec_router.cpp new file mode 100644 index 00000000..e5972e34 --- /dev/null +++ b/tests/test_spec_router.cpp @@ -0,0 +1,180 @@ +/* + Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include +#include +#include "testutil.hpp" + +void test_fair_queue_in (void *ctx) +{ + void *receiver = zmq_socket (ctx, ZMQ_ROUTER); + assert (receiver); + + int timeout = 100; + int rc = zmq_setsockopt (receiver, ZMQ_RCVTIMEO, &timeout, sizeof(int)); + assert (rc == 0); + + rc = zmq_bind (receiver, "inproc://a"); + assert (rc == 0); + + const size_t N = 5; + void *senders[N]; + for (size_t i = 0; i < N; ++i) + { + senders[i] = zmq_socket (ctx, ZMQ_DEALER); + assert (senders[i]); + + rc = zmq_setsockopt (senders[i], ZMQ_RCVTIMEO, &timeout, sizeof(int)); + assert (rc == 0); + + char *str = strdup("A"); + str[0] += i; + rc = zmq_setsockopt (senders[i], ZMQ_IDENTITY, str, 2); + assert (rc == 0); + free (str); + + rc = zmq_connect (senders[i], "inproc://a"); + assert (rc == 0); + } + + zmq_msg_t msg; + rc = zmq_msg_init (&msg); + assert (rc == 0); + + s_send_seq (senders[0], "M", SEQ_END); + s_recv_seq (receiver, "A", "M", SEQ_END); + + s_send_seq (senders[0], "M", SEQ_END); + s_recv_seq (receiver, "A", "M", SEQ_END); + + // send N requests + for (size_t i = 0; i < N; ++i) + { + s_send_seq (senders[i], "M", SEQ_END); + } + + // handle N requests + for (size_t i = 0; i < N; ++i) + { + char *str = strdup("A"); + str[0] += i; + s_recv_seq (receiver, str, "M", SEQ_END); + free (str); + } + + rc = zmq_msg_close (&msg); + assert (rc == 0); + + rc = zmq_close (receiver); + assert (rc == 0); + + for (size_t i = 0; i < N; ++i) + { + rc = zmq_close (senders[i]); + assert (rc == 0); + } +} + +void test_destroy_queue_on_disconnect (void *ctx) +{ + void *A = zmq_socket (ctx, ZMQ_ROUTER); + assert (A); + + int enabled = 1; + int rc = zmq_setsockopt (A, ZMQ_ROUTER_MANDATORY, &enabled, sizeof(enabled)); + assert (rc == 0); + + rc = zmq_bind (A, "inproc://d"); + assert (rc == 0); + + void *B = zmq_socket (ctx, ZMQ_DEALER); + assert (B); + + rc = zmq_setsockopt (B, ZMQ_IDENTITY, "B", 2); + assert (rc == 0); + + rc = zmq_connect (B, "inproc://d"); + assert (rc == 0); + + // Send a message in both directions + s_send_seq (A, "B", "ABC", SEQ_END); + s_send_seq (B, "DEF", SEQ_END); + + rc = zmq_disconnect (B, "inproc://d"); + assert (rc == 0); + + // Disconnect may take time and need command processing. + zmq_pollitem_t poller[2] = { { A, 0, 0, 0 }, { B, 0, 0, 0 } }; + rc = zmq_poll (poller, 2, 100); + assert (rc == 0); + + // No messages should be available, sending should fail. + zmq_msg_t msg; + zmq_msg_init (&msg); + + rc = zmq_send (A, "B", 2, ZMQ_SNDMORE | ZMQ_DONTWAIT); + assert (rc == -1); + assert (errno == EHOSTUNREACH); + + rc = zmq_msg_recv (&msg, A, ZMQ_DONTWAIT); + assert (rc == -1); + assert (errno == EAGAIN); + + // After a reconnect of B, the messages should still be gone + rc = zmq_connect (B, "inproc://d"); + assert (rc == 0); + + rc = zmq_msg_recv (&msg, A, ZMQ_DONTWAIT); + assert (rc == -1); + assert (errno == EAGAIN); + + rc = zmq_msg_recv (&msg, B, ZMQ_DONTWAIT); + assert (rc == -1); + assert (errno == EAGAIN); + + rc = zmq_msg_close (&msg); + assert (rc == 0); + + rc = zmq_close (A); + assert (rc == 0); + + rc = zmq_close (B); + assert (rc == 0); +} + + +int main () +{ + void *ctx = zmq_ctx_new (); + assert (ctx); + + // SHALL receive incoming messages from its peers using a fair-queuing + // strategy. + test_fair_queue_in (ctx); + + // SHALL create a double queue when a peer connects to it. If this peer + // disconnects, the ROUTER socket SHALL destroy its double queue and SHALL + // discard any messages it contains. + test_destroy_queue_on_disconnect (ctx); + + int rc = zmq_ctx_term (ctx); + assert (rc == 0); + + return 0 ; +} diff --git a/tests/testutil.hpp b/tests/testutil.hpp index f2be2c4a..eb4b4e48 100644 --- a/tests/testutil.hpp +++ b/tests/testutil.hpp @@ -24,11 +24,12 @@ #include #undef NDEBUG #include +#include // Bounce a message from client to server and back // For REQ/REP or DEALER/DEALER pairs only -static void +void bounce (void *server, void *client) { const char *content = "12345678ABCDEFGH12345678abcdefgh"; @@ -108,4 +109,81 @@ s_sendmore (void *socket, const char *string) { #define streq(s1,s2) (!strcmp ((s1), (s2))) #define strneq(s1,s2) (strcmp ((s1), (s2))) + +const char * SEQ_END = (const char *)1; + +// Sends a message composed of frames that are C strings or null frames. +// The list must be terminated by SEQ_END. +// Example: s_send_seq (req, "ABC", 0, "DEF", SEQ_END); +void s_send_seq (void *socket, ...) +{ + va_list ap; + va_start (ap, socket); + const char * data = va_arg (ap, const char *); + while (true) + { + const char * prev = data; + data = va_arg (ap, const char *); + bool end = data == SEQ_END; + + if (!prev) + { + int rc = zmq_send (socket, 0, 0, end ? 0 : ZMQ_SNDMORE); + assert (rc != -1); + } + else + { + int rc = zmq_send (socket, prev, strlen (prev)+1, end ? 0 : ZMQ_SNDMORE); + assert (rc != -1); + } + if (end) + break; + } + va_end (ap); +} + +// Receives message a number of frames long and checks that the frames have +// the given data which can be either C strings or 0 for a null frame. +// The list must be terminated by SEQ_END. +// Example: s_recv_seq (rep, "ABC", 0, "DEF", SEQ_END); +void s_recv_seq (void *socket, ...) +{ + zmq_msg_t msg; + zmq_msg_init (&msg); + + int more; + size_t more_size = sizeof(more); + + va_list ap; + va_start (ap, socket); + const char * data = va_arg (ap, const char *); + while (true) + { + int rc = zmq_msg_recv (&msg, socket, 0); + assert (rc != -1); + + if (!data) + { + assert (zmq_msg_size (&msg) == 0); + } + else + { + assert (strcmp (data, (const char *)zmq_msg_data (&msg)) == 0); + } + + data = va_arg (ap, const char *); + bool end = data == SEQ_END; + + rc = zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size); + assert (rc == 0); + + assert (!more == end); + if (end) + break; + } + va_end (ap); + + zmq_msg_close (&msg); +} + #endif