0
0
mirror of https://github.com/zeromq/libzmq.git synced 2024-12-26 23:01:04 +08:00

Problem: tests without test framework

Solution: migrate to unity
This commit is contained in:
Simon Giesecke 2018-12-07 07:51:30 -05:00
parent f025129768
commit a8b2e5a617
14 changed files with 671 additions and 647 deletions

View File

@ -476,7 +476,8 @@ tests_test_pair_tcp_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_reqrep_inproc_SOURCES = \
tests/test_reqrep_inproc.cpp \
tests/testutil.hpp
tests_test_reqrep_inproc_LDADD = src/libzmq.la
tests_test_reqrep_inproc_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_reqrep_inproc_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_reqrep_tcp_SOURCES = \
tests/test_reqrep_tcp.cpp \
@ -493,10 +494,12 @@ tests_test_hwm_pubsub_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_hwm_pubsub_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_reqrep_device_SOURCES = tests/test_reqrep_device.cpp
tests_test_reqrep_device_LDADD = src/libzmq.la
tests_test_reqrep_device_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_reqrep_device_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_sub_forward_SOURCES = tests/test_sub_forward.cpp
tests_test_sub_forward_LDADD = src/libzmq.la
tests_test_sub_forward_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_sub_forward_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_invalid_rep_SOURCES = tests/test_invalid_rep.cpp
tests_test_invalid_rep_LDADD = src/libzmq.la
@ -594,7 +597,8 @@ tests_test_spec_req_SOURCES = tests/test_spec_req.cpp
tests_test_spec_req_LDADD = src/libzmq.la
tests_test_spec_rep_SOURCES = tests/test_spec_rep.cpp
tests_test_spec_rep_LDADD = src/libzmq.la
tests_test_spec_rep_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_spec_rep_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_spec_dealer_SOURCES = tests/test_spec_dealer.cpp
tests_test_spec_dealer_LDADD = src/libzmq.la ${UNITY_LIBS}
@ -672,14 +676,16 @@ tests_test_xpub_manual_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_xpub_manual_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_xpub_welcome_msg_SOURCES = tests/test_xpub_welcome_msg.cpp
tests_test_xpub_welcome_msg_LDADD = src/libzmq.la
tests_test_xpub_welcome_msg_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_xpub_welcome_msg_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_xpub_verbose_SOURCES = tests/test_xpub_verbose.cpp
tests_test_xpub_verbose_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_xpub_verbose_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_atomics_SOURCES = tests/test_atomics.cpp
tests_test_atomics_LDADD = src/libzmq.la
tests_test_atomics_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_atomics_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_sockopt_hwm_SOURCES = tests/test_sockopt_hwm.cpp
tests_test_sockopt_hwm_LDADD = src/libzmq.la ${UNITY_LIBS}
@ -790,7 +796,8 @@ tests_test_rebind_ipc_LDADD = src/libzmq.la
tests_test_reqrep_ipc_SOURCES = \
tests/test_reqrep_ipc.cpp \
tests/testutil.hpp
tests_test_reqrep_ipc_LDADD = src/libzmq.la
tests_test_reqrep_ipc_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_reqrep_ipc_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_timeo_SOURCES = tests/test_timeo.cpp
tests_test_timeo_LDADD = src/libzmq.la
@ -806,7 +813,8 @@ tests_test_use_fd_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_use_fd_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_zmq_poll_fd_SOURCES = tests/test_zmq_poll_fd.cpp
tests_test_zmq_poll_fd_LDADD = src/libzmq.la
tests_test_zmq_poll_fd_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_zmq_poll_fd_CPPFLAGS = ${UNITY_CPPFLAGS}
if HAVE_FORK
if !VALGRIND_ENABLED
@ -840,10 +848,12 @@ tests_test_pair_tipc_SOURCES = tests/test_pair_tipc.cpp
tests_test_pair_tipc_LDADD = src/libzmq.la
tests_test_reqrep_device_tipc_SOURCES = tests/test_reqrep_device_tipc.cpp
tests_test_reqrep_device_tipc_LDADD = src/libzmq.la
tests_test_reqrep_device_tipc_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_reqrep_device_tipc_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_reqrep_tipc_SOURCES = tests/test_reqrep_tipc.cpp
tests_test_reqrep_tipc_LDADD = src/libzmq.la
tests_test_reqrep_tipc_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_reqrep_tipc_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_router_mandatory_tipc_SOURCES = tests/test_router_mandatory_tipc.cpp
tests_test_router_mandatory_tipc_LDADD = src/libzmq.la
@ -852,7 +862,8 @@ tests_test_shutdown_stress_tipc_SOURCES = tests/test_shutdown_stress_tipc.cpp
tests_test_shutdown_stress_tipc_LDADD = src/libzmq.la
tests_test_sub_forward_tipc_SOURCES = tests/test_sub_forward_tipc.cpp
tests_test_sub_forward_tipc_LDADD = src/libzmq.la
tests_test_sub_forward_tipc_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_sub_forward_tipc_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_term_endpoint_tipc_SOURCES = tests/test_term_endpoint_tipc.cpp
tests_test_term_endpoint_tipc_LDADD = src/libzmq.la
@ -876,7 +887,8 @@ test_apps += tests/test_abstract_ipc \
tests/test_many_sockets
tests_test_abstract_ipc_SOURCES = tests/test_abstract_ipc.cpp
tests_test_abstract_ipc_LDADD = src/libzmq.la
tests_test_abstract_ipc_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_abstract_ipc_CPPFLAGS = ${UNITY_CPPFLAGS}
endif
@ -930,7 +942,8 @@ tests_test_scatter_gather_SOURCES = tests/test_scatter_gather.cpp
tests_test_scatter_gather_LDADD = src/libzmq.la
tests_test_dgram_SOURCES = tests/test_dgram.cpp
tests_test_dgram_LDADD = src/libzmq.la
tests_test_dgram_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_dgram_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_app_meta_SOURCES = tests/test_app_meta.cpp
tests_test_app_meta_LDADD = src/libzmq.la ${UNITY_LIBS}

View File

