mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-14 09:47:56 +08:00
Problem: tests where client should receive an ERROR sometimes do not
receive an ERROR (probably because the connection is closed before) Solution: wait for client-side monitor events before closing the client socket Fixes #2705
This commit is contained in:
parent
843e627bed
commit
e0243dcbca
@ -64,13 +64,15 @@ void expect_new_client_curve_bounce_fail (void *ctx,
|
||||
char *client_secret,
|
||||
char *my_endpoint,
|
||||
void *server,
|
||||
void **client_mon = NULL)
|
||||
void **client_mon = NULL,
|
||||
int expected_client_event = 0,
|
||||
int expected_client_value = 0)
|
||||
{
|
||||
curve_client_data_t curve_client_data = {server_public, client_public,
|
||||
client_secret};
|
||||
expect_new_client_bounce_fail (ctx, my_endpoint, server,
|
||||
socket_config_curve_client,
|
||||
&curve_client_data, client_mon);
|
||||
expect_new_client_bounce_fail (
|
||||
ctx, my_endpoint, server, socket_config_curve_client, &curve_client_data,
|
||||
client_mon, expected_client_event, expected_client_value);
|
||||
}
|
||||
|
||||
void test_null_key (void *ctx,
|
||||
@ -143,25 +145,21 @@ void test_curve_security_with_bogus_client_credentials (
|
||||
char bogus_secret [41];
|
||||
zmq_curve_keypair (bogus_public, bogus_secret);
|
||||
|
||||
void *client_mon;
|
||||
expect_new_client_curve_bounce_fail (ctx, valid_server_public, bogus_public,
|
||||
bogus_secret, my_endpoint, server,
|
||||
&client_mon);
|
||||
NULL,
|
||||
#ifdef ZMQ_BUILD_DRAFT_API
|
||||
ZMQ_EVENT_HANDSHAKE_FAILED_AUTH, 400
|
||||
#else
|
||||
0, 0
|
||||
#endif
|
||||
);
|
||||
|
||||
int server_event_count = 0;
|
||||
#ifdef ZMQ_BUILD_DRAFT_API
|
||||
server_event_count = expect_monitor_event_multiple (
|
||||
server_mon, ZMQ_EVENT_HANDSHAKE_FAILED_AUTH, 400);
|
||||
assert (server_event_count <= 1);
|
||||
|
||||
int client_event_count = expect_monitor_event_multiple (
|
||||
client_mon, ZMQ_EVENT_HANDSHAKE_FAILED_AUTH, 400, true);
|
||||
// this should actually be client_event_count == 1, but this is not always
|
||||
// true, see https://github.com/zeromq/libzmq/issues/2705
|
||||
assert (client_event_count <= 1);
|
||||
|
||||
int rc = zmq_close (client_mon);
|
||||
assert (rc == 0);
|
||||
#endif
|
||||
|
||||
// there may be more than one ZAP request due to repeated attempts by the client
|
||||
|
@ -82,16 +82,19 @@ int expect_new_client_bounce_fail_and_count_monitor_events (
|
||||
void *socket_config_data_,
|
||||
void **client_mon,
|
||||
void *server_mon,
|
||||
int expected_event,
|
||||
int expected_err)
|
||||
int expected_server_event,
|
||||
int expected_server_value,
|
||||
int expected_client_event = 0,
|
||||
int expected_client_value = 0)
|
||||
{
|
||||
expect_new_client_bounce_fail (ctx, my_endpoint, server, socket_config_,
|
||||
socket_config_data_, client_mon);
|
||||
expect_new_client_bounce_fail (
|
||||
ctx, my_endpoint, server, socket_config_, socket_config_data_, client_mon,
|
||||
expected_client_event, expected_client_value);
|
||||
|
||||
int events_received = 0;
|
||||
#ifdef ZMQ_BUILD_DRAFT_API
|
||||
events_received =
|
||||
expect_monitor_event_multiple (server_mon, expected_event, expected_err);
|
||||
events_received = expect_monitor_event_multiple (
|
||||
server_mon, expected_server_event, expected_server_value);
|
||||
#endif
|
||||
|
||||
return events_received;
|
||||
@ -101,20 +104,23 @@ void test_zap_unsuccessful (void *ctx,
|
||||
char *my_endpoint,
|
||||
void *server,
|
||||
void *server_mon,
|
||||
int expected_event,
|
||||
int expected_err,
|
||||
int expected_server_event,
|
||||
int expected_server_value,
|
||||
socket_config_fn socket_config_,
|
||||
void *socket_config_data_,
|
||||
void **client_mon = NULL)
|
||||
void **client_mon = NULL,
|
||||
int expected_client_event = 0,
|
||||
int expected_client_value = 0)
|
||||
{
|
||||
int events_received =
|
||||
int server_events_received =
|
||||
expect_new_client_bounce_fail_and_count_monitor_events (
|
||||
ctx, my_endpoint, server, socket_config_, socket_config_data_,
|
||||
client_mon, server_mon, expected_event, expected_err);
|
||||
client_mon, server_mon, expected_server_event, expected_server_value,
|
||||
expected_client_event, expected_client_value);
|
||||
|
||||
// there may be more than one ZAP request due to repeated attempts by the
|
||||
// client (actually only in case if ZAP status code 300)
|
||||
assert (events_received == 0
|
||||
assert (server_events_received == 0
|
||||
|| 1 <= zmq_atomic_counter_value (zap_requests_handled));
|
||||
}
|
||||
|
||||
@ -177,7 +183,8 @@ void test_zap_unsuccessful_status_300 (void *ctx,
|
||||
&client_mon);
|
||||
|
||||
#ifdef ZMQ_BUILD_DRAFT_API
|
||||
assert_no_more_monitor_events_with_timeout (client_mon, 250);
|
||||
// we can use a 0 timeout here, since the client socket is already closed
|
||||
assert_no_more_monitor_events_with_timeout (client_mon, 0);
|
||||
|
||||
int rc = zmq_close (client_mon);
|
||||
assert (rc == 0);
|
||||
@ -191,7 +198,6 @@ void test_zap_unsuccessful_status_500 (void *ctx,
|
||||
socket_config_fn client_socket_config_,
|
||||
void *client_socket_config_data_)
|
||||
{
|
||||
void *client_mon;
|
||||
test_zap_unsuccessful (ctx, my_endpoint, server, server_mon,
|
||||
#ifdef ZMQ_BUILD_DRAFT_API
|
||||
ZMQ_EVENT_HANDSHAKE_FAILED_AUTH, 500,
|
||||
@ -199,20 +205,13 @@ void test_zap_unsuccessful_status_500 (void *ctx,
|
||||
0, 0,
|
||||
#endif
|
||||
client_socket_config_, client_socket_config_data_,
|
||||
&client_mon);
|
||||
|
||||
NULL,
|
||||
#ifdef ZMQ_BUILD_DRAFT_API
|
||||
int events_received = 0;
|
||||
events_received = expect_monitor_event_multiple (
|
||||
client_mon, ZMQ_EVENT_HANDSHAKE_FAILED_AUTH, 500, true);
|
||||
|
||||
// this should actually be events_received == 1, but this is not always
|
||||
// true, see https://github.com/zeromq/libzmq/issues/2705
|
||||
assert (events_received <= 1);
|
||||
|
||||
int rc = zmq_close (client_mon);
|
||||
assert (rc == 0);
|
||||
ZMQ_EVENT_HANDSHAKE_FAILED_AUTH, 500
|
||||
#else
|
||||
0, 0
|
||||
#endif
|
||||
);
|
||||
}
|
||||
|
||||
void test_zap_errors (socket_config_fn server_socket_config_,
|
||||
|
@ -311,154 +311,6 @@ void zap_handler (void *ctx)
|
||||
zap_handler_generic (ctx, zap_ok);
|
||||
}
|
||||
|
||||
void setup_handshake_socket_monitor (void *ctx,
|
||||
void *server,
|
||||
void **server_mon,
|
||||
const char *monitor_endpoint)
|
||||
{
|
||||
#ifdef ZMQ_BUILD_DRAFT_API
|
||||
// Monitor handshake events on the server
|
||||
int rc = zmq_socket_monitor (server, monitor_endpoint,
|
||||
ZMQ_EVENT_HANDSHAKE_SUCCEEDED
|
||||
| ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL
|
||||
| ZMQ_EVENT_HANDSHAKE_FAILED_AUTH
|
||||
| ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL);
|
||||
assert (rc == 0);
|
||||
|
||||
// Create socket for collecting monitor events
|
||||
*server_mon = zmq_socket (ctx, ZMQ_PAIR);
|
||||
assert (*server_mon);
|
||||
|
||||
// Connect it to the inproc endpoints so they'll get events
|
||||
rc = zmq_connect (*server_mon, monitor_endpoint);
|
||||
assert (rc == 0);
|
||||
#endif
|
||||
}
|
||||
|
||||
void setup_context_and_server_side (
|
||||
void **ctx,
|
||||
void **zap_control,
|
||||
void **zap_thread,
|
||||
void **server,
|
||||
void **server_mon,
|
||||
char *my_endpoint,
|
||||
zmq_thread_fn zap_handler_ = &zap_handler,
|
||||
socket_config_fn socket_config_ = &socket_config_curve_server,
|
||||
void *socket_config_data_ = valid_server_secret,
|
||||
const char *identity = "IDENT")
|
||||
{
|
||||
*ctx = zmq_ctx_new ();
|
||||
assert (*ctx);
|
||||
|
||||
// Spawn ZAP handler
|
||||
zap_requests_handled = zmq_atomic_counter_new ();
|
||||
assert (zap_requests_handled != NULL);
|
||||
|
||||
*zap_control = zmq_socket (*ctx, ZMQ_REP);
|
||||
assert (*zap_control);
|
||||
int rc = zmq_bind (*zap_control, "inproc://handler-control");
|
||||
assert (rc == 0);
|
||||
|
||||
if (zap_handler_) {
|
||||
*zap_thread = zmq_threadstart (zap_handler_, *ctx);
|
||||
|
||||
char *buf = s_recv (*zap_control);
|
||||
assert (buf);
|
||||
assert (streq (buf, "GO"));
|
||||
free (buf);
|
||||
} else
|
||||
*zap_thread = NULL;
|
||||
|
||||
// Server socket will accept connections
|
||||
*server = zmq_socket (*ctx, ZMQ_DEALER);
|
||||
assert (*server);
|
||||
|
||||
socket_config_ (*server, socket_config_data_);
|
||||
|
||||
rc = zmq_setsockopt (*server, ZMQ_IDENTITY, identity, strlen (identity));
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_bind (*server, "tcp://127.0.0.1:*");
|
||||
assert (rc == 0);
|
||||
|
||||
size_t len = MAX_SOCKET_STRING;
|
||||
rc = zmq_getsockopt (*server, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
|
||||
assert (rc == 0);
|
||||
|
||||
const char server_monitor_endpoint [] = "inproc://monitor-server";
|
||||
setup_handshake_socket_monitor (*ctx, *server, server_mon,
|
||||
server_monitor_endpoint);
|
||||
}
|
||||
|
||||
void shutdown_context_and_server_side (void *ctx,
|
||||
void *zap_thread,
|
||||
void *server,
|
||||
void *server_mon,
|
||||
void *zap_control,
|
||||
bool zap_handler_stopped = false)
|
||||
{
|
||||
if (zap_thread && !zap_handler_stopped) {
|
||||
int rc = s_send (zap_control, "STOP");
|
||||
assert (rc == 4);
|
||||
char *buf = s_recv (zap_control);
|
||||
assert (buf);
|
||||
assert (streq (buf, "STOPPED"));
|
||||
free (buf);
|
||||
rc = zmq_unbind (zap_control, "inproc://handler-control");
|
||||
assert (rc == 0);
|
||||
}
|
||||
close_zero_linger(zap_control);
|
||||
|
||||
#ifdef ZMQ_BUILD_DRAFT_API
|
||||
close_zero_linger (server_mon);
|
||||
#endif
|
||||
close_zero_linger (server);
|
||||
|
||||
// Wait until ZAP handler terminates
|
||||
if (zap_thread)
|
||||
zmq_threadclose (zap_thread);
|
||||
|
||||
int rc = zmq_ctx_term (ctx);
|
||||
assert (rc == 0);
|
||||
|
||||
zmq_atomic_counter_destroy (&zap_requests_handled);
|
||||
}
|
||||
|
||||
void *create_and_connect_client (void *ctx,
|
||||
char *my_endpoint,
|
||||
socket_config_fn socket_config_,
|
||||
void *socket_config_data_,
|
||||
void **client_mon = NULL)
|
||||
{
|
||||
void *client = zmq_socket (ctx, ZMQ_DEALER);
|
||||
assert (client);
|
||||
|
||||
socket_config_ (client, socket_config_data_);
|
||||
|
||||
int rc = zmq_connect (client, my_endpoint);
|
||||
assert (rc == 0);
|
||||
|
||||
if (client_mon) {
|
||||
setup_handshake_socket_monitor (ctx, client, client_mon,
|
||||
"inproc://client-monitor");
|
||||
}
|
||||
|
||||
return client;
|
||||
}
|
||||
|
||||
void expect_new_client_bounce_fail (void *ctx,
|
||||
char *my_endpoint,
|
||||
void *server,
|
||||
socket_config_fn socket_config_,
|
||||
void *socket_config_data_,
|
||||
void **client_mon = NULL)
|
||||
{
|
||||
void *client = create_and_connect_client (ctx, my_endpoint, socket_config_,
|
||||
socket_config_data_, client_mon);
|
||||
expect_bounce_fail (server, client);
|
||||
close_zero_linger (client);
|
||||
}
|
||||
|
||||
// Monitor event utilities
|
||||
|
||||
// Read one event off the monitor socket; return value and address
|
||||
@ -495,7 +347,7 @@ static int get_monitor_event_internal (void *monitor,
|
||||
size_t size = zmq_msg_size (&msg);
|
||||
*address = (char *) malloc (size + 1);
|
||||
memcpy (*address, data, size);
|
||||
*address [size] = 0;
|
||||
*address[size] = 0;
|
||||
}
|
||||
return event;
|
||||
}
|
||||
@ -644,4 +496,172 @@ int expect_monitor_event_multiple (void *server_mon,
|
||||
|
||||
#endif
|
||||
|
||||
void setup_handshake_socket_monitor (void *ctx,
|
||||
void *server,
|
||||
void **server_mon,
|
||||
const char *monitor_endpoint)
|
||||
{
|
||||
#ifdef ZMQ_BUILD_DRAFT_API
|
||||
// Monitor handshake events on the server
|
||||
int rc = zmq_socket_monitor (server, monitor_endpoint,
|
||||
ZMQ_EVENT_HANDSHAKE_SUCCEEDED
|
||||
| ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL
|
||||
| ZMQ_EVENT_HANDSHAKE_FAILED_AUTH
|
||||
| ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL);
|
||||
assert (rc == 0);
|
||||
|
||||
// Create socket for collecting monitor events
|
||||
*server_mon = zmq_socket (ctx, ZMQ_PAIR);
|
||||
assert (*server_mon);
|
||||
|
||||
// Connect it to the inproc endpoints so they'll get events
|
||||
rc = zmq_connect (*server_mon, monitor_endpoint);
|
||||
assert (rc == 0);
|
||||
#endif
|
||||
}
|
||||
|
||||
void setup_context_and_server_side (
|
||||
void **ctx,
|
||||
void **zap_control,
|
||||
void **zap_thread,
|
||||
void **server,
|
||||
void **server_mon,
|
||||
char *my_endpoint,
|
||||
zmq_thread_fn zap_handler_ = &zap_handler,
|
||||
socket_config_fn socket_config_ = &socket_config_curve_server,
|
||||
void *socket_config_data_ = valid_server_secret,
|
||||
const char *identity = "IDENT")
|
||||
{
|
||||
*ctx = zmq_ctx_new ();
|
||||
assert (*ctx);
|
||||
|
||||
// Spawn ZAP handler
|
||||
zap_requests_handled = zmq_atomic_counter_new ();
|
||||
assert (zap_requests_handled != NULL);
|
||||
|
||||
*zap_control = zmq_socket (*ctx, ZMQ_REP);
|
||||
assert (*zap_control);
|
||||
int rc = zmq_bind (*zap_control, "inproc://handler-control");
|
||||
assert (rc == 0);
|
||||
|
||||
if (zap_handler_) {
|
||||
*zap_thread = zmq_threadstart (zap_handler_, *ctx);
|
||||
|
||||
char *buf = s_recv (*zap_control);
|
||||
assert (buf);
|
||||
assert (streq (buf, "GO"));
|
||||
free (buf);
|
||||
} else
|
||||
*zap_thread = NULL;
|
||||
|
||||
// Server socket will accept connections
|
||||
*server = zmq_socket (*ctx, ZMQ_DEALER);
|
||||
assert (*server);
|
||||
|
||||
socket_config_ (*server, socket_config_data_);
|
||||
|
||||
rc = zmq_setsockopt (*server, ZMQ_IDENTITY, identity, strlen (identity));
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_bind (*server, "tcp://127.0.0.1:*");
|
||||
assert (rc == 0);
|
||||
|
||||
size_t len = MAX_SOCKET_STRING;
|
||||
rc = zmq_getsockopt (*server, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
|
||||
assert (rc == 0);
|
||||
|
||||
const char server_monitor_endpoint [] = "inproc://monitor-server";
|
||||
setup_handshake_socket_monitor (*ctx, *server, server_mon,
|
||||
server_monitor_endpoint);
|
||||
}
|
||||
|
||||
void shutdown_context_and_server_side (void *ctx,
|
||||
void *zap_thread,
|
||||
void *server,
|
||||
void *server_mon,
|
||||
void *zap_control,
|
||||
bool zap_handler_stopped = false)
|
||||
{
|
||||
if (zap_thread && !zap_handler_stopped) {
|
||||
int rc = s_send (zap_control, "STOP");
|
||||
assert (rc == 4);
|
||||
char *buf = s_recv (zap_control);
|
||||
assert (buf);
|
||||
assert (streq (buf, "STOPPED"));
|
||||
free (buf);
|
||||
rc = zmq_unbind (zap_control, "inproc://handler-control");
|
||||
assert (rc == 0);
|
||||
}
|
||||
close_zero_linger(zap_control);
|
||||
|
||||
#ifdef ZMQ_BUILD_DRAFT_API
|
||||
close_zero_linger (server_mon);
|
||||
#endif
|
||||
close_zero_linger (server);
|
||||
|
||||
// Wait until ZAP handler terminates
|
||||
if (zap_thread)
|
||||
zmq_threadclose (zap_thread);
|
||||
|
||||
int rc = zmq_ctx_term (ctx);
|
||||
assert (rc == 0);
|
||||
|
||||
zmq_atomic_counter_destroy (&zap_requests_handled);
|
||||
}
|
||||
|
||||
void *create_and_connect_client (void *ctx,
|
||||
char *my_endpoint,
|
||||
socket_config_fn socket_config_,
|
||||
void *socket_config_data_,
|
||||
void **client_mon = NULL)
|
||||
{
|
||||
void *client = zmq_socket (ctx, ZMQ_DEALER);
|
||||
assert (client);
|
||||
|
||||
socket_config_ (client, socket_config_data_);
|
||||
|
||||
int rc = zmq_connect (client, my_endpoint);
|
||||
assert (rc == 0);
|
||||
|
||||
if (client_mon) {
|
||||
setup_handshake_socket_monitor (ctx, client, client_mon,
|
||||
"inproc://client-monitor");
|
||||
}
|
||||
|
||||
return client;
|
||||
}
|
||||
|
||||
void expect_new_client_bounce_fail (void *ctx,
|
||||
char *my_endpoint,
|
||||
void *server,
|
||||
socket_config_fn socket_config_,
|
||||
void *socket_config_data_,
|
||||
void **client_mon = NULL,
|
||||
int expected_client_event = 0,
|
||||
int expected_client_value = 0)
|
||||
{
|
||||
void *my_client_mon;
|
||||
assert (client_mon == NULL || expected_client_event == 0);
|
||||
if (expected_client_event != 0)
|
||||
client_mon = &my_client_mon;
|
||||
void *client = create_and_connect_client (ctx, my_endpoint, socket_config_,
|
||||
socket_config_data_, client_mon);
|
||||
expect_bounce_fail (server, client);
|
||||
|
||||
#ifdef ZMQ_BUILD_DRAFT_API
|
||||
if (expected_client_event != 0) {
|
||||
int events_received = 0;
|
||||
events_received = expect_monitor_event_multiple (
|
||||
my_client_mon, expected_client_event, expected_client_value, false);
|
||||
|
||||
assert (events_received == 1);
|
||||
|
||||
int rc = zmq_close (my_client_mon);
|
||||
assert (rc == 0);
|
||||
}
|
||||
#endif
|
||||
|
||||
close_zero_linger (client);
|
||||
}
|
||||
|
||||
#endif
|
||||
|
Loading…
x
Reference in New Issue
Block a user