From 5c6f72c17c1139f6c9699f998c9ca6eedfc535af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lourens=20Naud=C3=A9?= Date: Fri, 4 May 2012 02:32:46 +0100 Subject: [PATCH] ZMQ_MONITOR socket option registers a callback / event sink for changes in socket state --- .gitignore | 2 + AUTHORS | 1 + NEWS | 2 + doc/zmq_getsockopt.txt | 10 +++ doc/zmq_setsockopt.txt | 41 ++++++++++- include/zmq.h | 68 +++++++++++++++++- src/address.cpp | 2 +- src/address.hpp | 2 +- src/ipc_connecter.cpp | 14 +++- src/ipc_connecter.hpp | 3 + src/ipc_listener.cpp | 20 ++++-- src/ipc_listener.hpp | 3 + src/options.cpp | 23 +++++++ src/options.hpp | 4 ++ src/session_base.cpp | 15 ++++ src/session_base.hpp | 3 + src/socket_base.cpp | 58 ++++++++++++++++ src/socket_base.hpp | 3 + src/stream_engine.cpp | 4 ++ src/stream_engine.hpp | 4 ++ src/tcp_connecter.cpp | 14 +++- src/tcp_connecter.hpp | 4 ++ src/tcp_listener.cpp | 14 +++- src/tcp_listener.hpp | 4 ++ tests/Makefile.am | 4 +- tests/test_monitor.cpp | 152 +++++++++++++++++++++++++++++++++++++++++ 26 files changed, 461 insertions(+), 13 deletions(-) create mode 100644 tests/test_monitor.cpp diff --git a/.gitignore b/.gitignore index 71ebc06e..81f8ad1b 100644 --- a/.gitignore +++ b/.gitignore @@ -20,6 +20,8 @@ autom4te.cache .* *~ .*~ +tests/test_term_endpoint +tests/test_monitor tests/test_last_endpoint tests/test_pair_inproc tests/test_pair_ipc diff --git a/AUTHORS b/AUTHORS index 99084a80..53982ad8 100644 --- a/AUTHORS +++ b/AUTHORS @@ -76,6 +76,7 @@ Thijs Terlouw Toralf Wittner Tore Halvorsen Vitaly Mayatskikh +Lourens Naudé Credits ======= diff --git a/NEWS b/NEWS index b17272f6..fd4af84e 100644 --- a/NEWS +++ b/NEWS @@ -57,6 +57,8 @@ Building New functionality ----------------- +* ZMQ_MONITOR socket option registers a callback / event sink for changes in socket state. + * POSIX-compliant zmq_send and zmq_recv introduced (uses raw buffer instead of message object). diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt index df1f5f3f..3548c39c 100644 --- a/doc/zmq_getsockopt.txt +++ b/doc/zmq_getsockopt.txt @@ -455,6 +455,16 @@ Option value unit:: -1,>0 Default value:: -1 (leave to OS default) Applicable socket types:: all, when using TCP transports. +ZMQ_MONITOR: Registers a callback for socket state changes +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Registers a callback function / event sink for changes in underlying socket state. +The default value of `NULL` means no monitor callback function. + +[horizontal] +Option value type:: zmq_monitor_fn +Option value unit:: N/A +Default value:: no callback function +Applicable socket types:: all RETURN VALUE ------------ diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index c0434faf..2bbb7590 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -431,12 +431,51 @@ Default value:: no filters (allow from all) Applicable socket types:: all listening sockets, when using TCP transports. +ZMQ_MONITOR: Registers a callback for socket state changes +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Registers a callback function / event sink for changes in underlying socket state. +Expected signature is `void (zmq_monitor_fn) (void *s, int event, zmq_event_data_t *data)` +To remove the callback function call `zmq_setsockopt(socket, ZMQ_MONITOR, NULL, 0)` +The default value of `NULL` means no monitor callback function. +Supported events are : + +* 'ZMQ_EVENT_CONNECTED' - connection established +* 'ZMQ_EVENT_CONNECT_DELAYED' - connection could not be established synchronously, it's being polled +* 'ZMQ_EVENT_CONNECT_RETRIED' - asynchronous connect / reconnection attempt + +* 'ZMQ_EVENT_LISTENING' - socket bound to an address, ready to accept connections +* 'ZMQ_EVENT_BIND_FAILED' - socket couldn't bind to an address + +* 'ZMQ_EVENT_ACCEPTED' - connection accepted to bound interface +* 'ZMQ_EVENT_ACCEPT_FAILED' - could not accept client connection + +* 'ZMQ_EVENT_CLOSED' - connection closed +* 'ZMQ_EVENT_CLOSE_FAILED' - connection couldn't be closed +* 'ZMQ_EVENT_DISCONNECTED' - broken session + +See `zmq_event_data_t` and `ZMQ_EVENT_*` constants in zmq.h for event specific data (third argument to callback). + +Please note that both events and their context data aren't stable contracts. The 'ZMQ_MONITOR' socket option is +intended for monitoring infrastructure / operations concerns only - NOT BUSINESS LOGIC. An event is a representation +of something that happened - you cannot change the past, but only react to them. The implementation also only concerned +with a single session. No state of peers, other sessions etc. are tracked - this will only pollute internals and is the +responsibility of application authors to either implement or correlate in another datastore. Monitor events are exceptional +conditions and are thus not directly in the messaging critical path. However, still be careful with what you're doing in the +callback function as severe latency there will block the socket's application thread. + +Only tcp and ipc specific transport events are supported in this initial implementation. + +[horizontal] +Option value type:: zmq_monitor_fn +Option value unit:: N/A +Default value:: no callback function +Applicable socket types:: all + RETURN VALUE ------------ The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it shall return `-1` and set 'errno' to one of the values defined below. - ERRORS ------ *EINVAL*:: diff --git a/include/zmq.h b/include/zmq.h index 30b069dc..8d556057 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -227,7 +227,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); #define ZMQ_TCP_KEEPALIVE_IDLE 36 #define ZMQ_TCP_KEEPALIVE_INTVL 37 #define ZMQ_TCP_ACCEPT_FILTER 38 - +#define ZMQ_MONITOR 39 /* Message options */ #define ZMQ_MORE 1 @@ -236,6 +236,72 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); #define ZMQ_DONTWAIT 1 #define ZMQ_SNDMORE 2 +/******************************************************************************/ +/* 0MQ socket events and monitoring */ +/******************************************************************************/ + +/* Socket transport events (tcp and ipc only) */ +#define ZMQ_EVENT_CONNECTED 1 +#define ZMQ_EVENT_CONNECT_DELAYED 2 +#define ZMQ_EVENT_CONNECT_RETRIED 3 + +#define ZMQ_EVENT_LISTENING 4 +#define ZMQ_EVENT_BIND_FAILED 5 + +#define ZMQ_EVENT_ACCEPTED 6 +#define ZMQ_EVENT_ACCEPT_FAILED 7 + +#define ZMQ_EVENT_CLOSED 8 +#define ZMQ_EVENT_CLOSE_FAILED 9 +#define ZMQ_EVENT_DISCONNECTED 10 + +/* Socket event data (union member per event) */ +typedef union { + struct { + char *addr; + int fd; + } connected; + struct { + char *addr; + int err; + } connect_delayed; + struct { + char *addr; + int interval; + } connect_retried; + struct { + char *addr; + int fd; + } listening; + struct { + char *addr; + int err; + } bind_failed; + struct { + char *addr; + int fd; + } accepted; + struct { + char *addr; + int err; + } accept_failed; + struct { + char *addr; + int fd; + } closed; + struct { + char *addr; + int err; + } close_failed; + struct { + char *addr; + int fd; + } disconnected; +} zmq_event_data_t; + +/* Callback template for socket state changes */ +typedef void (zmq_monitor_fn) (void *s, int event, zmq_event_data_t *data); + ZMQ_EXPORT void *zmq_socket (void *, int type); ZMQ_EXPORT int zmq_close (void *s); ZMQ_EXPORT int zmq_setsockopt (void *s, int option, const void *optval, diff --git a/src/address.cpp b/src/address.cpp index f8f1a12e..6dc86dd1 100644 --- a/src/address.cpp +++ b/src/address.cpp @@ -52,7 +52,7 @@ zmq::address_t::~address_t () #endif } -int zmq::address_t::to_string (std::string &addr_) +int zmq::address_t::to_string (std::string &addr_) const { if (protocol == "tcp") { if (resolved.tcp_addr) { diff --git a/src/address.hpp b/src/address.hpp index 4e4bdd59..a8a4765d 100644 --- a/src/address.hpp +++ b/src/address.hpp @@ -45,7 +45,7 @@ namespace zmq #endif } resolved; - int to_string (std::string &addr_); + int to_string (std::string &addr_) const; }; } diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp index 2312866d..f860b0b3 100644 --- a/src/ipc_connecter.cpp +++ b/src/ipc_connecter.cpp @@ -33,6 +33,7 @@ #include "ip.hpp" #include "address.hpp" #include "ipc_address.hpp" +#include "session_base.hpp" #include #include @@ -53,6 +54,7 @@ zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_, { zmq_assert (addr); zmq_assert (addr->protocol == "ipc"); + addr->to_string (endpoint); } zmq::ipc_connecter_t::~ipc_connecter_t () @@ -105,6 +107,8 @@ void zmq::ipc_connecter_t::out_event () // Shut the connecter down. terminate (); + + session->monitor_event (ZMQ_EVENT_CONNECTED, endpoint.c_str(), fd); } void zmq::ipc_connecter_t::timer_event (int id_) @@ -132,6 +136,7 @@ void zmq::ipc_connecter_t::start_connecting () handle = add_fd (s); handle_valid = true; set_pollout (handle); + session->monitor_event (ZMQ_EVENT_CONNECT_DELAYED, endpoint.c_str(), zmq_errno()); return; } @@ -143,7 +148,9 @@ void zmq::ipc_connecter_t::start_connecting () void zmq::ipc_connecter_t::add_reconnect_timer() { - add_timer (get_new_reconnect_ivl(), reconnect_timer_id); + int rc_ivl = get_new_reconnect_ivl(); + add_timer (rc_ivl, reconnect_timer_id); + session->monitor_event (ZMQ_EVENT_CONNECT_RETRIED, endpoint.c_str(), rc_ivl); } int zmq::ipc_connecter_t::get_new_reconnect_ivl () @@ -195,8 +202,11 @@ int zmq::ipc_connecter_t::close () { zmq_assert (s != retired_fd); int rc = ::close (s); - if (rc != 0) + if (rc != 0) { + session->monitor_event (ZMQ_EVENT_CLOSE_FAILED, endpoint.c_str(), zmq_errno()); return -1; + } + session->monitor_event (ZMQ_EVENT_CLOSED, endpoint.c_str(), s); s = retired_fd; return 0; } diff --git a/src/ipc_connecter.hpp b/src/ipc_connecter.hpp index e3ca6f8b..34df9776 100644 --- a/src/ipc_connecter.hpp +++ b/src/ipc_connecter.hpp @@ -106,6 +106,9 @@ namespace zmq // Current reconnect ivl, updated for backoff strategy int current_reconnect_ivl; + // String representation of endpoint to connect to + std::string endpoint; + ipc_connecter_t (const ipc_connecter_t&); const ipc_connecter_t &operator = (const ipc_connecter_t&); }; diff --git a/src/ipc_listener.cpp b/src/ipc_listener.cpp index 310d6f30..bbf59a00 100644 --- a/src/ipc_listener.cpp +++ b/src/ipc_listener.cpp @@ -33,6 +33,7 @@ #include "config.hpp" #include "err.hpp" #include "ip.hpp" +#include "socket_base.hpp" #include #include @@ -75,8 +76,10 @@ void zmq::ipc_listener_t::in_event () // If connection was reset by the peer in the meantime, just ignore it. // TODO: Handle specific errors like ENFILE/EMFILE etc. - if (fd == retired_fd) + if (fd == retired_fd) { + socket->monitor_event (ZMQ_EVENT_ACCEPT_FAILED, endpoint.c_str(), zmq_errno()); return; + } // Create the engine object for this connection. stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options); @@ -94,6 +97,7 @@ void zmq::ipc_listener_t::in_event () session->inc_seqnum (); launch_child (session); send_attach (session, engine, false); + socket->monitor_event (ZMQ_EVENT_ACCEPTED, endpoint.c_str(), fd); } int zmq::ipc_listener_t::get_address (std::string &addr_) @@ -133,6 +137,8 @@ int zmq::ipc_listener_t::set_address (const char *addr_) if (s == -1) return -1; + address.to_string (endpoint); + // Bind the socket to the file path. rc = bind (s, address.addr (), address.addrlen ()); if (rc != 0) @@ -146,6 +152,7 @@ int zmq::ipc_listener_t::set_address (const char *addr_) if (rc != 0) return -1; + socket->monitor_event (ZMQ_EVENT_LISTENING, addr_, s); return 0; } @@ -153,18 +160,23 @@ int zmq::ipc_listener_t::close () { zmq_assert (s != retired_fd); int rc = ::close (s); - if (rc != 0) + if (rc != 0) { + socket->monitor_event (ZMQ_EVENT_CLOSE_FAILED, endpoint.c_str(), zmq_errno()); return -1; - s = retired_fd; + } // If there's an underlying UNIX domain socket, get rid of the file it // is associated with. if (has_file && !filename.empty ()) { rc = ::unlink(filename.c_str ()); - if (rc != 0) + if (rc != 0) { + socket->monitor_event (ZMQ_EVENT_CLOSE_FAILED, endpoint.c_str(), zmq_errno()); return -1; + } } + socket->monitor_event (ZMQ_EVENT_CLOSED, endpoint.c_str(), s); + s = retired_fd; return 0; } diff --git a/src/ipc_listener.hpp b/src/ipc_listener.hpp index fb04606b..8930787e 100644 --- a/src/ipc_listener.hpp +++ b/src/ipc_listener.hpp @@ -84,6 +84,9 @@ namespace zmq // Socket the listerner belongs to. zmq::socket_base_t *socket; + // String representation of endpoint to bind to + std::string endpoint; + ipc_listener_t (const ipc_listener_t&); const ipc_listener_t &operator = (const ipc_listener_t&); }; diff --git a/src/options.cpp b/src/options.cpp index e9068988..8fb47a35 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -53,6 +53,7 @@ zmq::options_t::options_t () : tcp_keepalive_cnt (-1), tcp_keepalive_idle (-1), tcp_keepalive_intvl (-1), + monitor (NULL), socket_id (0) { } @@ -313,6 +314,20 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, return 0; } } + + case ZMQ_MONITOR: + { + if (optvallen_ == 0 && optval_ == NULL) { + monitor = NULL; + return 0; + } + if (optvallen_ != sizeof (void *)) { + errno = EINVAL; + return -1; + } + monitor = ((zmq_monitor_fn*) optval_); + return 0; + } } errno = EINVAL; return -1; @@ -530,6 +545,14 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) *optvallen_ = last_endpoint.size()+1; return 0; + case ZMQ_MONITOR: + if (*optvallen_ < sizeof (void *)) { + errno = EINVAL; + return -1; + } + ((zmq_monitor_fn*) optval_) = monitor; + *optvallen_ = sizeof (void *); + return 0; } errno = EINVAL; diff --git a/src/options.hpp b/src/options.hpp index abe558ec..c1248c76 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -29,6 +29,7 @@ #include "stddef.h" #include "stdint.hpp" #include "tcp_address.hpp" +#include "../include/zmq.h" namespace zmq { @@ -124,6 +125,9 @@ namespace zmq typedef std::vector tcp_accept_filters_t; tcp_accept_filters_t tcp_accept_filters; + // Connection and exceptional state callback function + zmq_monitor_fn *monitor; + // ID of the socket. int socket_id; }; diff --git a/src/session_base.cpp b/src/session_base.cpp index 6dc6799d..2ada9707 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -255,6 +255,21 @@ void zmq::session_base_t::hiccuped (pipe_t *pipe_) zmq_assert (false); } +int zmq::session_base_t::get_address (std::string &addr_) +{ + if (addr) + return addr->to_string (addr_); + return -1; +} + +void zmq::session_base_t::monitor_event (int event_, ...) +{ + va_list args; + va_start (args, event_); + socket->monitor_event (event_, args); + va_end (args); +} + void zmq::session_base_t::process_plug () { if (connect) diff --git a/src/session_base.hpp b/src/session_base.hpp index bc0ecd62..4a96ff2b 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -65,6 +65,9 @@ namespace zmq void hiccuped (zmq::pipe_t *pipe_); void terminated (zmq::pipe_t *pipe_); + int get_address (std::string &addr_); + void monitor_event (int event_, ...); + protected: session_base_t (zmq::io_thread_t *io_thread_, bool connect_, diff --git a/src/socket_base.cpp b/src/socket_base.cpp index b8796571..cccb7491 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -351,6 +351,7 @@ int zmq::socket_base_t::bind (const char *addr_) int rc = listener->set_address (address.c_str ()); if (rc != 0) { delete listener; + monitor_event (ZMQ_EVENT_BIND_FAILED, addr_, zmq_errno()); return -1; } @@ -369,6 +370,7 @@ int zmq::socket_base_t::bind (const char *addr_) int rc = listener->set_address (address.c_str ()); if (rc != 0) { delete listener; + monitor_event (ZMQ_EVENT_BIND_FAILED, addr_, zmq_errno()); return -1; } @@ -980,3 +982,59 @@ void zmq::socket_base_t::extract_flags (msg_t *msg_) // Remove MORE flag. rcvmore = msg_->flags () & msg_t::more ? true : false; } + +void zmq::socket_base_t::monitor_event (int event_, ...) +{ + if (options.monitor != NULL) { + va_list args; + zmq_event_data_t data; + memset(&data, 0, sizeof (zmq_event_data_t)); + va_start (args, event_); + switch (event_) { + case ZMQ_EVENT_CONNECTED: + data.connected.addr = va_arg (args, char*); + data.connected.fd = va_arg (args, int); + break; + case ZMQ_EVENT_CONNECT_DELAYED: + data.connect_delayed.addr = va_arg (args, char*); + data.connect_delayed.err = va_arg (args, int); + break; + case ZMQ_EVENT_CONNECT_RETRIED: + data.connect_retried.addr = va_arg (args, char*); + data.connect_retried.interval = va_arg (args, int); + break; + case ZMQ_EVENT_LISTENING: + data.listening.addr = va_arg (args, char*); + data.listening.fd = va_arg (args, int); + break; + case ZMQ_EVENT_BIND_FAILED: + data.bind_failed.addr = va_arg (args, char*); + data.bind_failed.err = va_arg (args, int); + break; + case ZMQ_EVENT_ACCEPTED: + data.accepted.addr = va_arg (args, char*); + data.accepted.fd = va_arg (args, int); + break; + case ZMQ_EVENT_ACCEPT_FAILED: + data.accept_failed.addr = va_arg (args, char*); + data.accept_failed.err = va_arg (args, int); + break; + case ZMQ_EVENT_CLOSED: + data.closed.addr = va_arg (args, char*); + data.closed.fd = va_arg (args, int); + break; + case ZMQ_EVENT_CLOSE_FAILED: + data.close_failed.addr = va_arg (args, char*); + data.close_failed.err = va_arg (args, int); + break; + case ZMQ_EVENT_DISCONNECTED: + data.disconnected.addr = va_arg (args, char*); + data.disconnected.fd = va_arg (args, int); + break; + default: + zmq_assert (false); + } + options.monitor ((void *)this, event_, &data); + va_end (args); + } +} \ No newline at end of file diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 1788cd49..a412f695 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -100,6 +100,9 @@ namespace zmq void terminated (pipe_t *pipe_); void lock(); void unlock(); + + void monitor_event (int event_, ...); + protected: socket_base_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_); diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 1771990b..bff16239 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -117,6 +117,8 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, decoder.set_session (session_); session = session_; + session->get_address (endpoint); + // Connect to I/O threads poller object. io_object_t::plug (io_thread_); handle = add_fd (s); @@ -143,6 +145,7 @@ void zmq::stream_engine_t::unplug () decoder.set_session (NULL); leftover_session = session; session = NULL; + endpoint.clear(); } void zmq::stream_engine_t::terminate () @@ -267,6 +270,7 @@ void zmq::stream_engine_t::activate_in () void zmq::stream_engine_t::error () { zmq_assert (session); + session->monitor_event (ZMQ_EVENT_DISCONNECTED, endpoint.c_str(), s); session->detach (); unplug (); delete this; diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index 30b190b0..cf87f67f 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -30,6 +30,7 @@ #include "encoder.hpp" #include "decoder.hpp" #include "options.hpp" +#include "../include/zmq.h" namespace zmq { @@ -96,6 +97,9 @@ namespace zmq options_t options; + // String representation of endpoint + std::string endpoint; + bool plugged; stream_engine_t (const stream_engine_t&); diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index a91c4145..73a22ab0 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -31,6 +31,7 @@ #include "ip.hpp" #include "address.hpp" #include "tcp_address.hpp" +#include "session_base.hpp" #if defined ZMQ_HAVE_WINDOWS #include "windows.hpp" @@ -62,6 +63,7 @@ zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_, { zmq_assert (addr); zmq_assert (addr->protocol == "tcp"); + addr->to_string (endpoint); } zmq::tcp_connecter_t::~tcp_connecter_t () @@ -117,6 +119,8 @@ void zmq::tcp_connecter_t::out_event () // Shut the connecter down. terminate (); + + session->monitor_event (ZMQ_EVENT_CONNECTED, endpoint.c_str(), fd); } void zmq::tcp_connecter_t::timer_event (int id_) @@ -144,6 +148,7 @@ void zmq::tcp_connecter_t::start_connecting () handle = add_fd (s); handle_valid = true; set_pollout (handle); + session->monitor_event (ZMQ_EVENT_CONNECT_DELAYED, endpoint.c_str(), zmq_errno()); return; } @@ -155,7 +160,9 @@ void zmq::tcp_connecter_t::start_connecting () void zmq::tcp_connecter_t::add_reconnect_timer() { - add_timer (get_new_reconnect_ivl(), reconnect_timer_id); + int rc_ivl = get_new_reconnect_ivl(); + add_timer (rc_ivl, reconnect_timer_id); + session->monitor_event (ZMQ_EVENT_CONNECT_RETRIED, endpoint.c_str(), rc_ivl); } int zmq::tcp_connecter_t::get_new_reconnect_ivl () @@ -277,10 +284,15 @@ void zmq::tcp_connecter_t::close () zmq_assert (s != retired_fd); #ifdef ZMQ_HAVE_WINDOWS int rc = closesocket (s); + if (unlikely (rc != SOCKET_ERROR)) + session->monitor_event (ZMQ_EVENT_CLOSE_FAILED, endpoint.c_str(), zmq_errno()); wsa_assert (rc != SOCKET_ERROR); #else int rc = ::close (s); + if (unlikely (rc == 0)) + session->monitor_event (ZMQ_EVENT_CLOSE_FAILED, endpoint.c_str(), zmq_errno()); errno_assert (rc == 0); #endif + session->monitor_event (ZMQ_EVENT_CLOSED, endpoint.c_str(), s); s = retired_fd; } diff --git a/src/tcp_connecter.hpp b/src/tcp_connecter.hpp index 0891710c..a157dc55 100644 --- a/src/tcp_connecter.hpp +++ b/src/tcp_connecter.hpp @@ -26,6 +26,7 @@ #include "own.hpp" #include "stdint.hpp" #include "io_object.hpp" +#include "../include/zmq.h" namespace zmq { @@ -103,6 +104,9 @@ namespace zmq // Current reconnect ivl, updated for backoff strategy int current_reconnect_ivl; + // String representation of endpoint to connect to + std::string endpoint; + tcp_connecter_t (const tcp_connecter_t&); const tcp_connecter_t &operator = (const tcp_connecter_t&); }; diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index 67df66d7..162c0bfb 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -31,6 +31,7 @@ #include "config.hpp" #include "err.hpp" #include "ip.hpp" +#include "socket_base.hpp" #ifdef ZMQ_HAVE_WINDOWS #include "windows.hpp" @@ -83,8 +84,10 @@ void zmq::tcp_listener_t::in_event () // If connection was reset by the peer in the meantime, just ignore it. // TODO: Handle specific errors like ENFILE/EMFILE etc. - if (fd == retired_fd) + if (fd == retired_fd) { + socket->monitor_event (ZMQ_EVENT_ACCEPT_FAILED, endpoint.c_str(), zmq_errno()); return; + } tune_tcp_socket (fd); tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl); @@ -105,6 +108,7 @@ void zmq::tcp_listener_t::in_event () session->inc_seqnum (); launch_child (session); send_attach (session, engine, false); + socket->monitor_event (ZMQ_EVENT_ACCEPTED, endpoint.c_str(), fd); } void zmq::tcp_listener_t::close () @@ -112,11 +116,16 @@ void zmq::tcp_listener_t::close () zmq_assert (s != retired_fd); #ifdef ZMQ_HAVE_WINDOWS int rc = closesocket (s); + if (unlikely (rc != SOCKET_ERROR)) + socket->monitor_event (ZMQ_EVENT_CLOSE_FAILED, endpoint.c_str(), zmq_errno()); wsa_assert (rc != SOCKET_ERROR); #else int rc = ::close (s); + if (unlikely (rc == 0)) + socket->monitor_event (ZMQ_EVENT_CLOSE_FAILED, endpoint.c_str(), zmq_errno()); errno_assert (rc == 0); #endif + socket->monitor_event (ZMQ_EVENT_CLOSED, endpoint.c_str(), s); s = retired_fd; } @@ -185,6 +194,8 @@ int zmq::tcp_listener_t::set_address (const char *addr_) errno_assert (rc == 0); #endif + address.to_string (endpoint); + // Bind the socket to the network interface and port. rc = bind (s, address.addr (), address.addrlen ()); #ifdef ZMQ_HAVE_WINDOWS @@ -209,6 +220,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_) return -1; #endif + socket->monitor_event (ZMQ_EVENT_LISTENING, addr_, s); return 0; } diff --git a/src/tcp_listener.hpp b/src/tcp_listener.hpp index c3339e49..cb1f8b83 100644 --- a/src/tcp_listener.hpp +++ b/src/tcp_listener.hpp @@ -27,6 +27,7 @@ #include "stdint.hpp" #include "io_object.hpp" #include "tcp_address.hpp" +#include "../include/zmq.h" namespace zmq { @@ -78,6 +79,9 @@ namespace zmq // Socket the listerner belongs to. zmq::socket_base_t *socket; + // String representation of endpoint to bind to + std::string endpoint; + tcp_listener_t (const tcp_listener_t&); const tcp_listener_t &operator = (const tcp_listener_t&); }; diff --git a/tests/Makefile.am b/tests/Makefile.am index 81bedeee..aa2c529a 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -14,7 +14,8 @@ noinst_PROGRAMS = test_pair_inproc \ test_msg_flags \ test_connect_resolve \ test_last_endpoint \ - test_term_endpoint + test_term_endpoint \ + test_monitor if !ON_MINGW noinst_PROGRAMS += test_shutdown_stress \ @@ -35,6 +36,7 @@ test_msg_flags_SOURCES = test_msg_flags.cpp test_connect_resolve_SOURCES = test_connect_resolve.cpp test_last_endpoint_SOURCES = test_last_endpoint.cpp test_term_endpoint_SOURCES = test_term_endpoint.cpp +test_monitor_SOURCES = test_monitor.cpp if !ON_MINGW test_shutdown_stress_SOURCES = test_shutdown_stress.cpp diff --git a/tests/test_monitor.cpp b/tests/test_monitor.cpp new file mode 100644 index 00000000..6822803d --- /dev/null +++ b/tests/test_monitor.cpp @@ -0,0 +1,152 @@ +/* + Copyright (c) 2007-2012 iMatix Corporation + Copyright (c) 2011 250bpm s.r.o. + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include +#include + +#include "../include/zmq.h" +#include "../include/zmq_utils.h" + +void listening_sock_monitor (void *s, int event_, zmq_event_data_t *data_) +{ + const char *addr = "tcp://127.0.0.1:5560"; + // Only some of the exceptional events could fire + switch (event_) { + case ZMQ_EVENT_LISTENING: + assert (data_->listening.fd > 0); + assert (memcmp (data_->listening.addr, addr, 22)); + break; + case ZMQ_EVENT_ACCEPTED: + assert (data_->accepted.fd > 0); + assert (memcmp (data_->accepted.addr, addr, 22)); + break; + case ZMQ_EVENT_CLOSE_FAILED: + assert (data_->close_failed.err != 0); + assert (memcmp (data_->close_failed.addr, addr, 22)); + break; + case ZMQ_EVENT_CLOSED: + assert (data_->closed.fd != 0); + assert (memcmp (data_->closed.addr, addr, 22)); + break; + case ZMQ_EVENT_DISCONNECTED: + assert (data_->disconnected.fd != 0); + assert (memcmp (data_->disconnected.addr, addr, 22)); + break; + default: + // out of band / unexpected event + assert (0); + } +} + +void connecting_sock_monitor (void *s, int event_, zmq_event_data_t *data_) +{ + const char *addr = "tcp://127.0.0.1:5560"; + // Only some of the exceptional events could fire + switch (event_) { + case ZMQ_EVENT_CONNECTED: + assert (data_->connected.fd > 0); + assert (memcmp (data_->connected.addr, addr, 22)); + break; + case ZMQ_EVENT_CONNECT_DELAYED: + assert (data_->connect_delayed.err != 0); + assert (memcmp (data_->connect_delayed.addr, addr, 22)); + break; + case ZMQ_EVENT_CLOSE_FAILED: + assert (data_->close_failed.err != 0); + assert (memcmp (data_->close_failed.addr, addr, 22)); + break; + case ZMQ_EVENT_CLOSED: + assert (data_->closed.fd != 0); + assert (memcmp (data_->closed.addr, addr, 22)); + break; + case ZMQ_EVENT_DISCONNECTED: + assert (data_->disconnected.fd != 0); + assert (memcmp (data_->disconnected.addr, addr, 22)); + break; + default: + // out of band / unexpected event + assert (0); + } +} + +int main (int argc, char *argv []) +{ + int rc; + + // Create the infrastructure + void *ctx = zmq_init (1); + assert (ctx); + + void *rep = zmq_socket (ctx, ZMQ_REP); + assert (rep); + + // Expects failure - invalid size + rc = zmq_setsockopt (rep, ZMQ_MONITOR, (void *)listening_sock_monitor, 20); + assert (rc == -1); + assert (errno == EINVAL); + + rc = zmq_setsockopt (rep, ZMQ_MONITOR, (void *)listening_sock_monitor, sizeof (void *)); + assert (rc == 0); + + void *monitor; + size_t sz = sizeof (void *); + rc = zmq_getsockopt (rep, ZMQ_MONITOR, monitor, &sz); + assert (rc == 0); + assert (monitor == listening_sock_monitor); + + // Remove socket monitor callback + rc = zmq_setsockopt (rep, ZMQ_MONITOR, NULL, 0); + assert (rc == 0); + + rc = zmq_getsockopt (rep, ZMQ_MONITOR, monitor, &sz); + assert (rc == 0); + assert (monitor == listening_sock_monitor); + + rc = zmq_bind (rep, "tcp://127.0.0.1:5560"); + assert (rc == 0); + + void *req = zmq_socket (ctx, ZMQ_REQ); + assert (req); + + rc = zmq_setsockopt (req, ZMQ_MONITOR, (void *)connecting_sock_monitor, sizeof (void *)); + assert (rc == 0); + + rc = zmq_connect (req, "tcp://127.0.0.1:5560"); + assert (rc == 0); + + // Allow a window for socket events as connect can be async + zmq_sleep (1); + + // Deallocate the infrastructure. + rc = zmq_close (req); + assert (rc == 0); + + // Allow for closed or disconnected events to bubble up + zmq_sleep (1); + + rc = zmq_close (rep); + assert (rc == 0); + + zmq_sleep (1); + + zmq_term (ctx); + return 0 ; +} \ No newline at end of file