@ -28,40 +28,47 @@
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
#include <unity.h>
void setUp ()
{
setup_test_context ();
}
void tearDown ()
{
teardown_test_context ();
}
static const char test_endpoint[] = "ipc://@tmp-tester";
void test_roundtrip ()
{
void *sb = test_context_socket (ZMQ_DEALER);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sb, test_endpoint));
char endpoint[MAX_SOCKET_STRING];
size_t size = sizeof (endpoint);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (sb, ZMQ_LAST_ENDPOINT, endpoint, &size));
TEST_ASSERT_EQUAL_INT (0, strncmp (endpoint, test_endpoint, size));
void *sc = test_context_socket (ZMQ_DEALER);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, test_endpoint));
bounce (sb, sc);
test_context_socket_close (sc);
test_context_socket_close (sb);
}
int main (void)
{
setup_test_environment ();
void *ctx = zmq_ctx_new ();
assert (ctx);
void *sb = zmq_socket (ctx, ZMQ_DEALER);
assert (sb);
int rc = zmq_bind (sb, "ipc://@tmp-tester");
assert (rc == 0);
char endpoint[200];
size_t size = sizeof (endpoint);
rc = zmq_getsockopt (sb, ZMQ_LAST_ENDPOINT, endpoint, &size);
assert (rc == 0);
rc = strncmp (endpoint, "ipc://@tmp-tester", size);
assert (rc == 0);
void *sc = zmq_socket (ctx, ZMQ_DEALER);
assert (sc);
rc = zmq_connect (sc, "ipc://@tmp-tester");
assert (rc == 0);
bounce (sb, sc);
rc = zmq_close (sc);
assert (rc == 0);
rc = zmq_close (sb);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0;
UNITY_BEGIN ();
RUN_TEST (test_roundtrip);
return UNITY_END ();
}

View File

@ -28,21 +28,38 @@
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
int main (void)
#include <unity.h>
void setUp ()
{
}
void tearDown ()
{
}
void test ()
{
void *counter = zmq_atomic_counter_new ();
assert (zmq_atomic_counter_value (counter) == 0);
assert (zmq_atomic_counter_inc (counter) == 0);
assert (zmq_atomic_counter_inc (counter) == 1);
assert (zmq_atomic_counter_inc (counter) == 2);
assert (zmq_atomic_counter_value (counter) == 3);
assert (zmq_atomic_counter_dec (counter) == 1);
assert (zmq_atomic_counter_dec (counter) == 1);
assert (zmq_atomic_counter_dec (counter) == 0);
TEST_ASSERT_EQUAL_INT (0, zmq_atomic_counter_value (counter));
TEST_ASSERT_EQUAL_INT (0, zmq_atomic_counter_inc (counter));
TEST_ASSERT_EQUAL_INT (1, zmq_atomic_counter_inc (counter));
TEST_ASSERT_EQUAL_INT (2, zmq_atomic_counter_inc (counter));
TEST_ASSERT_EQUAL_INT (3, zmq_atomic_counter_value (counter));
TEST_ASSERT_EQUAL_INT (1, zmq_atomic_counter_dec (counter));
TEST_ASSERT_EQUAL_INT (1, zmq_atomic_counter_dec (counter));
TEST_ASSERT_EQUAL_INT (0, zmq_atomic_counter_dec (counter));
zmq_atomic_counter_set (counter, 2);
assert (zmq_atomic_counter_dec (counter) == 1);
assert (zmq_atomic_counter_dec (counter) == 0);
TEST_ASSERT_EQUAL_INT (1, zmq_atomic_counter_dec (counter));
TEST_ASSERT_EQUAL_INT (0, zmq_atomic_counter_dec (counter));
zmq_atomic_counter_destroy (&counter);
return 0;
}
int main ()
{
UNITY_BEGIN ();
RUN_TEST (test);
return UNITY_END ();
}

View File

@ -28,72 +28,87 @@
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
#include <unity.h>
void setUp ()
{
setup_test_context ();
}
void tearDown ()
{
teardown_test_context ();
}
void str_send_to (void *s_, const char *content_, const char *address_)
{
// Send the address part
int rc = s_sendmore (s_, address_);
assert (rc > 0);
rc = s_send (s_, content_);
assert (rc > 0);
send_string_expect_success (s_, address_, ZMQ_SNDMORE);
send_string_expect_success (s_, content_, 0);
}
void str_recv_from (void *s_, char **ptr_content_, char **ptr_address_)
{
*ptr_address_ = s_recv (s_);
assert (ptr_address_);
TEST_ASSERT_NOT_NULL (ptr_address_);
*ptr_content_ = s_recv (s_);
assert (ptr_content_);
TEST_ASSERT_NOT_NULL (ptr_content_);
}
static const char test_question[] = "Is someone there ?";
static const char test_answer[] = "Yes, there is !";
void test_connect_fails ()
{
void *socket = test_context_socket (ZMQ_DGRAM);
// Connecting dgram should fail
TEST_ASSERT_FAILURE_ERRNO (ENOCOMPATPROTO,
zmq_connect (socket, ENDPOINT_4));
test_context_socket_close (socket);
}
void test_roundtrip ()
{
char *message_string;
char *address;
void *sender = test_context_socket (ZMQ_DGRAM);
void *listener = test_context_socket (ZMQ_DGRAM);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (listener, ENDPOINT_4));
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sender, ENDPOINT_5));
str_send_to (sender, test_question, strrchr (ENDPOINT_4, '/') + 1);
str_recv_from (listener, &message_string, &address);
TEST_ASSERT_EQUAL_STRING (test_question, message_string);
TEST_ASSERT_EQUAL_STRING (strrchr (ENDPOINT_5, '/') + 1, address);
free (message_string);
str_send_to (listener, test_answer, address);
free (address);
str_recv_from (sender, &message_string, &address);
TEST_ASSERT_EQUAL_STRING (test_answer, message_string);
TEST_ASSERT_EQUAL_STRING (strrchr (ENDPOINT_4, '/') + 1, address);
free (message_string);
free (address);
test_context_socket_close (sender);
test_context_socket_close (listener);
}
int main (void)
{
setup_test_environment ();
void *ctx = zmq_ctx_new ();
assert (ctx);
char *message_string;
char *address;
void *sender = zmq_socket (ctx, ZMQ_DGRAM);
void *listener = zmq_socket (ctx, ZMQ_DGRAM);
// Connecting dgram shoudl fail
int rc = zmq_connect (listener, ENDPOINT_4);
assert (rc == -1);
rc = zmq_bind (listener, ENDPOINT_4);
assert (rc == 0);
rc = zmq_bind (sender, ENDPOINT_5);
assert (rc == 0);
str_send_to (sender, "Is someone there ?", strrchr (ENDPOINT_4, '/') + 1);
str_recv_from (listener, &message_string, &address);
assert (strcmp (message_string, "Is someone there ?") == 0);
assert (strcmp (address, strrchr (ENDPOINT_5, '/') + 1) == 0);
free (message_string);
str_send_to (listener, "Yes, there is !", address);
free (address);
str_recv_from (sender, &message_string, &address);
assert (strcmp (message_string, "Yes, there is !") == 0);
assert (strcmp (address, strrchr (ENDPOINT_4, '/') + 1) == 0);
free (message_string);
free (address);
rc = zmq_close (sender);
assert (rc == 0);
rc = zmq_close (listener);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0;
UNITY_BEGIN ();
RUN_TEST (test_connect_fails);
RUN_TEST (test_roundtrip);
return UNITY_END ();
}

