0
0
mirror of https://github.com/zeromq/libzmq.git synced 2024-12-27 15:41:05 +08:00
libzmq/tests/test_router_notify.cpp

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

296 lines
9.1 KiB
C++
Raw Normal View History

/* SPDX-License-Identifier: MPL-2.0 */
#include "testutil.hpp"
#include "testutil_unity.hpp"
#include <string.h>
SETUP_TEARDOWN_TESTCONTEXT
void test_sockopt_router_notify ()
{
void *router = test_context_socket (ZMQ_ROUTER);
int opt_notify;
int opt_notify_read;
size_t opt_notify_read_size = sizeof (opt_notify_read);
// default value is off when socket is constructed
TEST_ASSERT_SUCCESS_ERRNO (zmq_getsockopt (
router, ZMQ_ROUTER_NOTIFY, &opt_notify_read, &opt_notify_read_size));
TEST_ASSERT_EQUAL (0, opt_notify_read);
// valid value - Connect
opt_notify = ZMQ_NOTIFY_CONNECT;
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
router, ZMQ_ROUTER_NOTIFY, &opt_notify, sizeof (opt_notify)));
TEST_ASSERT_SUCCESS_ERRNO (zmq_getsockopt (
router, ZMQ_ROUTER_NOTIFY, &opt_notify_read, &opt_notify_read_size));
TEST_ASSERT_EQUAL (opt_notify, opt_notify_read);
// valid value - Disconnect
opt_notify = ZMQ_NOTIFY_DISCONNECT;
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
router, ZMQ_ROUTER_NOTIFY, &opt_notify, sizeof (opt_notify)));
TEST_ASSERT_SUCCESS_ERRNO (zmq_getsockopt (
router, ZMQ_ROUTER_NOTIFY, &opt_notify_read, &opt_notify_read_size));
TEST_ASSERT_EQUAL (opt_notify, opt_notify_read);
// valid value - Off
opt_notify = 0;
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
router, ZMQ_ROUTER_NOTIFY, &opt_notify, sizeof (opt_notify)));
TEST_ASSERT_SUCCESS_ERRNO (zmq_getsockopt (
router, ZMQ_ROUTER_NOTIFY, &opt_notify_read, &opt_notify_read_size));
TEST_ASSERT_EQUAL (opt_notify, opt_notify_read);
// valid value - Both
opt_notify = ZMQ_NOTIFY_CONNECT | ZMQ_NOTIFY_DISCONNECT;
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
router, ZMQ_ROUTER_NOTIFY, &opt_notify, sizeof (opt_notify)));
TEST_ASSERT_SUCCESS_ERRNO (zmq_getsockopt (
router, ZMQ_ROUTER_NOTIFY, &opt_notify_read, &opt_notify_read_size));
TEST_ASSERT_EQUAL (opt_notify, opt_notify_read);
// value boundary
opt_notify = -1;
TEST_ASSERT_FAILURE_ERRNO (
EINVAL, zmq_setsockopt (router, ZMQ_ROUTER_NOTIFY, &opt_notify,
sizeof (opt_notify)));
opt_notify = (ZMQ_NOTIFY_CONNECT | ZMQ_NOTIFY_DISCONNECT) + 1;
TEST_ASSERT_FAILURE_ERRNO (
EINVAL, zmq_setsockopt (router, ZMQ_ROUTER_NOTIFY, &opt_notify,
sizeof (opt_notify)));
// failures don't update the value
TEST_ASSERT_SUCCESS_ERRNO (zmq_getsockopt (
router, ZMQ_ROUTER_NOTIFY, &opt_notify_read, &opt_notify_read_size));
TEST_ASSERT_EQUAL (ZMQ_NOTIFY_CONNECT | ZMQ_NOTIFY_DISCONNECT,
opt_notify_read);
test_context_socket_close (router);
// check a non-router socket type
void *dealer = test_context_socket (ZMQ_DEALER);
// setsockopt fails for non-router sockets
opt_notify = ZMQ_NOTIFY_CONNECT;
TEST_ASSERT_FAILURE_ERRNO (
EINVAL, zmq_setsockopt (dealer, ZMQ_ROUTER_NOTIFY, &opt_notify,
sizeof (opt_notify)));
// getsockopts returns off for any non-router socket
TEST_ASSERT_SUCCESS_ERRNO (zmq_getsockopt (
dealer, ZMQ_ROUTER_NOTIFY, &opt_notify_read, &opt_notify_read_size));
TEST_ASSERT_EQUAL (0, opt_notify_read);
test_context_socket_close (dealer);
}
void test_router_notify_helper (int opt_notify_)
{
void *router = test_context_socket (ZMQ_ROUTER);
int opt_more;
size_t opt_more_length = sizeof (opt_more);
int opt_events;
size_t opt_events_length = sizeof (opt_events);
char connect_address[MAX_SOCKET_STRING];
// valid values
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
router, ZMQ_ROUTER_NOTIFY, &opt_notify_, sizeof (opt_notify_)));
bind_loopback_ipv4 (router, connect_address, sizeof connect_address);
void *dealer = test_context_socket (ZMQ_DEALER);
const char *dealer_routing_id = "X";
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (dealer, ZMQ_ROUTING_ID, dealer_routing_id, 1));
// dealer connects
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (dealer, connect_address));
// connection notification msg
if (opt_notify_ & ZMQ_NOTIFY_CONNECT) {
// routing-id only message of the connect
recv_string_expect_success (router, dealer_routing_id,
0); // 1st part: routing-id
recv_string_expect_success (router, "", 0); // 2nd part: empty
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (router, ZMQ_RCVMORE, &opt_more, &opt_more_length));
TEST_ASSERT_EQUAL (0, opt_more);
}
// test message from the dealer
send_string_expect_success (dealer, "Hello", 0);
recv_string_expect_success (router, dealer_routing_id, 0);
recv_string_expect_success (router, "Hello", 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (router, ZMQ_RCVMORE, &opt_more, &opt_more_length));
TEST_ASSERT_EQUAL (0, opt_more);
// dealer disconnects
TEST_ASSERT_SUCCESS_ERRNO (zmq_disconnect (dealer, connect_address));
// need one more process_commands() (???)
msleep (SETTLE_TIME);
zmq_getsockopt (dealer, ZMQ_EVENTS, &opt_events, &opt_events_length);
// connection notification msg
if (opt_notify_ & ZMQ_NOTIFY_DISCONNECT) {
// routing-id only message of the connect
recv_string_expect_success (router, dealer_routing_id,
0); // 1st part: routing-id
recv_string_expect_success (router, "", 0); // 2nd part: empty
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (router, ZMQ_RCVMORE, &opt_more, &opt_more_length));
TEST_ASSERT_EQUAL (0, opt_more);
}
test_context_socket_close (dealer);
test_context_socket_close (router);
}
void test_router_notify_connect ()
{
test_router_notify_helper (ZMQ_NOTIFY_CONNECT);
}
void test_router_notify_disconnect ()
{
test_router_notify_helper (ZMQ_NOTIFY_DISCONNECT);
}
void test_router_notify_both ()
{
test_router_notify_helper (ZMQ_NOTIFY_CONNECT | ZMQ_NOTIFY_DISCONNECT);
}
void test_handshake_fail ()
{
// setup router socket
void *router = test_context_socket (ZMQ_ROUTER);
int opt_timeout = 200;
int opt_notify = ZMQ_NOTIFY_CONNECT | ZMQ_NOTIFY_DISCONNECT;
char connect_address[MAX_SOCKET_STRING];
// valid values
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
router, ZMQ_ROUTER_NOTIFY, &opt_notify, sizeof (opt_notify)));
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
router, ZMQ_RCVTIMEO, &opt_timeout, sizeof (opt_timeout)));
bind_loopback_ipv4 (router, connect_address, sizeof connect_address);
// send something on raw tcp
void *stream = test_context_socket (ZMQ_STREAM);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (stream, connect_address));
send_string_expect_success (stream, "not-a-handshake", 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_disconnect (stream, connect_address));
test_context_socket_close (stream);
// no notification delivered
char buffer[255];
TEST_ASSERT_FAILURE_ERRNO (EAGAIN,
zmq_recv (router, buffer, sizeof (buffer), 0));
test_context_socket_close (router);
}
void test_error_during_multipart ()
{
/*
* If the disconnect occurs in the middle of the multipart
* message, the socket should not add the notification at the
* end of the incomplete message. It must discard the incomplete
* message, and delivert the notification as a new message.
*/
char connect_address[MAX_SOCKET_STRING];
char long_str[128] = {0};
memset (long_str, '*', sizeof (long_str) - 1);
// setup router
void *router = test_context_socket (ZMQ_ROUTER);
int opt_notify = ZMQ_NOTIFY_DISCONNECT;
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
router, ZMQ_ROUTER_NOTIFY, &opt_notify, sizeof (opt_notify)));
int64_t opt_maxmsgsize = 64; // the handshake fails if this is too small
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
router, ZMQ_MAXMSGSIZE, &opt_maxmsgsize, sizeof (opt_maxmsgsize)));
bind_loopback_ipv4 (router, connect_address, sizeof connect_address);
// setup dealer
void *dealer = test_context_socket (ZMQ_DEALER);
const char *dealer_routing_id = "X";
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (dealer, ZMQ_ROUTING_ID, dealer_routing_id, 1));
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (dealer, connect_address));
// send multipart message, the 2nd part causes a disconnect.
send_string_expect_success (dealer, "Hello2", ZMQ_SNDMORE);
send_string_expect_success (dealer, long_str, 0);
// disconnect notification
recv_string_expect_success (router, dealer_routing_id, 0);
recv_string_expect_success (router, "", 0);
test_context_socket_close (dealer);
test_context_socket_close (router);
}
int main (void)
{
setup_test_environment ();
UNITY_BEGIN ();
RUN_TEST (test_sockopt_router_notify);
RUN_TEST (test_router_notify_connect);
RUN_TEST (test_router_notify_disconnect);
RUN_TEST (test_router_notify_both);
RUN_TEST (test_handshake_fail);
RUN_TEST (test_error_during_multipart);
return UNITY_END ();
}