From 9c6738bb450d6edc03b49710618cb89f36e25fb1 Mon Sep 17 00:00:00 2001 From: Gudmundur Adalsteinsson Date: Fri, 17 Apr 2020 18:37:01 +0000 Subject: [PATCH] Problem: No support to query poller size Solution: Add zmq_poller_size that queries the number of objects registered, allowing safer usages of poller to avoid livelock situations. --- doc/zmq_poller.txt | 18 +++++++++++++-- include/zmq.h | 1 + src/zmq.cpp | 8 +++++++ src/zmq_draft.h | 1 + tests/test_poller.cpp | 51 +++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 77 insertions(+), 2 deletions(-) diff --git a/doc/zmq_poller.txt b/doc/zmq_poller.txt index a2b076d5..e9e49452 100644 --- a/doc/zmq_poller.txt +++ b/doc/zmq_poller.txt @@ -14,6 +14,8 @@ SYNOPSIS *int zmq_poller_destroy (void ****'poller_p');* +*int zmq_poller_size (void *'poller');* + *int zmq_poller_add (void *'poller', void *'socket', void *'user_data', short 'events');* *int zmq_poller_modify (void *'poller', void *'socket', short 'events');* @@ -51,6 +53,11 @@ instance. _zmq_poller_destroy_ sets the passed pointer to NULL in case of a successful execution. _zmq_poller_destroy_ implicitly unregisters all registered sockets and file descriptors. +_zmq_poller_size_ queries the number of sockets or file descriptors registered +with a poller. The initial size of a poller is 0, a successful add operation +increases the size by 1 and a successful remove operation decreases the size +by 1. The size is unaffected by the events specified. + _zmq_poller_add_, _zmq_poller_modify_ and _zmq_poller_remove_ manage the 0MQ sockets registered with a poller. @@ -129,7 +136,8 @@ registered objects. Otherwise, a livelock situation may result: If more than 'n_events' registered objects have an active event on each call to _zmq_poller_wait_all_, it might happen that the same subset of registered objects is always returned, and the caller never notices the events on the -others. +others. The number of objects registered can be queried with +_zmq_poller_size_. _zmq_poller_wait_all_ returns the number of valid elements. The valid elements are placed in positions '0' to 'n_events - 1' in the 'events' array. All @@ -219,6 +227,12 @@ On _zmq_poller_destroy_: _poller_p_ did not point to a valid poller. Note that passing an invalid pointer (e.g. pointer to deallocated memory) may cause undefined behaviour (e.g. an access violation). +On _zmq_poller_size_: +*EFAULT*:: +_poller_ did not point to a valid poller. Note that passing an +invalid pointer (e.g. pointer to deallocated memory) may cause undefined +behaviour (e.g. an access violation). + On _zmq_poller_add_, _zmq_poller_modify_ and _zmq_poller_remove_: *EFAULT*:: _poller_ did not point to a valid poller. Note that passing an @@ -264,7 +278,7 @@ available. *EAGAIN*:: No registered event was signalled before the timeout was reached. -On _zmq_poller_fd: +On _zmq_poller_fd_: *EINVAL*:: The poller has no associated file descriptor. *EFAULT*:: diff --git a/include/zmq.h b/include/zmq.h index d7b33b4d..ab5fae98 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -735,6 +735,7 @@ typedef struct zmq_poller_event_t ZMQ_EXPORT void *zmq_poller_new (void); ZMQ_EXPORT int zmq_poller_destroy (void **poller_p); +ZMQ_EXPORT int zmq_poller_size (void *poller); ZMQ_EXPORT int zmq_poller_add (void *poller, void *socket, void *user_data, short events); ZMQ_EXPORT int zmq_poller_modify (void *poller, void *socket, short events); diff --git a/src/zmq.cpp b/src/zmq.cpp index 4da72698..1c1a525b 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -1219,6 +1219,14 @@ static int check_poller_fd_registration_args (void *const poller_, return 0; } +int zmq_poller_size (void *poller_) +{ + if (-1 == check_poller (poller_)) + return -1; + + return (static_cast (poller_))->size (); +} + int zmq_poller_add (void *poller_, void *s_, void *user_data_, short events_) { if (-1 == check_poller_registration_args (poller_, s_) diff --git a/src/zmq_draft.h b/src/zmq_draft.h index 5c612ca0..fc17cc78 100644 --- a/src/zmq_draft.h +++ b/src/zmq_draft.h @@ -124,6 +124,7 @@ typedef struct zmq_poller_event_t void *zmq_poller_new (void); int zmq_poller_destroy (void **poller_p_); +int zmq_poller_size (void *poller_); int zmq_poller_add (void *poller_, void *socket_, void *user_data_, diff --git a/tests/test_poller.cpp b/tests/test_poller.cpp index 24d13d57..00cb90e5 100644 --- a/tests/test_poller.cpp +++ b/tests/test_poller.cpp @@ -60,6 +60,17 @@ void test_null_poller_pointers_destroy_indirect () TEST_ASSERT_FAILURE_ERRNO (EFAULT, zmq_poller_destroy (&null_poller)); } +void test_null_poller_pointers_size_direct () +{ + TEST_ASSERT_FAILURE_ERRNO (EFAULT, zmq_poller_size (NULL)); +} + +void test_null_poller_pointers_size_indirect () +{ + void *null_poller = NULL; + TEST_ASSERT_FAILURE_ERRNO (EFAULT, zmq_poller_size (&null_poller)); +} + void test_null_poller_pointers_add_direct () { void *socket = test_context_socket (ZMQ_PAIR); @@ -330,6 +341,42 @@ TEST_CASE_FUNC_PARAM (call_poller_wait_all_null_event_fails_event_count_nonzero, TEST_CASE_FUNC_PARAM (call_poller_wait_all_null_event_fails_event_count_zero, test_with_valid_poller) +void call_poller_size (void *poller_, void *socket_) +{ + int rc = zmq_poller_size (poller_); + TEST_ASSERT_SUCCESS_ERRNO (rc); + TEST_ASSERT_EQUAL (rc, 0); + + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_poller_add (poller_, socket_, NULL, ZMQ_POLLIN)); + rc = zmq_poller_size (poller_); + TEST_ASSERT_SUCCESS_ERRNO (rc); + TEST_ASSERT_EQUAL (rc, 1); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_poller_modify (poller_, socket_, 0)); + rc = zmq_poller_size (poller_); + TEST_ASSERT_SUCCESS_ERRNO (rc); + TEST_ASSERT_EQUAL (rc, 1); + + fd_t plain_socket = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_poller_add_fd (poller_, plain_socket, NULL, ZMQ_POLLOUT)); + rc = zmq_poller_size (poller_); + TEST_ASSERT_SUCCESS_ERRNO (rc); + TEST_ASSERT_EQUAL (rc, 2); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_poller_remove (poller_, socket_)); + rc = zmq_poller_size (poller_); + TEST_ASSERT_SUCCESS_ERRNO (rc); + TEST_ASSERT_EQUAL (rc, 1); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_poller_remove_fd (poller_, plain_socket)); + TEST_ASSERT_SUCCESS_ERRNO (close (plain_socket)); + rc = zmq_poller_size (poller_); + TEST_ASSERT_SUCCESS_ERRNO (rc); + TEST_ASSERT_EQUAL (rc, 0); +} + void call_poller_add_twice_fails (void *poller_, void *socket_) { TEST_ASSERT_SUCCESS_ERRNO ( @@ -449,6 +496,7 @@ void call_poller_modify_fd_invalid_events_fails (void *poller_, TEST_ASSERT_SUCCESS_ERRNO (close (plain_socket)); } +TEST_CASE_FUNC_PARAM (call_poller_size, test_with_empty_poller) TEST_CASE_FUNC_PARAM (call_poller_add_twice_fails, test_with_empty_poller) TEST_CASE_FUNC_PARAM (call_poller_remove_unregistered_fails, test_with_empty_poller) @@ -666,6 +714,8 @@ int main (void) UNITY_BEGIN (); RUN_TEST (test_null_poller_pointers_destroy_direct); RUN_TEST (test_null_poller_pointers_destroy_indirect); + RUN_TEST (test_null_poller_pointers_size_direct); + RUN_TEST (test_null_poller_pointers_size_indirect); RUN_TEST (test_null_poller_pointers_add_direct); RUN_TEST (test_null_poller_pointers_add_indirect); RUN_TEST (test_null_poller_pointers_modify_direct); @@ -690,6 +740,7 @@ int main (void) RUN_TEST (test_call_poller_wait_all_null_event_fails_event_count_nonzero); RUN_TEST (test_call_poller_wait_all_null_event_fails_event_count_zero); + RUN_TEST (test_call_poller_size); RUN_TEST (test_call_poller_add_twice_fails); RUN_TEST (test_call_poller_remove_unregistered_fails); RUN_TEST (test_call_poller_modify_unregistered_fails);