View File

@ -28,126 +28,108 @@
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
int main (void)
#include <unity.h>
void setUp ()
{
setup_test_context ();
}
void tearDown ()
{
teardown_test_context ();
}
void test_roundtrip ()
{
setup_test_environment ();
size_t len = MAX_SOCKET_STRING;
char endpoint1[MAX_SOCKET_STRING];
char endpoint2[MAX_SOCKET_STRING];
void *ctx = zmq_ctx_new ();
assert (ctx);
// Create a req/rep device.
void *dealer = zmq_socket (ctx, ZMQ_DEALER);
assert (dealer);
int rc = zmq_bind (dealer, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (dealer, ZMQ_LAST_ENDPOINT, endpoint1, &len);
assert (rc == 0);
void *router = zmq_socket (ctx, ZMQ_ROUTER);
assert (router);
rc = zmq_bind (router, "tcp://127.0.0.1:*");
assert (rc == 0);
len = MAX_SOCKET_STRING;
rc = zmq_getsockopt (router, ZMQ_LAST_ENDPOINT, endpoint2, &len);
assert (rc == 0);
void *dealer = test_context_socket (ZMQ_DEALER);
bind_loopback_ipv4 (dealer, endpoint1, sizeof (endpoint1));
void *router = test_context_socket (ZMQ_ROUTER);
bind_loopback_ipv4 (router, endpoint2, sizeof (endpoint2));
// Create a worker.
void *rep = zmq_socket (ctx, ZMQ_REP);
assert (rep);
rc = zmq_connect (rep, endpoint1);
assert (rc == 0);
void *rep = test_context_socket (ZMQ_REP);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (rep, endpoint1));
// Create a client.
void *req = zmq_socket (ctx, ZMQ_REQ);
assert (req);
rc = zmq_connect (req, endpoint2);
assert (rc == 0);
void *req = test_context_socket (ZMQ_REQ);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (req, endpoint2));
// Send a request.
rc = zmq_send (req, "ABC", 3, ZMQ_SNDMORE);
assert (rc == 3);
rc = zmq_send (req, "DEF", 3, 0);
assert (rc == 3);
send_string_expect_success (req, "ABC", ZMQ_SNDMORE);
send_string_expect_success (req, "DEF", 0);
// Pass the request through the device.
for (int i = 0; i != 4; i++) {
zmq_msg_t msg;
rc = zmq_msg_init (&msg);
assert (rc == 0);
rc = zmq_msg_recv (&msg, router, 0);
assert (rc >= 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, router, 0));
int rcvmore;
size_t sz = sizeof (rcvmore);
rc = zmq_getsockopt (router, ZMQ_RCVMORE, &rcvmore, &sz);
assert (rc == 0);
rc = zmq_msg_send (&msg, dealer, rcvmore ? ZMQ_SNDMORE : 0);
assert (rc >= 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (router, ZMQ_RCVMORE, &rcvmore, &sz));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_msg_send (&msg, dealer, rcvmore ? ZMQ_SNDMORE : 0));
}
// Receive the request.
char buff[3];
rc = zmq_recv (rep, buff, 3, 0);
assert (rc == 3);
assert (memcmp (buff, "ABC", 3) == 0);
recv_string_expect_success (rep, "ABC", 0);
int rcvmore;
size_t sz = sizeof (rcvmore);
rc = zmq_getsockopt (rep, ZMQ_RCVMORE, &rcvmore, &sz);
assert (rc == 0);
assert (rcvmore);
rc = zmq_recv (rep, buff, 3, 0);
assert (rc == 3);
assert (memcmp (buff, "DEF", 3) == 0);
rc = zmq_getsockopt (rep, ZMQ_RCVMORE, &rcvmore, &sz);
assert (rc == 0);
assert (!rcvmore);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (rep, ZMQ_RCVMORE, &rcvmore, &sz));
TEST_ASSERT_TRUE (rcvmore);
recv_string_expect_success (rep, "DEF", 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (rep, ZMQ_RCVMORE, &rcvmore, &sz));
TEST_ASSERT_FALSE (rcvmore);
// Send the reply.
rc = zmq_send (rep, "GHI", 3, ZMQ_SNDMORE);
assert (rc == 3);
rc = zmq_send (rep, "JKL", 3, 0);
assert (rc == 3);
send_string_expect_success (rep, "GHI", ZMQ_SNDMORE);
send_string_expect_success (rep, "JKL", 0);
// Pass the reply through the device.
for (int i = 0; i != 4; i++) {
zmq_msg_t msg;
rc = zmq_msg_init (&msg);
assert (rc == 0);
rc = zmq_msg_recv (&msg, dealer, 0);
assert (rc >= 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, dealer, 0));
int rcvmore;
rc = zmq_getsockopt (dealer, ZMQ_RCVMORE, &rcvmore, &sz);
assert (rc == 0);
rc = zmq_msg_send (&msg, router, rcvmore ? ZMQ_SNDMORE : 0);
assert (rc >= 0);
size_t sz = sizeof (rcvmore);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (dealer, ZMQ_RCVMORE, &rcvmore, &sz));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_msg_send (&msg, router, rcvmore ? ZMQ_SNDMORE : 0));
}
// Receive the reply.
rc = zmq_recv (req, buff, 3, 0);
assert (rc == 3);
assert (memcmp (buff, "GHI", 3) == 0);
rc = zmq_getsockopt (req, ZMQ_RCVMORE, &rcvmore, &sz);
assert (rc == 0);
assert (rcvmore);
rc = zmq_recv (req, buff, 3, 0);
assert (rc == 3);
assert (memcmp (buff, "JKL", 3) == 0);
rc = zmq_getsockopt (req, ZMQ_RCVMORE, &rcvmore, &sz);
assert (rc == 0);
assert (!rcvmore);
recv_string_expect_success (req, "GHI", 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (req, ZMQ_RCVMORE, &rcvmore, &sz));
TEST_ASSERT_TRUE (rcvmore);
recv_string_expect_success (req, "JKL", 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (req, ZMQ_RCVMORE, &rcvmore, &sz));
TEST_ASSERT_FALSE (rcvmore);
// Clean up.
rc = zmq_close (req);
assert (rc == 0);
rc = zmq_close (rep);
assert (rc == 0);
rc = zmq_close (router);
assert (rc == 0);
rc = zmq_close (dealer);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0;
test_context_socket_close (req);
test_context_socket_close (rep);
test_context_socket_close (router);
test_context_socket_close (dealer);
}
int main ()
{
setup_test_environment ();
UNITY_BEGIN ();
RUN_TEST (test_roundtrip);
return UNITY_END ();
}

