From 759d453368479257638e6b09e1febe19fbef2a3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lourens=20Naud=C3=A9?= Date: Fri, 21 Sep 2012 12:53:31 +0100 Subject: [PATCH] Significantly reworked the monitoring infrastructure with a more granular per socket API and to play well with monitoring endpoints in application threads --- doc/Makefile.am | 4 +- doc/zmq_ctx_set_monitor.txt | 223 --------------------------- doc/zmq_socket_monitor.txt | 288 +++++++++++++++++++++++++++++++++++ include/zmq.h | 19 ++- src/ctx.cpp | 70 +-------- src/ctx.hpp | 8 - src/ipc_connecter.cpp | 9 +- src/ipc_connecter.hpp | 3 + src/ipc_listener.cpp | 10 +- src/session_base.cpp | 13 +- src/session_base.hpp | 4 +- src/socket_base.cpp | 181 ++++++++++++++++++++-- src/socket_base.hpp | 26 +++- src/stream_engine.cpp | 6 +- src/stream_engine.hpp | 4 + src/tcp_connecter.cpp | 9 +- src/tcp_connecter.hpp | 3 + src/tcp_listener.cpp | 8 +- src/zmq.cpp | 20 +-- tests/test_monitor.cpp | 292 +++++++++++++++++++++++++++--------- 20 files changed, 768 insertions(+), 432 deletions(-) delete mode 100644 doc/zmq_ctx_set_monitor.txt create mode 100644 doc/zmq_socket_monitor.txt diff --git a/doc/Makefile.am b/doc/Makefile.am index 849f3b67..f2683f86 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -1,11 +1,11 @@ MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_proxy.3 \ zmq_ctx_new.3 zmq_ctx_destroy.3 zmq_ctx_get.3 zmq_ctx_set.3 \ - zmq_init.3 zmq_term.3 zmq_ctx_set_monitor.3\ + zmq_init.3 zmq_term.3 \ zmq_msg_close.3 zmq_msg_copy.3 zmq_msg_data.3 zmq_msg_init.3 \ zmq_msg_init_data.3 zmq_msg_init_size.3 zmq_msg_move.3 zmq_msg_size.3 \ zmq_msg_send.3 zmq_msg_recv.3 \ zmq_poll.3 zmq_recv.3 zmq_send.3 zmq_setsockopt.3 zmq_socket.3 \ - zmq_strerror.3 zmq_version.3 zmq_getsockopt.3 zmq_errno.3 \ + zmq_socket_monitor.3 zmq_strerror.3 zmq_version.3 zmq_getsockopt.3 zmq_errno.3 \ zmq_sendmsg.3 zmq_recvmsg.3 zmq_msg_get.3 zmq_msg_set.3 zmq_msg_more.3 MAN7 = zmq.7 zmq_tcp.7 zmq_pgm.7 zmq_epgm.7 zmq_inproc.7 zmq_ipc.7 diff --git a/doc/zmq_ctx_set_monitor.txt b/doc/zmq_ctx_set_monitor.txt deleted file mode 100644 index df688d85..00000000 --- a/doc/zmq_ctx_set_monitor.txt +++ /dev/null @@ -1,223 +0,0 @@ -zmq_ctx_set_monitor(3) -====================== - - -NAME ----- - -zmq_ctx_set_monitor - register a monitoring callback - - -SYNOPSIS --------- -*int zmq_ctx_set_monitor (void '*context', zmq_monitor_fn '*monitor');* - - -DESCRIPTION ------------ -The _zmq_ctx_set_monitor()_ function shall register a callback function -specified by the 'monitor' argument. This is an event sink for changes in per -socket connection and mailbox (work in progress) states. - -.The _zmq_ctx_set_monitor()_ callback function is expected to have this -prototype: - ----- -typedef void (zmq_monitor_fn) (void *s, int event, zmq_event_data_t *data); ----- - -The callback is global (per context), with the socket that triggered the event -passed to the handler as well. Each event also populates a 'zmq_event_data_t' -union with additional metadata which can be used for correlation. - -CAUTION: _zmq_ctx_set_monitor()_ is intended for monitoring infrastructure / -operations concerns only - NOT BUSINESS LOGIC. An event is a representation of -something that happened - you cannot change the past, but only react to them. -The implementation is also only concerned with a single session. No state of -peers, other sessions etc. are tracked - this will only pollute internals and -is the responsibility of application authors to either implement or correlate -in another datastore. Monitor events are exceptional conditions and are thus -not directly in the messaging critical path. However, still be careful with -what you're doing in the callback function as excess time spent in the handler -will block the socket's application thread. - -Only tcp and ipc specific transport events are supported in this initial -implementation. - -Supported events are: - - -ZMQ_EVENT_CONNECTED: connection established -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -The 'ZMQ_EVENT_CONNECTED' event triggers when a connection has been established -to a remote peer. This can happen either synchronous or asynchronous. - -.Callback metadata: ----- -data.connected.addr // peer address -data.connected.fd // socket descriptor ----- - -ZMQ_EVENT_CONNECT_DELAYED: synchronous connect failed, it's being polled -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -The 'ZMQ_EVENT_CONNECT_DELAYED' event triggers when an immediate connection -attempt is delayed and it's completion's being polled for. - -.Callback metadata: ----- -data.connect_delayed.addr // peer address -data.connect_delayed.err // errno ----- - -ZMQ_EVENT_CONNECT_RETRIED: asynchronous connect / reconnection attempt -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -The 'ZMQ_EVENT_CONNECT_RETRIED' event triggers when a connection attempt -is being handled by reconnect timer. The reconnect interval's recomputed -for each attempt. - -.Callback metadata: ----- -data.connect_retried.addr // peer address -data.connect_retried.interval // computed reconnect interval ----- - -ZMQ_EVENT_LISTENING: socket bound to an address, ready to accept connections -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -The 'ZMQ_EVENT_LISTENING' event triggers when a socket's successfully bound -to a an interface. - -.Callback metadata: ----- -data.listening.addr // listen address -data.listening.fd // socket descriptor ----- - -ZMQ_EVENT_BIND_FAILED: socket could not bind to an address -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -The 'ZMQ_EVENT_BIND_FAILED' event triggers when a socket could not bind to -a given interface. - -.Callback metadata: ----- -data.bind_failed.addr // listen address -data.bind_failed.err // errno ----- - -ZMQ_EVENT_ACCEPTED: connection accepted to bound interface -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -The 'ZMQ_EVENT_ACCEPTED' event triggers when a connection from a remote peer -has been established with a socket's listen address. - -.Callback metadata: ----- -data.accepted.addr // listen address -data.accepted.fd // socket descriptor ----- - -ZMQ_EVENT_ACCEPT_FAILED: could not accept client connection -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -The 'ZMQ_EVENT_ACCEPT_FAILED' event triggers when a connection attempt to -a socket's bound address fails. - -.Callback metadata: ----- -data.accept_failed.addr // listen address -data.accept_failed.err // errno ----- - -ZMQ_EVENT_CLOSED: connection closed -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -The 'ZMQ_EVENT_CLOSED' event triggers when a connection's underlying descriptor -has been closed. - -.Callback metadata: ----- -data.closed.addr // address -data.closed.fd // socket descriptor ----- - -ZMQ_EVENT_CLOSE_FAILED: connection couldn't be closed -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -The 'ZMQ_EVENT_CLOSE_FAILED' event triggers when a descriptor could not be -released back to the OS. - -.Callback metadata: ----- -data.close_failed.addr // address -data.close_failed.err // errno ----- - -ZMQ_EVENT_DISCONNECTED: broken session -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -The 'ZMQ_EVENT_DISCONNECTED' event triggers when the stream engine (tcp and ipc -specific) detects a corrupted / broken session. - -.Callback metadata: ----- -data.disconnected.addr // address -data.disconnected.fd // socket descriptor ----- - -RETURN VALUE ------------- -The _zmq_ctx_set_monitor()_ function returns a value of 0 or greater if -successful. Otherwise it returns `-1` and sets 'errno' to one of the values -defined below. - - -ERRORS ------- -*EINVAL*:: -The requested callback function _monitor_ is invalid. - - -EXAMPLE -------- -.Observing a 'PUB' socket's connection state ----- -void socket_monitor (void *s, int event_, zmq_event_data_t *data_) -{ - switch (event_) { - case ZMQ_EVENT_LISTENING: - printf ("Socket bound to %s, socket descriptor is %d\n", - data.listening.addr, data.listening.fd); - break; - case ZMQ_EVENT_ACCEPTED: - printf ("Accepted connection to %s, socket descriptor is %d\n", - data.accepted.addr, data.accepted.fd); - break; - } -} - -void *context = zmq_ctx_new (); -int rc = zmq_ctx_set_monitor (context, socket_monitor); -assert (rc == 0); -void *pub = zmq_socket (context, ZMQ_PUB); -assert (pub); -void *sub = zmq_socket (context, ZMQ_SUB); -assert (pub); -rc = zmq_bind (pub, "tcp://127.0.0.1:5560"); -assert (rc == 0); -rc = zmq_connect (sub, "tcp://127.0.0.1:5560"); -assert (rc == 0); - -// Allow a window for socket events as connect can be async -zmq_sleep (1); - -rc = zmq_close (pub); -assert (rc == 0); -rc = zmq_close (sub); -assert (rc == 0); - -zmq_term (context); ----- - - -SEE ALSO --------- -linkzmq:zmq[7] - - -AUTHORS -------- -This 0MQ manual page was written by Lourens Naudé \ No newline at end of file diff --git a/doc/zmq_socket_monitor.txt b/doc/zmq_socket_monitor.txt new file mode 100644 index 00000000..f289c908 --- /dev/null +++ b/doc/zmq_socket_monitor.txt @@ -0,0 +1,288 @@ +zmq_ctx_socket_monitor(3) +========================= + + +NAME +---- + +zmq_socket_monitor - register a monitoring callback + + +SYNOPSIS +-------- +*int zmq_socket_monitor (void '*socket', char * '*addr', int 'events');* + + +DESCRIPTION +----------- +The _zmq_socket_monitor()_ function shall spawn a 'PAIR' socket that publishes +socket state changes (events) over the inproc:// transport to a given endpoint. +Messages are 'zmq_event_t' structs. It's recommended to connect via a 'PAIR' +socket in another application thread and handle monitoring events there. It's +possible to also supply a bitmask ('ZMQ_EVENT_ALL' or any combination of the +'ZMQ_EVENT_*' constants) of the events you're interested in. + +---- +// monitoring thread +static void *req_socket_monitor (void *ctx) +{ + zmq_event_t event; + int rc; + + void *s = zmq_socket (ctx, ZMQ_PAIR); + assert (s); + + rc = zmq_connect (s, "inproc://monitor.req"); + assert (rc == 0); + while (true) { + zmq_msg_t msg; + zmq_msg_init (&msg); + rc = zmq_recvmsg (s, &msg, 0); + if (rc == -1 && zmq_errno() == ETERM) break; + assert (rc != -1); + memcpy (&event, zmq_msg_data (&msg), sizeof (event)); + switch (event.event) { + case ZMQ_EVENT_CONNECTED: + // handle socket connected event + break; + case ZMQ_EVENT_CLOSED: + // handle socket closed event + break; + } + } + zmq_close (s); + return NULL; +} + +// register a monitor endpoint for all socket events +rc = zmq_socket_monitor (req, "inproc://monitor.req", ZMQ_EVENT_ALL); +assert (rc == 0); + +// spawn a monitoring thread +rc = pthread_create (&threads [0], NULL, req_socket_monitor, ctx); +assert (rc == 0); +---- + +Only connection oriented (tcp and ipc) transports are supported in this initial +implementation. + +Supported events are: + + +ZMQ_EVENT_CONNECTED: connection established +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_EVENT_CONNECTED' event triggers when a connection has been established +to a remote peer. This can happen either synchronous or asynchronous. + +.Event metadata: +---- +data.connected.addr // peer address +data.connected.fd // socket descriptor +---- + +ZMQ_EVENT_CONNECT_DELAYED: synchronous connect failed, it's being polled +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_EVENT_CONNECT_DELAYED' event triggers when an immediate connection +attempt is delayed and it's completion's being polled for. + +.Event metadata: +---- +data.connect_delayed.addr // peer address +data.connect_delayed.err // errno value +---- + +ZMQ_EVENT_CONNECT_RETRIED: asynchronous connect / reconnection attempt +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_EVENT_CONNECT_RETRIED' event triggers when a connection attempt +is being handled by reconnect timer. The reconnect interval's recomputed +for each attempt. + +.Event metadata: +---- +data.connect_retried.addr // peer address +data.connect_retried.interval // computed reconnect interval +---- + +ZMQ_EVENT_LISTENING: socket bound to an address, ready to accept connections +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_EVENT_LISTENING' event triggers when a socket's successfully bound +to a an interface. + +.Event metadata: +---- +data.listening.addr // listen address +data.listening.fd // socket descriptor +---- + +ZMQ_EVENT_BIND_FAILED: socket could not bind to an address +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_EVENT_BIND_FAILED' event triggers when a socket could not bind to +a given interface. + +.Event metadata: +---- +data.bind_failed.addr // listen address +data.bind_failed.err // errno value +---- + +ZMQ_EVENT_ACCEPTED: connection accepted to bound interface +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_EVENT_ACCEPTED' event triggers when a connection from a remote peer +has been established with a socket's listen address. + +.Event metadata: +---- +data.accepted.addr // listen address +data.accepted.fd // socket descriptor +---- + +ZMQ_EVENT_ACCEPT_FAILED: could not accept client connection +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_EVENT_ACCEPT_FAILED' event triggers when a connection attempt to +a socket's bound address fails. + +.Event metadata: +---- +data.accept_failed.addr // listen address +data.accept_failed.err // errno value +---- + +ZMQ_EVENT_CLOSED: connection closed +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_EVENT_CLOSED' event triggers when a connection's underlying descriptor +has been closed. + +.Event metadata: +---- +data.closed.addr // address +data.closed.fd // socket descriptor +---- + +ZMQ_EVENT_CLOSE_FAILED: connection couldn't be closed +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_EVENT_CLOSE_FAILED' event triggers when a descriptor could not be +released back to the OS. + +.Event metadata: +---- +data.close_failed.addr // address +data.close_failed.err // errno value +---- + +ZMQ_EVENT_DISCONNECTED: broken session +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_EVENT_DISCONNECTED' event triggers when the stream engine (tcp and ipc +specific) detects a corrupted / broken session. + +.Event metadata: +---- +data.disconnected.addr // address +data.disconnected.fd // socket descriptor +---- + +RETURN VALUE +------------ +The _zmq_socket_monitor()_ function returns a value of 0 or greater if +successful. Otherwise it returns `-1` and sets 'errno' to one of the values +defined below. + + +ERRORS +------ +*ETERM*:: +The 0MQ 'context' associated with the specified 'socket' was terminated. + +*EPROTONOSUPPORT*:: +The requested 'transport' protocol is not supported. Monitor sockets are +required to use the inproc:// transport. + +*EINVAL*:: +The endpoint supplied is invalid. + +EXAMPLE +------- +.Observing a 'REP' socket's connection state +---- +// REP socket monitor thread +static void *rep_socket_monitor (void *ctx) +{ + zmq_event_t event; + int rc; + + void *s = zmq_socket (ctx, ZMQ_PAIR); + assert (s); + + rc = zmq_connect (s, "inproc://monitor.rep"); + assert (rc == 0); + while (true) { + zmq_msg_t msg; + zmq_msg_init (&msg); + rc = zmq_recvmsg (s, &msg, 0); + if (rc == -1 && zmq_errno() == ETERM) break; + assert (rc != -1); + memcpy (&event, zmq_msg_data (&msg), sizeof (event)); + switch (event.event) { + case ZMQ_EVENT_LISTENING: + printf ("listening socket descriptor %d\n", event.data.listening.fd); + printf ("listening socket address %s\n", event.data.listening.addr); + break; + case ZMQ_EVENT_ACCEPTED: + printf ("accepted socket descriptor %d\n", event.data.accepted.fd); + printf ("accepted socket address %s\n", event.data.accepted.addr); + break; + case ZMQ_EVENT_CLOSE_FAILED: + printf ("socket close failure error code %d\n", event.data.close_failed.err); + printf ("socket address %s\n", event.data.close_failed.addr); + break; + case ZMQ_EVENT_CLOSED: + printf ("closed socket descriptor %d\n", event.data.closed.fd); + printf ("closed socket address %s\n", event.data.closed.addr); + break; + case ZMQ_EVENT_DISCONNECTED: + printf ("disconnected socket descriptor %d\n", event.data.disconnected.fd); + printf ("disconnected socket address %s\n", event.data.disconnected.addr); + break; + } + zmq_msg_close (&msg); + } + zmq_close (s); + return NULL; +} + + +// Create the infrastructure +void *ctx = zmq_init (1); +assert (ctx); + +// REP socket +rep = zmq_socket (ctx, ZMQ_REP); +assert (rep); + +// REP socket monitor, all events +rc = zmq_socket_monitor (rep, "inproc://monitor.rep", ZMQ_EVENT_ALL); +assert (rc == 0); +rc = pthread_create (&threads [0], NULL, rep_socket_monitor, ctx); +assert (rc == 0); + +rc = zmq_bind (rep, addr); +assert (rc == 0); + +// Allow some time for event detection +zmq_sleep (1); + +// Close the REP socket +rc = zmq_close (rep); +assert (rc == 0); + +zmq_term (ctx); +---- + + +SEE ALSO +-------- +linkzmq:zmq[7] + + +AUTHORS +------- +This 0MQ manual page was written by Lourens Naudé \ No newline at end of file diff --git a/include/zmq.h b/include/zmq.h index 3327b6bf..372e96c1 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -278,8 +278,16 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); #define ZMQ_EVENT_CLOSE_FAILED 256 #define ZMQ_EVENT_DISCONNECTED 512 +#define ZMQ_EVENT_ALL ( ZMQ_EVENT_CONNECTED | ZMQ_EVENT_CONNECT_DELAYED | \ + ZMQ_EVENT_CONNECT_RETRIED | ZMQ_EVENT_LISTENING | \ + ZMQ_EVENT_BIND_FAILED | ZMQ_EVENT_ACCEPTED | \ + ZMQ_EVENT_ACCEPT_FAILED | ZMQ_EVENT_CLOSED | \ + ZMQ_EVENT_CLOSE_FAILED | ZMQ_EVENT_DISCONNECTED ) + /* Socket event data (union member per event) */ -typedef union { +typedef struct { + int event; + union { struct { char *addr; int fd; @@ -320,12 +328,8 @@ typedef union { char *addr; int fd; } disconnected; -} zmq_event_data_t; - -/* Callback template for socket state changes */ -typedef void (zmq_monitor_fn) (void *s, int event, zmq_event_data_t *data); - -ZMQ_EXPORT int zmq_ctx_set_monitor (void *context, zmq_monitor_fn *monitor); + } data; +} zmq_event_t; ZMQ_EXPORT void *zmq_socket (void *, int type); ZMQ_EXPORT int zmq_close (void *s); @@ -339,6 +343,7 @@ ZMQ_EXPORT int zmq_unbind (void *s, const char *addr); ZMQ_EXPORT int zmq_disconnect (void *s, const char *addr); ZMQ_EXPORT int zmq_send (void *s, const void *buf, size_t len, int flags); ZMQ_EXPORT int zmq_recv (void *s, void *buf, size_t len, int flags); +ZMQ_EXPORT int zmq_socket_monitor (void *s, const char *addr, int events); ZMQ_EXPORT int zmq_sendmsg (void *s, zmq_msg_t *msg, int flags); ZMQ_EXPORT int zmq_recvmsg (void *s, zmq_msg_t *msg, int flags); diff --git a/src/ctx.cpp b/src/ctx.cpp index b548577e..552ffec6 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -45,8 +45,7 @@ zmq::ctx_t::ctx_t () : slot_count (0), slots (NULL), max_sockets (ZMQ_MAX_SOCKETS_DFLT), - io_thread_count (ZMQ_IO_THREADS_DFLT), - monitor_fn (NULL) + io_thread_count (ZMQ_IO_THREADS_DFLT) { } @@ -126,12 +125,6 @@ int zmq::ctx_t::terminate () return 0; } -int zmq::ctx_t::monitor (zmq_monitor_fn *monitor_) -{ - monitor_fn = monitor_; - return 0; -} - int zmq::ctx_t::set (int option_, int optval_) { int rc = 0; @@ -353,67 +346,6 @@ zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_) return endpoint; } -void zmq::ctx_t::monitor_event (zmq::socket_base_t *socket_, int event_, ...) -{ - va_list args; - va_start (args, event_); - va_monitor_event (socket_, event_, args); - va_end (args); -} - -void zmq::ctx_t::va_monitor_event (zmq::socket_base_t *socket_, int event_, va_list args_) -{ - if (monitor_fn != NULL) { - zmq_event_data_t data; - memset(&data, 0, sizeof (zmq_event_data_t)); - switch (event_) { - case ZMQ_EVENT_CONNECTED: - data.connected.addr = va_arg (args_, char*); - data.connected.fd = va_arg (args_, int); - break; - case ZMQ_EVENT_CONNECT_DELAYED: - data.connect_delayed.addr = va_arg (args_, char*); - data.connect_delayed.err = va_arg (args_, int); - break; - case ZMQ_EVENT_CONNECT_RETRIED: - data.connect_retried.addr = va_arg (args_, char*); - data.connect_retried.interval = va_arg (args_, int); - break; - case ZMQ_EVENT_LISTENING: - data.listening.addr = va_arg (args_, char*); - data.listening.fd = va_arg (args_, int); - break; - case ZMQ_EVENT_BIND_FAILED: - data.bind_failed.addr = va_arg (args_, char*); - data.bind_failed.err = va_arg (args_, int); - break; - case ZMQ_EVENT_ACCEPTED: - data.accepted.addr = va_arg (args_, char*); - data.accepted.fd = va_arg (args_, int); - break; - case ZMQ_EVENT_ACCEPT_FAILED: - data.accept_failed.addr = va_arg (args_, char*); - data.accept_failed.err = va_arg (args_, int); - break; - case ZMQ_EVENT_CLOSED: - data.closed.addr = va_arg (args_, char*); - data.closed.fd = va_arg (args_, int); - break; - case ZMQ_EVENT_CLOSE_FAILED: - data.close_failed.addr = va_arg (args_, char*); - data.close_failed.err = va_arg (args_, int); - break; - case ZMQ_EVENT_DISCONNECTED: - data.disconnected.addr = va_arg (args_, char*); - data.disconnected.fd = va_arg (args_, int); - break; - default: - zmq_assert (false); - } - monitor_fn ((void *)socket_, event_, &data); - } -} - // The last used socket ID, or 0 if no socket was used so far. Note that this // is a global variable. Thus, even sockets created in different contexts have // unique IDs. diff --git a/src/ctx.hpp b/src/ctx.hpp index 2aba8057..43e3237e 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -95,11 +95,6 @@ namespace zmq void unregister_endpoints (zmq::socket_base_t *socket_); endpoint_t find_endpoint (const char *addr_); - // Monitoring specific - int monitor (zmq_monitor_fn *monitor_); - void monitor_event (zmq::socket_base_t *socket_, int event_, ...); - void va_monitor_event (zmq::socket_base_t *socket_, int event_, va_list args_); - enum { term_tid = 0, reaper_tid = 1 @@ -169,9 +164,6 @@ namespace zmq // Synchronisation of access to context options. mutex_t opt_sync; - // Monitoring callback - zmq_monitor_fn *monitor_fn; - ctx_t (const ctx_t&); const ctx_t &operator = (const ctx_t&); }; diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp index a6b47d33..8d8b1bc9 100644 --- a/src/ipc_connecter.cpp +++ b/src/ipc_connecter.cpp @@ -56,6 +56,7 @@ zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_, zmq_assert (addr); zmq_assert (addr->protocol == "ipc"); addr->to_string (endpoint); + socket = session-> get_socket(); } zmq::ipc_connecter_t::~ipc_connecter_t () @@ -121,7 +122,7 @@ void zmq::ipc_connecter_t::out_event () // Shut the connecter down. terminate (); - session->monitor_event (ZMQ_EVENT_CONNECTED, endpoint.c_str(), fd); + socket->event_connected (endpoint.c_str(), fd); } void zmq::ipc_connecter_t::timer_event (int id_) @@ -148,7 +149,7 @@ void zmq::ipc_connecter_t::start_connecting () handle = add_fd (s); handle_valid = true; set_pollout (handle); - session->monitor_event (ZMQ_EVENT_CONNECT_DELAYED, endpoint.c_str(), zmq_errno()); + socket->event_connect_delayed (endpoint.c_str(), zmq_errno()); } // Handle any other error condition by eventual reconnect. @@ -163,7 +164,7 @@ void zmq::ipc_connecter_t::add_reconnect_timer() { int rc_ivl = get_new_reconnect_ivl(); add_timer (rc_ivl, reconnect_timer_id); - session->monitor_event (ZMQ_EVENT_CONNECT_RETRIED, endpoint.c_str(), rc_ivl); + socket->event_connect_retried (endpoint.c_str(), rc_ivl); timer_started = true; } @@ -224,7 +225,7 @@ int zmq::ipc_connecter_t::close () zmq_assert (s != retired_fd); int rc = ::close (s); errno_assert (rc == 0); - session->monitor_event (ZMQ_EVENT_CLOSED, endpoint.c_str(), s); + socket->event_closed (endpoint.c_str(), s); s = retired_fd; return 0; } diff --git a/src/ipc_connecter.hpp b/src/ipc_connecter.hpp index 5ff45b03..39d2c822 100644 --- a/src/ipc_connecter.hpp +++ b/src/ipc_connecter.hpp @@ -113,6 +113,9 @@ namespace zmq // String representation of endpoint to connect to std::string endpoint; + // Socket + zmq::socket_base_t *socket; + ipc_connecter_t (const ipc_connecter_t&); const ipc_connecter_t &operator = (const ipc_connecter_t&); }; diff --git a/src/ipc_listener.cpp b/src/ipc_listener.cpp index 13e8af9c..92945c27 100644 --- a/src/ipc_listener.cpp +++ b/src/ipc_listener.cpp @@ -76,7 +76,7 @@ void zmq::ipc_listener_t::in_event () // If connection was reset by the peer in the meantime, just ignore it. // TODO: Handle specific errors like ENFILE/EMFILE etc. if (fd == retired_fd) { - socket->monitor_event (ZMQ_EVENT_ACCEPT_FAILED, endpoint.c_str(), zmq_errno()); + socket->event_accept_failed (endpoint.c_str(), zmq_errno()); return; } @@ -96,7 +96,7 @@ void zmq::ipc_listener_t::in_event () session->inc_seqnum (); launch_child (session); send_attach (session, engine, false); - socket->monitor_event (ZMQ_EVENT_ACCEPTED, endpoint.c_str(), fd); + socket->event_accepted (endpoint.c_str(), fd); } int zmq::ipc_listener_t::get_address (std::string &addr_) @@ -155,7 +155,7 @@ int zmq::ipc_listener_t::set_address (const char *addr_) if (rc != 0) goto error; - socket->monitor_event (ZMQ_EVENT_LISTENING, endpoint.c_str(), s); + socket->event_listening (endpoint.c_str(), s); return 0; error: @@ -178,12 +178,12 @@ int zmq::ipc_listener_t::close () if (has_file && !filename.empty ()) { rc = ::unlink(filename.c_str ()); if (rc != 0) { - socket->monitor_event (ZMQ_EVENT_CLOSE_FAILED, endpoint.c_str(), zmq_errno()); + socket->event_close_failed (endpoint.c_str(), zmq_errno()); return -1; } } - socket->monitor_event (ZMQ_EVENT_CLOSED, endpoint.c_str(), s); + socket->event_closed (endpoint.c_str(), s); return 0; } diff --git a/src/session_base.cpp b/src/session_base.cpp index acbfd489..73a018e1 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -23,7 +23,6 @@ #include #include "session_base.hpp" -#include "socket_base.hpp" #include "i_engine.hpp" #include "err.hpp" #include "pipe.hpp" @@ -286,17 +285,9 @@ void zmq::session_base_t::hiccuped (pipe_t *) zmq_assert (false); } -void zmq::session_base_t::monitor_event (int event_, ...) +zmq::socket_base_t *zmq::session_base_t::get_socket () { - va_list args; - va_start (args, event_); - va_monitor_event (event_, args); - va_end (args); -} - -void zmq::session_base_t::va_monitor_event (int event_, va_list args) -{ - socket->va_monitor_event (event_, args); + return socket; } void zmq::session_base_t::process_plug () diff --git a/src/session_base.hpp b/src/session_base.hpp index 5b5fd2b4..7f8977fb 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -31,6 +31,7 @@ #include "pipe.hpp" #include "i_msg_source.hpp" #include "i_msg_sink.hpp" +#include "socket_base.hpp" namespace zmq { @@ -75,8 +76,7 @@ namespace zmq void hiccuped (zmq::pipe_t *pipe_); void terminated (zmq::pipe_t *pipe_); - void monitor_event (int event_, ...); - void va_monitor_event (int event_, va_list args); + socket_base_t *get_socket (); protected: diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 586c58dc..0e06bf24 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -130,13 +130,16 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) : destroyed (false), last_tsc (0), ticks (0), - rcvmore (false) + rcvmore (false), + monitor_socket (NULL), + monitor_events (0) { options.socket_id = sid_; } zmq::socket_base_t::~socket_base_t () { + stop_monitor (); zmq_assert (destroyed); } @@ -354,7 +357,7 @@ int zmq::socket_base_t::bind (const char *addr_) int rc = listener->set_address (address.c_str ()); if (rc != 0) { delete listener; - monitor_event (ZMQ_EVENT_BIND_FAILED, addr_, zmq_errno()); + event_bind_failed (addr_, zmq_errno()); return -1; } @@ -373,7 +376,7 @@ int zmq::socket_base_t::bind (const char *addr_) int rc = listener->set_address (address.c_str ()); if (rc != 0) { delete listener; - monitor_event (ZMQ_EVENT_BIND_FAILED, addr_, zmq_errno()); + event_bind_failed (addr_, zmq_errno()); return -1; } @@ -845,6 +848,7 @@ void zmq::socket_base_t::process_stop () // We'll remember the fact so that any blocking call is interrupted and any // further attempt to use the socket will return ETERM. The user is still // responsible for calling zmq_close on the socket though! + stop_monitor (); ctx_terminated = true; } @@ -996,15 +1000,172 @@ void zmq::socket_base_t::extract_flags (msg_t *msg_) rcvmore = msg_->flags () & msg_t::more ? true : false; } -void zmq::socket_base_t::monitor_event (int event_, ...) +int zmq::socket_base_t::monitor (const char *addr_, int events_) { - va_list args; - va_start (args, event_); - va_monitor_event(event_, args); - va_end (args); + int rc; + if (unlikely (ctx_terminated)) { + errno = ETERM; + return -1; + } + + // Support deregistering monitoring endpoints as well + if (addr_ == NULL) { + stop_monitor (); + return 0; + } + + // Parse addr_ string. + std::string protocol; + std::string address; + rc = parse_uri (addr_, protocol, address); + if (rc != 0) + return -1; + + rc = check_protocol (protocol); + if (rc != 0) + return -1; + + // Event notification only supported over inproc:// + if (protocol != "inproc") { + errno = EPROTONOSUPPORT; + return -1; + } + + // Register events to monitor + monitor_events = events_; + + monitor_socket = zmq_socket( get_ctx (), ZMQ_PAIR); + if (monitor_socket == NULL) + return -1; + + // Never block context termination on pending event messages + int linger = 0; + rc = zmq_setsockopt (monitor_socket, ZMQ_LINGER, &linger, sizeof (linger)); + if (rc == -1) + stop_monitor (); + + // Spawn the monitor socket endpoint + rc = zmq_bind (monitor_socket, addr_); + if (rc == -1) + stop_monitor (); + return rc; } -void zmq::socket_base_t::va_monitor_event (int event_, va_list args) +void zmq::socket_base_t::event_connected (const char *addr_, int fd_) { - get_ctx ()->va_monitor_event (this, event_, args); + zmq_event_t event; + if (!(monitor_events & ZMQ_EVENT_CONNECTED)) return; + event.event = ZMQ_EVENT_CONNECTED; + event.data.connected.addr = (char *)addr_; + event.data.connected.fd = fd_; + monitor_event (event); } + +void zmq::socket_base_t::event_connect_delayed (const char *addr_, int err_) +{ + zmq_event_t event; + if (!(monitor_events & ZMQ_EVENT_CONNECT_DELAYED)) return; + event.event = ZMQ_EVENT_CONNECT_DELAYED; + event.data.connected.addr = (char *)addr_; + event.data.connect_delayed.err = err_; + monitor_event (event); +} + +void zmq::socket_base_t::event_connect_retried (const char *addr_, int interval_) +{ + zmq_event_t event; + if (!(monitor_events & ZMQ_EVENT_CONNECT_RETRIED)) return; + event.event = ZMQ_EVENT_CONNECT_RETRIED; + event.data.connected.addr = (char *)addr_; + event.data.connect_retried.interval = interval_; + monitor_event (event); +} + +void zmq::socket_base_t::event_listening (const char *addr_, int fd_) +{ + zmq_event_t event; + if (!(monitor_events & ZMQ_EVENT_LISTENING)) return; + event.event = ZMQ_EVENT_LISTENING; + event.data.connected.addr = (char *)addr_; + event.data.listening.fd = fd_; + monitor_event (event); +} + +void zmq::socket_base_t::event_bind_failed (const char *addr_, int err_) +{ + zmq_event_t event; + if (!(monitor_events & ZMQ_EVENT_BIND_FAILED)) return; + event.event = ZMQ_EVENT_BIND_FAILED; + event.data.connected.addr = (char *)addr_; + event.data.bind_failed.err = err_; + monitor_event (event); +} + +void zmq::socket_base_t::event_accepted (const char *addr_, int fd_) +{ + zmq_event_t event; + if (!(monitor_events & ZMQ_EVENT_ACCEPTED)) return; + event.event = ZMQ_EVENT_ACCEPTED; + event.data.connected.addr = (char *)addr_; + event.data.accepted.fd = fd_; + monitor_event (event); +} + +void zmq::socket_base_t::event_accept_failed (const char *addr_, int err_) +{ + zmq_event_t event; + if (!(monitor_events & ZMQ_EVENT_ACCEPT_FAILED)) return; + event.event = ZMQ_EVENT_ACCEPT_FAILED; + event.data.connected.addr = (char *)addr_; + event.data.accept_failed.err= err_; + monitor_event (event); +} + +void zmq::socket_base_t::event_closed (const char *addr_, int fd_) +{ + zmq_event_t event; + if (!(monitor_events & ZMQ_EVENT_CLOSED)) return; + event.event = ZMQ_EVENT_CLOSED; + event.data.connected.addr = (char *)addr_; + event.data.closed.fd = fd_; + monitor_event (event); +} + +void zmq::socket_base_t::event_close_failed (const char *addr_, int err_) +{ + zmq_event_t event; + if (!(monitor_events & ZMQ_EVENT_CLOSE_FAILED)) return; + event.event = ZMQ_EVENT_CLOSE_FAILED; + event.data.connected.addr = (char *)addr_; + event.data.close_failed.err = err_; + monitor_event (event); +} + +void zmq::socket_base_t::event_disconnected (const char *addr_, int fd_) +{ + zmq_event_t event; + if (!(monitor_events & ZMQ_EVENT_DISCONNECTED)) return; + event.event = ZMQ_EVENT_DISCONNECTED; + event.data.connected.addr = (char *)addr_; + event.data.disconnected.fd = fd_; + monitor_event (event); +} + +void zmq::socket_base_t::monitor_event (zmq_event_t event_) +{ + zmq_msg_t msg; + if (!monitor_socket) return; + zmq_msg_init_size (&msg, sizeof (event_)); + memcpy (zmq_msg_data (&msg), &event_, sizeof (event_)); + zmq_sendmsg (monitor_socket, &msg, 0); + zmq_msg_close (&msg); +} + +void zmq::socket_base_t::stop_monitor() +{ + if (monitor_socket) { + zmq_close (monitor_socket); + monitor_socket = NULL; + monitor_events = 0; + } +} \ No newline at end of file diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 2dabd1e0..fd159b53 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -102,8 +102,18 @@ namespace zmq void lock(); void unlock(); - void monitor_event (int event_, ...); - void va_monitor_event (int event_, va_list args); + int monitor(const char *endpoint_, int events_); + + void event_connected(const char *addr_, int fd_); + void event_connect_delayed(const char *addr_, int err_); + void event_connect_retried(const char *addr_, int interval_); + void event_listening(const char *addr_, int fd_); + void event_bind_failed(const char *addr_, int err_); + void event_accepted(const char *addr_, int fd_); + void event_accept_failed(const char *addr_, int err_); + void event_closed(const char *addr_, int fd_); + void event_close_failed(const char *addr_, int fd_); + void event_disconnected(const char *addr_, int fd_); protected: @@ -138,6 +148,12 @@ namespace zmq // Delay actual destruction of the socket. void process_destroy (); + // Socket event data dispath + void monitor_event (zmq_event_t data_); + + // Monitor socket cleanup + void stop_monitor (); + private: // Creates new endpoint ID and adds the endpoint to the map. void add_endpoint (const char *addr_, own_t *endpoint_); @@ -210,6 +226,12 @@ namespace zmq // Improves efficiency of time measurement. clock_t clock; + // Monitor socket; + void *monitor_socket; + + // Bitmask of events being monitored + int monitor_events; + socket_base_t (const socket_base_t&); const socket_base_t &operator = (const socket_base_t&); mutex_t sync; diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 27925097..88c00cda 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -62,7 +62,8 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons session (NULL), options (options_), endpoint (endpoint_), - plugged (false) + plugged (false), + socket (NULL) { // Put the socket into non-blocking mode. unblock_socket (s); @@ -126,6 +127,7 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, zmq_assert (!session); zmq_assert (session_); session = session_; + socket = session-> get_socket (); // Connect to I/O threads poller object. io_object_t::plug (io_thread_); @@ -445,7 +447,7 @@ int zmq::stream_engine_t::push_msg (msg_t *msg_) void zmq::stream_engine_t::error () { zmq_assert (session); - session->monitor_event (ZMQ_EVENT_DISCONNECTED, endpoint.c_str(), s); + socket->event_disconnected (endpoint.c_str(), s); session->detach (); unplug (); delete this; diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index 1dc56756..f24c1fff 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -31,6 +31,7 @@ #include "i_encoder.hpp" #include "i_decoder.hpp" #include "options.hpp" +#include "socket_base.hpp" #include "../include/zmq.h" namespace zmq @@ -133,6 +134,9 @@ namespace zmq bool plugged; + // Socket + zmq::socket_base_t *socket; + stream_engine_t (const stream_engine_t&); const stream_engine_t &operator = (const stream_engine_t&); }; diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index e3aac63e..9589a909 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -66,6 +66,7 @@ zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_, zmq_assert (addr); zmq_assert (addr->protocol == "tcp"); addr->to_string (endpoint); + socket = session-> get_socket(); } zmq::tcp_connecter_t::~tcp_connecter_t () @@ -135,7 +136,7 @@ void zmq::tcp_connecter_t::out_event () // Shut the connecter down. terminate (); - session->monitor_event (ZMQ_EVENT_CONNECTED, endpoint.c_str(), fd); + socket->event_connected (endpoint.c_str(), fd); } void zmq::tcp_connecter_t::timer_event (int id_) @@ -162,7 +163,7 @@ void zmq::tcp_connecter_t::start_connecting () handle = add_fd (s); handle_valid = true; set_pollout (handle); - session->monitor_event (ZMQ_EVENT_CONNECT_DELAYED, endpoint.c_str(), zmq_errno()); + socket->event_connect_delayed (endpoint.c_str(), zmq_errno()); } // Handle any other error condition by eventual reconnect. @@ -177,7 +178,7 @@ void zmq::tcp_connecter_t::add_reconnect_timer() { int rc_ivl = get_new_reconnect_ivl(); add_timer (rc_ivl, reconnect_timer_id); - session->monitor_event (ZMQ_EVENT_CONNECT_RETRIED, endpoint.c_str(), rc_ivl); + socket->event_connect_retried (endpoint.c_str(), rc_ivl); timer_started = true; } @@ -303,6 +304,6 @@ void zmq::tcp_connecter_t::close () int rc = ::close (s); errno_assert (rc == 0); #endif - session->monitor_event (ZMQ_EVENT_CLOSED, endpoint.c_str(), s); + socket->event_closed (endpoint.c_str(), s); s = retired_fd; } diff --git a/src/tcp_connecter.hpp b/src/tcp_connecter.hpp index d5aed5f7..eedfdadc 100644 --- a/src/tcp_connecter.hpp +++ b/src/tcp_connecter.hpp @@ -111,6 +111,9 @@ namespace zmq // String representation of endpoint to connect to std::string endpoint; + // Socket + zmq::socket_base_t *socket; + tcp_connecter_t (const tcp_connecter_t&); const tcp_connecter_t &operator = (const tcp_connecter_t&); }; diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index f35531e2..b75fa6a6 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -85,7 +85,7 @@ void zmq::tcp_listener_t::in_event () // If connection was reset by the peer in the meantime, just ignore it. // TODO: Handle specific errors like ENFILE/EMFILE etc. if (fd == retired_fd) { - socket->monitor_event (ZMQ_EVENT_ACCEPT_FAILED, endpoint.c_str(), zmq_errno()); + socket->event_accept_failed (endpoint.c_str(), zmq_errno()); return; } @@ -108,7 +108,7 @@ void zmq::tcp_listener_t::in_event () session->inc_seqnum (); launch_child (session); send_attach (session, engine, false); - socket->monitor_event (ZMQ_EVENT_ACCEPTED, endpoint.c_str(), fd); + socket->event_accepted (endpoint.c_str(), fd); } void zmq::tcp_listener_t::close () @@ -121,7 +121,7 @@ void zmq::tcp_listener_t::close () int rc = ::close (s); errno_assert (rc == 0); #endif - socket->monitor_event (ZMQ_EVENT_CLOSED, endpoint.c_str(), s); + socket->event_closed (endpoint.c_str(), s); s = retired_fd; } @@ -223,7 +223,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_) goto error; #endif - socket->monitor_event (ZMQ_EVENT_LISTENING, endpoint.c_str(), s); + socket->event_listening (endpoint.c_str(), s); return 0; error: diff --git a/src/zmq.cpp b/src/zmq.cpp index f5b0229c..fd8d5d8b 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -210,15 +210,6 @@ int zmq_ctx_get (void *ctx_, int option_) return ((zmq::ctx_t*) ctx_)->get (option_); } -int zmq_ctx_set_monitor (void *ctx_, zmq_monitor_fn *monitor_) -{ - if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) { - errno = EFAULT; - return -1; - } - return ((zmq::ctx_t*) ctx_)->monitor (monitor_); -} - // Stable/legacy context API void *zmq_init (int io_threads_) @@ -284,6 +275,17 @@ int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_) return result; } +int zmq_socket_monitor (void *s_, const char *addr_, int events_) +{ + if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { + errno = ENOTSOCK; + return -1; + } + zmq::socket_base_t *s = (zmq::socket_base_t *) s_; + int result = s->monitor (addr_, events_); + return result; +} + int zmq_bind (void *s_, const char *addr_) { if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { diff --git a/tests/test_monitor.cpp b/tests/test_monitor.cpp index fb335b69..2f40fe44 100644 --- a/tests/test_monitor.cpp +++ b/tests/test_monitor.cpp @@ -21,89 +21,226 @@ #include "../include/zmq.h" #include "../include/zmq_utils.h" +#include #include #include "testutil.hpp" -static int events; +// REQ socket events handled +static int req_socket_events; +// 2nd REQ socket events handled +static int req2_socket_events; +// REP socket events handled +static int rep_socket_events; -typedef void *ZmqSocket; -ZmqSocket rep, req; +const char *addr; -void socket_monitor (ZmqSocket s, int event_, zmq_event_data_t *data_) +extern "C" { - assert(s == rep || s == req); + // REQ socket monitor thread + static void *req_socket_monitor (void *ctx) + { + zmq_event_t event; + int rc; - const char *addr = "tcp://127.0.0.1:5560"; - // Only some of the exceptional events could fire - switch (event_) { - // listener specific - case ZMQ_EVENT_LISTENING: - assert (s == rep); - assert (data_->listening.fd > 0); - assert (!strcmp (data_->listening.addr, addr)); - events |= ZMQ_EVENT_LISTENING; - break; - case ZMQ_EVENT_ACCEPTED: - assert (s == rep); - assert (data_->accepted.fd > 0); - assert (!strcmp (data_->accepted.addr, addr)); - events |= ZMQ_EVENT_ACCEPTED; - break; - // connecter specific - case ZMQ_EVENT_CONNECTED: - assert (s == req); - assert (data_->connected.fd > 0); - assert (!strcmp (data_->connected.addr, addr)); - events |= ZMQ_EVENT_CONNECTED; - break; - case ZMQ_EVENT_CONNECT_DELAYED: - assert (s == req); - assert (data_->connect_delayed.err != 0); - assert (!strcmp (data_->connect_delayed.addr, addr)); - events |= ZMQ_EVENT_CONNECT_DELAYED; - break; - // generic - either end of the socket - case ZMQ_EVENT_CLOSE_FAILED: - assert (data_->close_failed.err != 0); - assert (!strcmp (data_->close_failed.addr, addr)); - events |= ZMQ_EVENT_CLOSE_FAILED; - break; - case ZMQ_EVENT_CLOSED: - assert (data_->closed.fd != 0); - assert (!strcmp (data_->closed.addr, addr)); - events |= ZMQ_EVENT_CLOSED; - break; - case ZMQ_EVENT_DISCONNECTED: - assert (data_->disconnected.fd != 0); - assert (!strcmp (data_->disconnected.addr, addr)); - events |= ZMQ_EVENT_DISCONNECTED; - break; - default: - // out of band / unexpected event - assert (0); + void *s = zmq_socket (ctx, ZMQ_PAIR); + assert (s); + + rc = zmq_connect (s, "inproc://monitor.req"); + assert (rc == 0); + while (true) { + zmq_msg_t msg; + zmq_msg_init (&msg); + rc = zmq_recvmsg (s, &msg, 0); + if (rc == -1 && zmq_errno() == ETERM) break; + assert (rc != -1); + memcpy (&event, zmq_msg_data (&msg), sizeof (event)); + switch (event.event) { + case ZMQ_EVENT_CONNECTED: + assert (event.data.connected.fd > 0); + assert (!strcmp (event.data.connected.addr, addr)); + req_socket_events |= ZMQ_EVENT_CONNECTED; + req2_socket_events |= ZMQ_EVENT_CONNECTED; + break; + case ZMQ_EVENT_CONNECT_DELAYED: + assert (event.data.connect_delayed.err != 0); + assert (!strcmp (event.data.connect_delayed.addr, addr)); + req_socket_events |= ZMQ_EVENT_CONNECT_DELAYED; + break; + case ZMQ_EVENT_CLOSE_FAILED: + assert (event.data.close_failed.err != 0); + assert (!strcmp (event.data.close_failed.addr, addr)); + req_socket_events |= ZMQ_EVENT_CLOSE_FAILED; + break; + case ZMQ_EVENT_CLOSED: + assert (event.data.closed.fd != 0); + assert (!strcmp (event.data.closed.addr, addr)); + req_socket_events |= ZMQ_EVENT_CLOSED; + break; + case ZMQ_EVENT_DISCONNECTED: + assert (event.data.disconnected.fd != 0); + assert (!strcmp (event.data.disconnected.addr, addr)); + req_socket_events |= ZMQ_EVENT_DISCONNECTED; + break; + } + } + zmq_close (s); + return NULL; + } +} + +extern "C" +{ + // 2nd REQ socket monitor thread + static void *req2_socket_monitor (void *ctx) + { + zmq_event_t event; + int rc; + + void *s = zmq_socket (ctx, ZMQ_PAIR); + assert (s); + + rc = zmq_connect (s, "inproc://monitor.req2"); + assert (rc == 0); + while (true) { + zmq_msg_t msg; + zmq_msg_init (&msg); + rc = zmq_recvmsg (s, &msg, 0); + if (rc == -1 && zmq_errno() == ETERM) break; + assert (rc != -1); + memcpy (&event, zmq_msg_data (&msg), sizeof (event)); + switch (event.event) { + case ZMQ_EVENT_CONNECTED: + assert (event.data.connected.fd > 0); + assert (!strcmp (event.data.connected.addr, addr)); + req2_socket_events |= ZMQ_EVENT_CONNECTED; + break; + case ZMQ_EVENT_CLOSED: + assert (event.data.closed.fd != 0); + assert (!strcmp (event.data.closed.addr, addr)); + req2_socket_events |= ZMQ_EVENT_CLOSED; + break; + } + } + zmq_close (s); + return NULL; + } +} + + +extern "C" +{ + // REP socket monitor thread + static void *rep_socket_monitor (void *ctx) + { + zmq_event_t event; + int rc; + + void *s = zmq_socket (ctx, ZMQ_PAIR); + assert (s); + + rc = zmq_connect (s, "inproc://monitor.rep"); + assert (rc == 0); + while (true) { + zmq_msg_t msg; + zmq_msg_init (&msg); + rc = zmq_recvmsg (s, &msg, 0); + if (rc == -1 && zmq_errno() == ETERM) break; + assert (rc != -1); + memcpy (&event, zmq_msg_data (&msg), sizeof (event)); + switch (event.event) { + case ZMQ_EVENT_LISTENING: + assert (event.data.listening.fd > 0); + assert (!strcmp (event.data.listening.addr, addr)); + rep_socket_events |= ZMQ_EVENT_LISTENING; + break; + case ZMQ_EVENT_ACCEPTED: + assert (event.data.accepted.fd > 0); + assert (!strcmp (event.data.accepted.addr, addr)); + rep_socket_events |= ZMQ_EVENT_ACCEPTED; + break; + case ZMQ_EVENT_CLOSE_FAILED: + assert (event.data.close_failed.err != 0); + assert (!strcmp (event.data.close_failed.addr, addr)); + rep_socket_events |= ZMQ_EVENT_CLOSE_FAILED; + break; + case ZMQ_EVENT_CLOSED: + assert (event.data.closed.fd != 0); + assert (!strcmp (event.data.closed.addr, addr)); + rep_socket_events |= ZMQ_EVENT_CLOSED; + break; + case ZMQ_EVENT_DISCONNECTED: + assert (event.data.disconnected.fd != 0); + assert (!strcmp (event.data.disconnected.addr, addr)); + rep_socket_events |= ZMQ_EVENT_DISCONNECTED; + break; + } + zmq_msg_close (&msg); + } + zmq_close (s); + return NULL; } } int main (void) { int rc; + void *req; + void *req2; + void *rep; + pthread_t threads [3]; + + addr = "tcp://127.0.0.1:5560"; // Create the infrastructure void *ctx = zmq_init (1); assert (ctx); - // set socket monitor - rc = zmq_ctx_set_monitor (ctx, socket_monitor); - assert (rc == 0); + + // REP socket rep = zmq_socket (ctx, ZMQ_REP); assert (rep); - rc = zmq_bind (rep, "tcp://127.0.0.1:5560"); + // Assert supported protocols + rc = zmq_socket_monitor (rep, addr, 0); + assert (rc == -1); + assert (zmq_errno() == EPROTONOSUPPORT); + + // Deregister monitor + rc = zmq_socket_monitor (rep, NULL, 0); assert (rc == 0); + // REP socket monitor, all events + rc = zmq_socket_monitor (rep, "inproc://monitor.rep", ZMQ_EVENT_ALL); + assert (rc == 0); + rc = pthread_create (&threads [0], NULL, rep_socket_monitor, ctx); + assert (rc == 0); + + rc = zmq_bind (rep, addr); + assert (rc == 0); + + // REQ socket req = zmq_socket (ctx, ZMQ_REQ); assert (req); - rc = zmq_connect (req, "tcp://127.0.0.1:5560"); + // REQ socket monitor, all events + rc = zmq_socket_monitor (req, "inproc://monitor.req", ZMQ_EVENT_ALL); + assert (rc == 0); + rc = pthread_create (&threads [1], NULL, req_socket_monitor, ctx); + assert (rc == 0); + + rc = zmq_connect (req, addr); + assert (rc == 0); + + // 2nd REQ socket + req2 = zmq_socket (ctx, ZMQ_REQ); + assert (req2); + + // 2nd REQ socket monitor, connected event only + rc = zmq_socket_monitor (req2, "inproc://monitor.req2", ZMQ_EVENT_CONNECTED); + assert (rc == 0); + rc = pthread_create (&threads [2], NULL, req2_socket_monitor, ctx); + assert (rc == 0); + + rc = zmq_connect (req2, addr); assert (rc == 0); bounce (rep, req); @@ -111,26 +248,41 @@ int main (void) // Allow a window for socket events as connect can be async zmq_sleep (1); - // Deallocate the infrastructure. + // Close the REP socket + rc = zmq_close (rep); + assert (rc == 0); + + // Allow some time for detecting error states + zmq_sleep (1); + + // Close the REQ socket rc = zmq_close (req); assert (rc == 0); + // Close the 2nd REQ socket + rc = zmq_close (req2); + assert (rc == 0); + // Allow for closed or disconnected events to bubble up zmq_sleep (1); - rc = zmq_close (rep); - assert (rc == 0); - - zmq_sleep (1); - zmq_term (ctx); - // We expect to at least observe these events - assert (events & ZMQ_EVENT_LISTENING); - assert (events & ZMQ_EVENT_ACCEPTED); - assert (events & ZMQ_EVENT_CONNECTED); - assert (events & ZMQ_EVENT_CLOSED); - assert (events & ZMQ_EVENT_DISCONNECTED); + // Expected REP socket events + assert (rep_socket_events & ZMQ_EVENT_LISTENING); + assert (rep_socket_events & ZMQ_EVENT_ACCEPTED); + assert (rep_socket_events & ZMQ_EVENT_CLOSED); + + // Expected REQ socket events + assert (req_socket_events & ZMQ_EVENT_CONNECTED); + assert (req_socket_events & ZMQ_EVENT_DISCONNECTED); + assert (req_socket_events & ZMQ_EVENT_CLOSED); + + // Expected 2nd REQ socket events + assert (req2_socket_events & ZMQ_EVENT_CONNECTED); + assert (!(req2_socket_events & ZMQ_EVENT_CLOSED)); + + pthread_exit (NULL); return 0 ; }