diff --git a/include/zmq.h b/include/zmq.h index 62318fbd..dceccb31 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -661,6 +661,10 @@ ZMQ_EXPORT int zmq_poller_modify_fd (void *poller, int fd, short events); ZMQ_EXPORT int zmq_poller_remove_fd (void *poller, int fd); #endif +ZMQ_EXPORT int zmq_socket_get_peer_state (void *socket, + const void *identity, + size_t identity_size); + /******************************************************************************/ /* Scheduling timers */ /******************************************************************************/ diff --git a/src/zmq.cpp b/src/zmq.cpp index dca9088c..d4f3efba 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -1389,6 +1389,16 @@ int zmq_poller_wait_all (void *poller_, zmq_poller_event_t *events_, int n_event return rc; } +// Peer-specific state + +int zmq_socket_get_peer_state (void *socket, + const void *identity, + size_t identity_size) +{ + errno = ENOTSUP; + return -1; +} + // Timers void *zmq_timers_new (void) diff --git a/tests/test_router_mandatory.cpp b/tests/test_router_mandatory.cpp index 1388a4f0..36cdb42d 100644 --- a/tests/test_router_mandatory.cpp +++ b/tests/test_router_mandatory.cpp @@ -29,6 +29,107 @@ #include "testutil.hpp" +void test_get_peer_state () +{ +#ifdef ZMQ_BUILD_DRAFT_API + size_t len = MAX_SOCKET_STRING; + char my_endpoint[MAX_SOCKET_STRING]; + void *ctx = zmq_ctx_new (); + assert (ctx); + void *router = zmq_socket (ctx, ZMQ_ROUTER); + assert (router); + + int rc = zmq_bind (router, "tcp://127.0.0.1:*"); + assert (rc == 0); + + rc = zmq_getsockopt (router, ZMQ_LAST_ENDPOINT, my_endpoint, &len); + assert (rc == 0); + + int mandatory = 1; + rc = zmq_setsockopt (router, ZMQ_ROUTER_MANDATORY, &mandatory, + sizeof (mandatory)); + + // Create dealer called "X" and connect it to our router + void *dealer1 = zmq_socket (ctx, ZMQ_DEALER); + assert (dealer1); + rc = zmq_setsockopt (dealer1, ZMQ_IDENTITY, "X", 1); + assert (rc == 0); + rc = zmq_connect (dealer1, my_endpoint); + assert (rc == 0); + + // Create dealer called "Y" and connect it to our router + void *dealer2 = zmq_socket (ctx, ZMQ_DEALER); + assert (dealer2); + rc = zmq_setsockopt (dealer2, ZMQ_IDENTITY, "Y", 1); + assert (rc == 0); + rc = zmq_connect (dealer2, my_endpoint); + assert (rc == 0); + + // Get message from dealer to know when connection is ready + char buffer[255]; + rc = zmq_send (dealer1, "Hello", 5, 0); + assert (rc == 5); + rc = zmq_recv (router, buffer, 255, 0); + assert (rc == 1); + assert (buffer[0] == 'X'); + + void *poller = zmq_poller_new (); + assert (poller); + + // Poll on router and dealer1, but not on dealer2 + rc = zmq_poller_add (poller, router, NULL, ZMQ_POLLOUT); + assert (rc == 0); + rc = zmq_poller_add (poller, dealer1, NULL, ZMQ_POLLIN); + assert (rc == 0); + + const size_t count = 10000; + const size_t event_size = 2; + zmq_poller_event_t events[event_size]; + for (size_t iterations = 0; + iterations < count + && zmq_poller_wait_all (poller, events, event_size, -1) != -1; + ++iterations) { + for (size_t i = 0; i < event_size; ++i) { + if (events[i].socket == router) { + rc = zmq_socket_get_peer_state (router, "X", 1); + assert (rc != -1); + if (rc & ZMQ_POLLOUT) { + rc = zmq_send (router, "X", 1, ZMQ_SNDMORE); + assert (rc == 1); + rc = zmq_send (router, "Hello", 5, 0); + assert (rc == 5); + } + rc = zmq_socket_get_peer_state (router, "Y", 1); + assert (rc != -1); + if (rc & ZMQ_POLLOUT) { + rc = zmq_send (router, "Y", 1, ZMQ_SNDMORE); + assert (rc == 1); + rc = zmq_send (router, "Hello", 5, 0); + assert (rc == 5); + } + } + if (events[i].socket == dealer1) { + rc = zmq_recv (dealer1, buffer, 255, ZMQ_DONTWAIT); + assert (rc == 5); + } + // never read from dealer2, so its pipe becomes full eventually + } + } + + rc = zmq_close (router); + assert (rc == 0); + + rc = zmq_close (dealer1); + assert (rc == 0); + + rc = zmq_close (dealer2); + assert (rc == 0); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); +#endif +} + void test_basic () { size_t len = MAX_SOCKET_STRING; @@ -98,6 +199,7 @@ int main (void) setup_test_environment (); test_basic (); + test_get_peer_state (); - return 0; + return 0 ; }