View File

@ -28,124 +28,108 @@
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
int main (void)
#include <unity.h>
void setUp ()
{
setup_test_context ();
}
void tearDown ()
{
teardown_test_context ();
}
// TODO this is heavily duplicated with test_reqrep_device.cpp
void test_roundtrip ()
{
// Create a req/rep device.
void *dealer = test_context_socket (ZMQ_DEALER);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (dealer, "tipc://{5560,0,0}"));
void *router = test_context_socket (ZMQ_ROUTER);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (router, "tipc://{5561,0,0}"));
// Create a worker.
void *rep = test_context_socket (ZMQ_REP);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (rep, "tipc://{5560,0}@0.0.0"));
// Create a client.
void *req = test_context_socket (ZMQ_REQ);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (req, "tipc://{5561,0}@0.0.0"));
// Send a request.
send_string_expect_success (req, "ABC", ZMQ_SNDMORE);
send_string_expect_success (req, "DEF", 0);
// Pass the request through the device.
for (int i = 0; i != 4; i++) {
zmq_msg_t msg;
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, router, 0));
int rcvmore;
size_t sz = sizeof (rcvmore);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (router, ZMQ_RCVMORE, &rcvmore, &sz));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_msg_send (&msg, dealer, rcvmore ? ZMQ_SNDMORE : 0));
}
// Receive the request.
recv_string_expect_success (rep, "ABC", 0);
int rcvmore;
size_t sz = sizeof (rcvmore);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (rep, ZMQ_RCVMORE, &rcvmore, &sz));
TEST_ASSERT_TRUE (rcvmore);
recv_string_expect_success (rep, "DEF", 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (rep, ZMQ_RCVMORE, &rcvmore, &sz));
TEST_ASSERT_FALSE (rcvmore);
// Send the reply.
send_string_expect_success (rep, "GHI", ZMQ_SNDMORE);
send_string_expect_success (rep, "JKL", 0);
// Pass the reply through the device.
for (int i = 0; i != 4; i++) {
zmq_msg_t msg;
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, dealer, 0));
int rcvmore;
size_t sz = sizeof (rcvmore);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (dealer, ZMQ_RCVMORE, &rcvmore, &sz));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_msg_send (&msg, router, rcvmore ? ZMQ_SNDMORE : 0));
}
// Receive the reply.
recv_string_expect_success (req, "GHI", 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (req, ZMQ_RCVMORE, &rcvmore, &sz));
TEST_ASSERT_TRUE (rcvmore);
recv_string_expect_success (req, "JKL", 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (req, ZMQ_RCVMORE, &rcvmore, &sz));
TEST_ASSERT_FALSE (rcvmore);
// Clean up.
test_context_socket_close (req);
test_context_socket_close (rep);
test_context_socket_close (router);
test_context_socket_close (dealer);
}
int main ()
{
if (!is_tipc_available ()) {
printf ("TIPC environment unavailable, skipping test\n");
return 77;
}
fprintf (stderr, "test_reqrep_device_tipc running...\n");
void *ctx = zmq_init (1);
assert (ctx);
// Create a req/rep device.
void *dealer = zmq_socket (ctx, ZMQ_DEALER);
assert (dealer);
int rc = zmq_bind (dealer, "tipc://{5560,0,0}");
assert (rc == 0);
void *router = zmq_socket (ctx, ZMQ_ROUTER);
assert (router);
rc = zmq_bind (router, "tipc://{5561,0,0}");
assert (rc == 0);
// Create a worker.
void *rep = zmq_socket (ctx, ZMQ_REP);
assert (rep);
rc = zmq_connect (rep, "tipc://{5560,0}@0.0.0");
assert (rc == 0);
// Create a client.
void *req = zmq_socket (ctx, ZMQ_REQ);
assert (req);
rc = zmq_connect (req, "tipc://{5561,0}@0.0.0");
assert (rc == 0);
// Send a request.
rc = zmq_send (req, "ABC", 3, ZMQ_SNDMORE);
assert (rc == 3);
rc = zmq_send (req, "DEF", 3, 0);
assert (rc == 3);
// Pass the request through the device.
for (int i = 0; i != 4; i++) {
zmq_msg_t msg;
rc = zmq_msg_init (&msg);
assert (rc == 0);
rc = zmq_recvmsg (router, &msg, 0);
assert (rc >= 0);
int rcvmore;
size_t sz = sizeof (rcvmore);
rc = zmq_getsockopt (router, ZMQ_RCVMORE, &rcvmore, &sz);
assert (rc == 0);
rc = zmq_sendmsg (dealer, &msg, rcvmore ? ZMQ_SNDMORE : 0);
assert (rc >= 0);
}
// Receive the request.
char buff[3];
rc = zmq_recv (rep, buff, 3, 0);
assert (rc == 3);
assert (memcmp (buff, "ABC", 3) == 0);
int rcvmore;
size_t sz = sizeof (rcvmore);
rc = zmq_getsockopt (rep, ZMQ_RCVMORE, &rcvmore, &sz);
assert (rc == 0);
assert (rcvmore);
rc = zmq_recv (rep, buff, 3, 0);
assert (rc == 3);
assert (memcmp (buff, "DEF", 3) == 0);
rc = zmq_getsockopt (rep, ZMQ_RCVMORE, &rcvmore, &sz);
assert (rc == 0);
assert (!rcvmore);
// Send the reply.
rc = zmq_send (rep, "GHI", 3, ZMQ_SNDMORE);
assert (rc == 3);
rc = zmq_send (rep, "JKL", 3, 0);
assert (rc == 3);
// Pass the reply through the device.
for (int i = 0; i != 4; i++) {
zmq_msg_t msg;
rc = zmq_msg_init (&msg);
assert (rc == 0);
rc = zmq_recvmsg (dealer, &msg, 0);
assert (rc >= 0);
int rcvmore;
rc = zmq_getsockopt (dealer, ZMQ_RCVMORE, &rcvmore, &sz);
assert (rc == 0);
rc = zmq_sendmsg (router, &msg, rcvmore ? ZMQ_SNDMORE : 0);
assert (rc >= 0);
}
// Receive the reply.
rc = zmq_recv (req, buff, 3, 0);
assert (rc == 3);
assert (memcmp (buff, "GHI", 3) == 0);
rc = zmq_getsockopt (req, ZMQ_RCVMORE, &rcvmore, &sz);
assert (rc == 0);
assert (rcvmore);
rc = zmq_recv (req, buff, 3, 0);
assert (rc == 3);
assert (memcmp (buff, "JKL", 3) == 0);
rc = zmq_getsockopt (req, ZMQ_RCVMORE, &rcvmore, &sz);
assert (rc == 0);
assert (!rcvmore);
// Clean up.
rc = zmq_close (req);
assert (rc == 0);
rc = zmq_close (rep);
assert (rc == 0);
rc = zmq_close (router);
assert (rc == 0);
rc = zmq_close (dealer);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0;
UNITY_BEGIN ();
RUN_TEST (test_roundtrip);
return UNITY_END ();
}

View File

@ -28,33 +28,39 @@
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
int main (void)
#include <unity.h>
void setUp ()
{
setup_test_environment ();
void *ctx = zmq_ctx_new ();
assert (ctx);
setup_test_context ();
}
void *sb = zmq_socket (ctx, ZMQ_REP);
assert (sb);
int rc = zmq_bind (sb, "inproc://a");
assert (rc == 0);
void tearDown ()
{
teardown_test_context ();
}
void *sc = zmq_socket (ctx, ZMQ_REQ);
assert (sc);
rc = zmq_connect (sc, "inproc://a");
assert (rc == 0);
void test_roundtrip ()
{
void *sb = test_context_socket (ZMQ_REP);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sb, "inproc://a"));
void *sc = test_context_socket (ZMQ_REQ);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, "inproc://a"));
bounce (sb, sc);
rc = zmq_close (sc);
assert (rc == 0);
rc = zmq_close (sb);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0;
test_context_socket_close (sc);
test_context_socket_close (sb);
}
int main ()
{
setup_test_environment ();
UNITY_BEGIN ();
RUN_TEST (test_roundtrip);
return UNITY_END ();
}

View File

@ -28,85 +28,73 @@
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
void test_leak (void)
#include <unity.h>
void setUp ()
{
setup_test_context ();
}
void tearDown ()
{
teardown_test_context ();
}
void test_leak ()
{
char my_endpoint[256];
void *ctx = zmq_ctx_new ();
assert (ctx);
void *sb = zmq_socket (ctx, ZMQ_REP);
assert (sb);
int rc = zmq_bind (sb, "ipc://*");
assert (rc == 0);
void *sb = test_context_socket (ZMQ_REP);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sb, "ipc://*"));
size_t len = sizeof (my_endpoint);
rc = zmq_getsockopt (sb, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (sb, ZMQ_LAST_ENDPOINT, my_endpoint, &len));
void *sc = zmq_socket (ctx, ZMQ_REQ);
assert (sc);
rc = zmq_connect (sc, my_endpoint);
assert (rc == 0);
void *sc = test_context_socket (ZMQ_REQ);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, my_endpoint));
rc = s_send (sc, "leakymsg");
assert (rc == strlen ("leakymsg"));
static const char leakymsg[] = "leakymsg";
send_string_expect_success (sc, leakymsg, 0);
char *buf = s_recv (sb);
free (buf);
rc = zmq_close (sc);
assert (rc == 0);
test_context_socket_close (sc);
msleep (SETTLE_TIME);
rc = s_send (sb, "leakymsg");
assert (rc == strlen ("leakymsg"));
send_string_expect_success (sb, leakymsg, 0);
rc = zmq_close (sb);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
test_context_socket_close (sb);
}
void test_simple (void)
{
char my_endpoint[256];
void *ctx = zmq_ctx_new ();
assert (ctx);
void *sb = zmq_socket (ctx, ZMQ_REP);
assert (sb);
int rc = zmq_bind (sb, "ipc://*");
assert (rc == 0);
void *sb = test_context_socket (ZMQ_REP);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sb, "ipc://*"));
size_t len = sizeof (my_endpoint);
rc = zmq_getsockopt (sb, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (sb, ZMQ_LAST_ENDPOINT, my_endpoint, &len));
void *sc = zmq_socket (ctx, ZMQ_REQ);
assert (sc);
rc = zmq_connect (sc, my_endpoint);
assert (rc == 0);
void *sc = test_context_socket (ZMQ_REQ);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, my_endpoint));
bounce (sb, sc);
rc = zmq_close (sc);
assert (rc == 0);
rc = zmq_close (sb);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
test_context_socket_close (sc);
test_context_socket_close (sb);
}
int main (void)
{
setup_test_environment ();
test_simple ();
test_leak ();
return 0;
UNITY_BEGIN ();
RUN_TEST (test_simple);
RUN_TEST (test_leak);
return UNITY_END ();
}

View File

@ -28,6 +28,33 @@
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
#include <unity.h>
void setUp ()
{
setup_test_context ();
}
void tearDown ()
{
teardown_test_context ();
}
void test_roundtrip ()
{
void *sb = test_context_socket (ZMQ_REP);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sb, "tipc://{5560,0,0}"));
void *sc = test_context_socket (ZMQ_REQ);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, "tipc://{5560,0}@0.0.0"));
bounce (sb, sc);
test_context_socket_close (sc);
test_context_socket_close (sb);
}
int main (void)
{
@ -36,31 +63,8 @@ int main (void)
return 77;
}
fprintf (stderr, "test_reqrep_tipc running...\n");
UNITY_BEGIN ();
RUN_TEST (test_roundtrip);
void *ctx = zmq_init (1);
assert (ctx);
void *sb = zmq_socket (ctx, ZMQ_REP);
assert (sb);
int rc = zmq_bind (sb, "tipc://{5560,0,0}");
assert (rc == 0);
void *sc = zmq_socket (ctx, ZMQ_REQ);
assert (sc);
rc = zmq_connect (sc, "tipc://{5560,0}@0.0.0");
assert (rc == 0);
bounce (sb, sc);
rc = zmq_close (sc);
assert (rc == 0);
rc = zmq_close (sb);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0;
return UNITY_END ();
}

View File

@ -28,36 +28,43 @@
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
#include <unity.h>
void setUp ()
{
setup_test_context ();
}
void tearDown ()
{
teardown_test_context ();
}
const char *bind_address = 0;
char connect_address[MAX_SOCKET_STRING];
void test_fair_queue_in (void *ctx_)
void test_fair_queue_in (const char *bind_address)
{
void *rep = zmq_socket (ctx_, ZMQ_REP);
assert (rep);
void *rep = test_context_socket (ZMQ_REP);
int timeout = 250;
int rc = zmq_setsockopt (rep, ZMQ_RCVTIMEO, &timeout, sizeof (int));
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (rep, ZMQ_RCVTIMEO, &timeout, sizeof (int)));
rc = zmq_bind (rep, bind_address);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (rep, bind_address));
size_t len = MAX_SOCKET_STRING;
rc = zmq_getsockopt (rep, ZMQ_LAST_ENDPOINT, connect_address, &len);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (rep, ZMQ_LAST_ENDPOINT, connect_address, &len));
const size_t services = 5;
void *reqs[services];
for (size_t peer = 0; peer < services; ++peer) {
reqs[peer] = zmq_socket (ctx_, ZMQ_REQ);
assert (reqs[peer]);
reqs[peer] = test_context_socket (ZMQ_REQ);
rc = zmq_setsockopt (reqs[peer], ZMQ_RCVTIMEO, &timeout, sizeof (int));
assert (rc == 0);
rc = zmq_connect (reqs[peer], connect_address);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (reqs[peer], ZMQ_RCVTIMEO, &timeout, sizeof (int)));
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (reqs[peer], connect_address));
}
msleep (SETTLE_TIME);
@ -93,31 +100,24 @@ void test_fair_queue_in (void *ctx_)
free (str);
}
#endif
close_zero_linger (rep);
test_context_socket_close_zero_linger (rep);
for (size_t peer = 0; peer < services; ++peer)
close_zero_linger (reqs[peer]);
// Wait for disconnects.
msleep (SETTLE_TIME);
test_context_socket_close_zero_linger (reqs[peer]);
}
void test_envelope (void *ctx_)
void test_envelope (const char *bind_address)
{
void *rep = zmq_socket (ctx_, ZMQ_REP);
assert (rep);
void *rep = test_context_socket (ZMQ_REP);
int rc = zmq_bind (rep, bind_address);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (rep, bind_address));
size_t len = MAX_SOCKET_STRING;
rc = zmq_getsockopt (rep, ZMQ_LAST_ENDPOINT, connect_address, &len);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (rep, ZMQ_LAST_ENDPOINT, connect_address, &len));
void *dealer = zmq_socket (ctx_, ZMQ_DEALER);
assert (dealer);
void *dealer = test_context_socket (ZMQ_DEALER);
rc = zmq_connect (dealer, connect_address);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (dealer, connect_address));
// minimal envelope
s_send_seq (dealer, 0, "A", SEQ_END);
@ -131,39 +131,52 @@ void test_envelope (void *ctx_)
s_send_seq (rep, "A", SEQ_END);
s_recv_seq (dealer, "X", "Y", 0, "A", SEQ_END);
close_zero_linger (rep);
close_zero_linger (dealer);
// Wait for disconnects.
msleep (SETTLE_TIME);
test_context_socket_close_zero_linger (rep);
test_context_socket_close_zero_linger (dealer);
}
int main (void)
const char bind_inproc[] = "inproc://a";
const char bind_tcp[] = "tcp://127.0.0.1:*";
void test_fair_queue_in_inproc ()
{
test_fair_queue_in (bind_inproc);
}
void test_fair_queue_in_tcp ()
{
test_fair_queue_in (bind_tcp);
}
void test_envelope_inproc ()
{
test_envelope (bind_inproc);
}
void test_envelope_tcp ()
{
test_envelope (bind_tcp);
}
int main ()
{
setup_test_environment ();
void *ctx = zmq_ctx_new ();
assert (ctx);
const char *binds[] = {"inproc://a", "tcp://127.0.0.1:*"};
UNITY_BEGIN ();
for (int transport = 0; transport < 2; ++transport) {
bind_address = binds[transport];
// SHALL receive incoming messages from its peers using a fair-queuing
// strategy.
RUN_TEST (test_fair_queue_in_inproc);
RUN_TEST (test_fair_queue_in_tcp);
// SHALL receive incoming messages from its peers using a fair-queuing
// strategy.
test_fair_queue_in (ctx);
// For an incoming message:
// SHALL remove and store the address envelope, including the delimiter.
// SHALL pass the remaining data frames to its calling application.
// SHALL wait for a single reply message from its calling application.
// SHALL prepend the address envelope and delimiter.
// SHALL deliver this message back to the originating peer.
RUN_TEST (test_envelope_inproc);
RUN_TEST (test_envelope_tcp);
// For an incoming message:
// SHALL remove and store the address envelope, including the delimiter.
// SHALL pass the remaining data frames to its calling application.
// SHALL wait for a single reply message from its calling application.
// SHALL prepend the address envelope and delimiter.
// SHALL deliver this message back to the originating peer.
test_envelope (ctx);
}
int rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0;
return UNITY_END ();
}

View File

@ -28,82 +28,74 @@
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
int main (void)
#include <unity.h>
void setUp ()
{
setup_test_context ();
}
void tearDown ()
{
teardown_test_context ();
}
void test ()
{
setup_test_environment ();
size_t len = MAX_SOCKET_STRING;
char endpoint1[MAX_SOCKET_STRING];
char endpoint2[MAX_SOCKET_STRING];
void *ctx = zmq_ctx_new ();
assert (ctx);
// First, create an intermediate device
void *xpub = zmq_socket (ctx, ZMQ_XPUB);
assert (xpub);
int rc = zmq_bind (xpub, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (xpub, ZMQ_LAST_ENDPOINT, endpoint1, &len);
assert (rc == 0);
void *xsub = zmq_socket (ctx, ZMQ_XSUB);
assert (xsub);
rc = zmq_bind (xsub, "tcp://127.0.0.1:*");
assert (rc == 0);
len = MAX_SOCKET_STRING;
rc = zmq_getsockopt (xsub, ZMQ_LAST_ENDPOINT, endpoint2, &len);
assert (rc == 0);
void *xpub = test_context_socket (ZMQ_XPUB);
bind_loopback_ipv4 (xpub, endpoint1, sizeof (endpoint1));
void *xsub = test_context_socket (ZMQ_XSUB);
bind_loopback_ipv4 (xsub, endpoint2, sizeof (endpoint2));
// Create a publisher
void *pub = zmq_socket (ctx, ZMQ_PUB);
assert (pub);
rc = zmq_connect (pub, endpoint2);
assert (rc == 0);
void *pub = test_context_socket (ZMQ_PUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pub, endpoint2));
// Create a subscriber
void *sub = zmq_socket (ctx, ZMQ_SUB);
assert (sub);
rc = zmq_connect (sub, endpoint1);
assert (rc == 0);
void *sub = test_context_socket (ZMQ_SUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, endpoint1));
// Subscribe for all messages.
rc = zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "", 0);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "", 0));
// Pass the subscription upstream through the device
char buff[32];
rc = zmq_recv (xpub, buff, sizeof (buff), 0);
assert (rc >= 0);
rc = zmq_send (xsub, buff, rc, 0);
assert (rc >= 0);
int size;
TEST_ASSERT_SUCCESS_ERRNO (size = zmq_recv (xpub, buff, sizeof (buff), 0));
TEST_ASSERT_SUCCESS_ERRNO (zmq_send (xsub, buff, size, 0));
// Wait a bit till the subscription gets to the publisher
msleep (SETTLE_TIME);
// Send an empty message
rc = zmq_send (pub, NULL, 0, 0);
assert (rc == 0);
send_string_expect_success (pub, "", 0);
// Pass the message downstream through the device
rc = zmq_recv (xsub, buff, sizeof (buff), 0);
assert (rc >= 0);
rc = zmq_send (xpub, buff, rc, 0);
assert (rc >= 0);
TEST_ASSERT_SUCCESS_ERRNO (size = zmq_recv (xsub, buff, sizeof (buff), 0));
TEST_ASSERT_SUCCESS_ERRNO (zmq_send (xpub, buff, size, 0));
// Receive the message in the subscriber
rc = zmq_recv (sub, buff, sizeof (buff), 0);
assert (rc == 0);
recv_string_expect_success (sub, "", 0);
// Clean up.
rc = zmq_close (xpub);
assert (rc == 0);
rc = zmq_close (xsub);
assert (rc == 0);
rc = zmq_close (pub);
assert (rc == 0);
rc = zmq_close (sub);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0;
test_context_socket_close (xpub);
test_context_socket_close (xsub);
test_context_socket_close (pub);
test_context_socket_close (sub);
}
int main ()
{
setup_test_environment ();
UNITY_BEGIN ();
RUN_TEST (test);
return UNITY_END ();
}

View File

@ -28,80 +28,75 @@
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
int main (void)
#include <unity.h>
void setUp ()
{
setup_test_context ();
}
void tearDown ()
{
teardown_test_context ();
}
void test ()
{
// First, create an intermediate device.
void *xpub = test_context_socket (ZMQ_XPUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (xpub, "tipc://{5560,0,0}"));
void *xsub = test_context_socket (ZMQ_XSUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (xsub, "tipc://{5561,0,0}"));
// Create a publisher.
void *pub = test_context_socket (ZMQ_PUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pub, "tipc://{5561,0}@0.0.0"));
// Create a subscriber.
void *sub = test_context_socket (ZMQ_SUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, "tipc://{5560,0}@0.0.0"));
// TODO the remainder of this method is duplicated with test_sub_forward
// Subscribe for all messages.
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "", 0));
// Pass the subscription upstream through the device
char buff[32];
int size;
TEST_ASSERT_SUCCESS_ERRNO (size = zmq_recv (xpub, buff, sizeof (buff), 0));
TEST_ASSERT_SUCCESS_ERRNO (zmq_send (xsub, buff, size, 0));
// Wait a bit till the subscription gets to the publisher
msleep (SETTLE_TIME);
// Send an empty message
send_string_expect_success (pub, "", 0);
// Pass the message downstream through the device
TEST_ASSERT_SUCCESS_ERRNO (size = zmq_recv (xsub, buff, sizeof (buff), 0));
TEST_ASSERT_SUCCESS_ERRNO (zmq_send (xpub, buff, size, 0));
// Receive the message in the subscriber
recv_string_expect_success (sub, "", 0);
// Clean up.
test_context_socket_close (xpub);
test_context_socket_close (xsub);
test_context_socket_close (pub);
test_context_socket_close (sub);
}
int main ()
{
if (!is_tipc_available ()) {
printf ("TIPC environment unavailable, skipping test\n");
return 77;
}
fprintf (stderr, "test_sub_forward running...\n");
void *ctx = zmq_init (1);
assert (ctx);
// First, create an intermediate device.
void *xpub = zmq_socket (ctx, ZMQ_XPUB);
assert (xpub);
int rc = zmq_bind (xpub, "tipc://{5560,0,0}");
assert (rc == 0);
void *xsub = zmq_socket (ctx, ZMQ_XSUB);
assert (xsub);
rc = zmq_bind (xsub, "tipc://{5561,0,0}");
assert (rc == 0);
// Create a publisher.
void *pub = zmq_socket (ctx, ZMQ_PUB);
assert (pub);
rc = zmq_connect (pub, "tipc://{5561,0}@0.0.0");
assert (rc == 0);
// Create a subscriber.
void *sub = zmq_socket (ctx, ZMQ_SUB);
assert (sub);
rc = zmq_connect (sub, "tipc://{5560,0}@0.0.0");
assert (rc == 0);
// Subscribe for all messages.
rc = zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "", 0);
assert (rc == 0);
// Pass the subscription upstream through the device.
char buff[32];
rc = zmq_recv (xpub, buff, sizeof (buff), 0);
assert (rc >= 0);
rc = zmq_send (xsub, buff, rc, 0);
assert (rc >= 0);
// Wait a bit till the subscription gets to the publisher.
msleep (SETTLE_TIME);
// Send an empty message.
rc = zmq_send (pub, NULL, 0, 0);
assert (rc == 0);
// Pass the message downstream through the device.
rc = zmq_recv (xsub, buff, sizeof (buff), 0);
assert (rc >= 0);
rc = zmq_send (xpub, buff, rc, 0);
assert (rc >= 0);
// Receive the message in the subscriber.
rc = zmq_recv (sub, buff, sizeof (buff), 0);
assert (rc == 0);
// Clean up.
rc = zmq_close (xpub);
assert (rc == 0);
rc = zmq_close (xsub);
assert (rc == 0);
rc = zmq_close (pub);
assert (rc == 0);
rc = zmq_close (sub);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0;
UNITY_BEGIN ();
RUN_TEST (test);
return UNITY_END ();
}

View File

@ -28,54 +28,56 @@
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
int main (void)
#include <unity.h>
void setUp ()
{
setup_test_environment ();
void *ctx = zmq_ctx_new ();
assert (ctx);
setup_test_context ();
}
void tearDown ()
{
teardown_test_context ();
}
void test ()
{
// Create a publisher
void *pub = zmq_socket (ctx, ZMQ_XPUB);
assert (pub);
int rc = zmq_bind (pub, "inproc://soname");
assert (rc == 0);
void *pub = test_context_socket (ZMQ_XPUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, "inproc://soname"));
// set pub socket options
rc = zmq_setsockopt (pub, ZMQ_XPUB_WELCOME_MSG, "W", 1);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (pub, ZMQ_XPUB_WELCOME_MSG, "W", 1));
// Create a subscriber
void *sub = zmq_socket (ctx, ZMQ_SUB);
void *sub = test_context_socket (ZMQ_SUB);
// Subscribe to the welcome message
rc = zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "W", 1);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "W", 1));
assert (sub);
rc = zmq_connect (sub, "inproc://soname");
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, "inproc://soname"));
char buffer[2];
const uint8_t buffer[2] = {1, 'W'};
// Receive the welcome subscription
rc = zmq_recv (pub, buffer, 2, 0);
assert (rc == 2);
assert (buffer[0] == 1);
assert (buffer[1] == 'W');
recv_array_expect_success (pub, buffer, 0);
// Receive the welcome message
rc = zmq_recv (sub, buffer, 1, 0);
assert (rc == 1);
assert (buffer[0] == 'W');
recv_string_expect_success (sub, "W", 0);
// Clean up.
rc = zmq_close (pub);
assert (rc == 0);
rc = zmq_close (sub);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0;
test_context_socket_close (pub);
test_context_socket_close (sub);
}
int main ()
{
setup_test_environment ();
UNITY_BEGIN ();
RUN_TEST (test);
return UNITY_END ();
}

View File

@ -28,10 +28,22 @@
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
#include <netdb.h>
#include <unity.h>
void setUp ()
{
setup_test_context ();
}
int main (void)
void tearDown ()
{
teardown_test_context ();
}
void test_poll_fd ()
{
struct addrinfo *addr, hint;
hint.ai_flags = AI_NUMERICHOST;
@ -43,28 +55,21 @@ int main (void)
hint.ai_addr = NULL;
hint.ai_next = NULL;
int rc = getaddrinfo ("127.0.0.1", "6650", &hint, &addr);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (getaddrinfo ("127.0.0.1", "6650", &hint, &addr));
int recv_socket = socket (AF_INET, SOCK_DGRAM, IPPROTO_UDP);
assert (recv_socket != -1);
TEST_ASSERT_NOT_EQUAL (-1, recv_socket);
int flag = 1;
rc =
setsockopt (recv_socket, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
setsockopt (recv_socket, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int)));
rc = bind (recv_socket, addr->ai_addr, addr->ai_addrlen);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
bind (recv_socket, addr->ai_addr, addr->ai_addrlen));
void *ctx = zmq_ctx_new ();
assert (ctx);
void *sb = test_context_socket (ZMQ_REP);
void *sb = zmq_socket (ctx, ZMQ_REP);
assert (sb);
rc = zmq_bind (sb, "tcp://127.0.0.1:*");
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sb, "tcp://127.0.0.1:*"));
zmq_pollitem_t pollitems[] = {
{sb, 0, ZMQ_POLLIN, 0},
@ -72,28 +77,29 @@ int main (void)
};
int send_socket = socket (AF_INET, SOCK_DGRAM, IPPROTO_UDP);
assert (send_socket != -1);
TEST_ASSERT_NOT_EQUAL (-1, send_socket);
char buf[10];
memset (buf, 1, 10);
rc = sendto (send_socket, buf, 10, 0, addr->ai_addr, addr->ai_addrlen);
assert (rc >= 0);
TEST_ASSERT_SUCCESS_ERRNO (
sendto (send_socket, buf, 10, 0, addr->ai_addr, addr->ai_addrlen));
assert (zmq_poll (pollitems, 2, 1) == 1);
assert ((pollitems[0].revents & ZMQ_POLLIN) == 0);
assert (pollitems[1].revents & ZMQ_POLLIN);
TEST_ASSERT_EQUAL (1, zmq_poll (pollitems, 2, 1));
TEST_ASSERT_BITS_LOW (ZMQ_POLLIN, pollitems[0].revents);
TEST_ASSERT_BITS_HIGH (ZMQ_POLLIN, pollitems[1].revents);
rc = zmq_close (sb);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
test_context_socket_close (sb);
close (send_socket);
close (recv_socket);
freeaddrinfo (addr);
return 0;
}
int main ()
{
UNITY_BEGIN ();
RUN_TEST (test_poll_fd);
return UNITY_END ();
}