diff --git a/CMakeLists.txt b/CMakeLists.txt index 51d9a21c..6f305c75 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -639,6 +639,7 @@ set(cxx-sources devpoll.cpp dgram.cpp dist.cpp + endpoint.cpp epoll.cpp err.cpp fq.cpp @@ -746,6 +747,7 @@ set(cxx-sources dish.hpp dist.hpp encoder.hpp + endpoint.hpp epoll.hpp err.hpp fd.hpp diff --git a/Makefile.am b/Makefile.am index 11433c46..be034c4c 100644 --- a/Makefile.am +++ b/Makefile.am @@ -55,6 +55,8 @@ src_libzmq_la_SOURCES = \ src/dist.cpp \ src/dist.hpp \ src/encoder.hpp \ + src/endpoint.hpp \ + src/endpoint.cpp \ src/epoll.cpp \ src/epoll.hpp \ src/err.cpp \ diff --git a/include/zmq.h b/include/zmq.h index d243cfeb..b97260ef 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -723,6 +723,11 @@ ZMQ_EXPORT int zmq_socket_get_peer_state (void *socket, const void *routing_id, size_t routing_id_size); +ZMQ_EXPORT int zmq_socket_monitor_versioned (void *s_, + const char *addr_, + uint64_t events_, + int event_version_); + #endif // ZMQ_BUILD_DRAFT_API diff --git a/src/endpoint.cpp b/src/endpoint.cpp new file mode 100644 index 00000000..c9ad6d66 --- /dev/null +++ b/src/endpoint.cpp @@ -0,0 +1,44 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "precompiled.hpp" +#include "endpoint.hpp" + +zmq::endpoint_uri_pair_t +zmq::make_unconnected_connect_endpoint_pair (const std::string &endpoint_) +{ + return endpoint_uri_pair_t (std::string (), endpoint_, + endpoint_type_connect); +} + +zmq::endpoint_uri_pair_t +zmq::make_unconnected_bind_endpoint_pair (const std::string &endpoint_) +{ + return endpoint_uri_pair_t (endpoint_, std::string (), endpoint_type_bind); +} diff --git a/src/endpoint.hpp b/src/endpoint.hpp new file mode 100644 index 00000000..56b7d0e7 --- /dev/null +++ b/src/endpoint.hpp @@ -0,0 +1,72 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_ENDPOINT_HPP_INCLUDED__ +#define __ZMQ_ENDPOINT_HPP_INCLUDED__ + +#include + +namespace zmq +{ +enum endpoint_type_t +{ + endpoint_type_none, // a connection-less endpoint + endpoint_type_bind, // a connection-oriented bind endpoint + endpoint_type_connect // a connection-oriented connect endpoint +}; + +struct endpoint_uri_pair_t +{ + endpoint_uri_pair_t () : local_type (endpoint_type_none) {} + endpoint_uri_pair_t (const std::string &local, + const std::string &remote, + endpoint_type_t local_type) : + local (local), + remote (remote), + local_type (local_type) + { + } + + const std::string &identifier () const + { + return local_type == endpoint_type_bind ? local : remote; + } + + std::string local, remote; + endpoint_type_t local_type; +}; + +endpoint_uri_pair_t +make_unconnected_connect_endpoint_pair (const std::string &endpoint_); + +endpoint_uri_pair_t +make_unconnected_bind_endpoint_pair (const std::string &endpoint_); +} + +#endif diff --git a/src/i_engine.hpp b/src/i_engine.hpp index dfbdd265..de4f689c 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -30,6 +30,8 @@ #ifndef __ZMQ_I_ENGINE_HPP_INCLUDED__ #define __ZMQ_I_ENGINE_HPP_INCLUDED__ +#include "endpoint.hpp" + namespace zmq { class io_thread_t; @@ -61,7 +63,7 @@ struct i_engine virtual void zap_msg_available () = 0; - virtual const char *get_endpoint () const = 0; + virtual const endpoint_uri_pair_t &get_endpoint () const = 0; }; } diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp index 8d3804f2..46900df5 100644 --- a/src/ipc_connecter.cpp +++ b/src/ipc_connecter.cpp @@ -92,7 +92,8 @@ void zmq::ipc_connecter_t::start_connecting () else if (rc == -1 && errno == EINPROGRESS) { _handle = add_fd (_s); set_pollout (_handle); - _socket->event_connect_delayed (_endpoint, zmq_errno ()); + _socket->event_connect_delayed ( + make_unconnected_connect_endpoint_pair (_endpoint), zmq_errno ()); // TODO, tcp_connecter_t adds a connect timer in this case; maybe this // should be done here as well (and then this could be pulled up to diff --git a/src/ipc_listener.cpp b/src/ipc_listener.cpp index 7630b8be..702c1f64 100644 --- a/src/ipc_listener.cpp +++ b/src/ipc_listener.cpp @@ -142,7 +142,8 @@ void zmq::ipc_listener_t::in_event () // If connection was reset by the peer in the meantime, just ignore it. // TODO: Handle specific errors like ENFILE/EMFILE etc. if (fd == retired_fd) { - _socket->event_accept_failed (_endpoint, zmq_errno ()); + _socket->event_accept_failed ( + make_unconnected_bind_endpoint_pair (_endpoint), zmq_errno ()); return; } @@ -224,7 +225,8 @@ int zmq::ipc_listener_t::set_address (const char *addr_) _filename = ZMQ_MOVE (addr); _has_file = true; - _socket->event_listening (_endpoint, _s); + _socket->event_listening (make_unconnected_bind_endpoint_pair (_endpoint), + _s); return 0; error: @@ -252,12 +254,14 @@ int zmq::ipc_listener_t::close () } if (rc != 0) { - _socket->event_close_failed (_endpoint, zmq_errno ()); + _socket->event_close_failed ( + make_unconnected_bind_endpoint_pair (_endpoint), zmq_errno ()); return -1; } } - _socket->event_closed (_endpoint, fd_for_event); + _socket->event_closed (make_unconnected_bind_endpoint_pair (_endpoint), + fd_for_event); return 0; } diff --git a/src/norm_engine.cpp b/src/norm_engine.cpp index 724feb7d..0b467763 100644 --- a/src/norm_engine.cpp +++ b/src/norm_engine.cpp @@ -709,9 +709,9 @@ zmq::norm_engine_t::NormRxStreamState::List::Iterator::GetNextItem () return nextItem; } // end zmq::norm_engine_t::NormRxStreamState::List::Iterator::GetNextItem() -const char *zmq::norm_engine_t::get_endpoint () const +const zmq::endpoint_uri_pair_t &zmq::norm_engine_t::get_endpoint () const { - return ""; + return _empty_endpoint; } #endif // ZMQ_HAVE_NORM diff --git a/src/norm_engine.hpp b/src/norm_engine.hpp index 733f24fc..f1f3a74f 100644 --- a/src/norm_engine.hpp +++ b/src/norm_engine.hpp @@ -47,7 +47,7 @@ class norm_engine_t : public io_object_t, public i_engine virtual void zap_msg_available (){}; - virtual const char *get_endpoint () const; + virtual const endpoint_uri_pair_t &get_endpoint () const; // i_poll_events interface implementation. // (we only need in_event() for NormEvent notification) @@ -150,6 +150,8 @@ class norm_engine_t : public io_object_t, public i_engine }; // end class zmq::norm_engine_t::NormRxStreamState + const endpoint_uri_pair_t _empty_endpoint; + session_base_t *zmq_session; options_t options; NormInstanceHandle norm_instance; diff --git a/src/options.cpp b/src/options.cpp index aad90cff..d5f18018 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -244,7 +244,8 @@ zmq::options_t::options_t () : loopback_fastpath (false), multicast_loop (true), zero_copy (true), - router_notify (0) + router_notify (0), + monitor_event_version (1) { memset (curve_public_key, 0, CURVE_KEYSIZE); memset (curve_secret_key, 0, CURVE_KEYSIZE); diff --git a/src/options.hpp b/src/options.hpp index b591e961..aeaa86da 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -266,6 +266,9 @@ struct options_t // Application metadata std::map app_metadata; + + // Version of monitor events to emit + int monitor_event_version; }; inline bool get_effective_conflate_option (const options_t &options) diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index 88105cb9..227efa06 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -156,9 +156,9 @@ bool zmq::pgm_receiver_t::restart_input () return true; } -const char *zmq::pgm_receiver_t::get_endpoint () const +const zmq::endpoint_uri_pair_t &zmq::pgm_receiver_t::get_endpoint () const { - return ""; + return _empty_endpoint; } void zmq::pgm_receiver_t::in_event () diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index d5eca74d..a28ec484 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -60,7 +60,7 @@ class pgm_receiver_t : public io_object_t, public i_engine bool restart_input (); void restart_output (); void zap_msg_available () {} - const char *get_endpoint () const; + const endpoint_uri_pair_t &get_endpoint () const; // i_poll_events interface implementation. void in_event (); @@ -84,6 +84,8 @@ class pgm_receiver_t : public io_object_t, public i_engine rx_timer_id = 0xa1 }; + const endpoint_uri_pair_t _empty_endpoint; + // RX timer is running. bool has_rx_timer; diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 52561d0f..3c68f23b 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -143,9 +143,9 @@ bool zmq::pgm_sender_t::restart_input () return true; } -const char *zmq::pgm_sender_t::get_endpoint () const +const zmq::endpoint_uri_pair_t &zmq::pgm_sender_t::get_endpoint () const { - return ""; + return _empty_endpoint; } zmq::pgm_sender_t::~pgm_sender_t () diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 1eb0b1f1..671b3f3d 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -59,7 +59,7 @@ class pgm_sender_t : public io_object_t, public i_engine bool restart_input (); void restart_output (); void zap_msg_available () {} - const char *get_endpoint () const; + const endpoint_uri_pair_t &get_endpoint () const; // i_poll_events interface implementation. void in_event (); @@ -77,6 +77,8 @@ class pgm_sender_t : public io_object_t, public i_engine rx_timer_id = 0xa1 }; + const endpoint_uri_pair_t _empty_endpoint; + // Timers are running. bool has_tx_timer; bool has_rx_timer; diff --git a/src/pipe.cpp b/src/pipe.cpp index c8ce4cda..855ba0af 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -554,12 +554,12 @@ void zmq::pipe_t::send_hwms_to_peer (int inhwm_, int outhwm_) send_pipe_hwm (_peer, inhwm_, outhwm_); } -void zmq::pipe_t::set_endpoint_uri (const char *name_) +void zmq::pipe_t::set_endpoint_pair (zmq::endpoint_uri_pair_t endpoint_pair_) { - _endpoint_uri = name_; + _endpoint_pair = endpoint_pair_; } -std::string &zmq::pipe_t::get_endpoint_uri () +const zmq::endpoint_uri_pair_t &zmq::pipe_t::get_endpoint_pair () const { - return _endpoint_uri; + return _endpoint_pair; } diff --git a/src/pipe.hpp b/src/pipe.hpp index 49e800d3..76f71758 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -37,6 +37,7 @@ #include "array.hpp" #include "blob.hpp" #include "options.hpp" +#include "endpoint.hpp" namespace zmq { @@ -141,8 +142,8 @@ class pipe_t : public object_t, // Returns true if HWM is not reached bool check_hwm () const; - void set_endpoint_uri (const char *name_); - std::string &get_endpoint_uri (); + void set_endpoint_pair (endpoint_uri_pair_t endpoint_pair_); + const endpoint_uri_pair_t &get_endpoint_pair () const; private: // Type of the underlying lock-free pipe. @@ -247,9 +248,8 @@ class pipe_t : public object_t, const bool _conflate; - // If the pipe belongs to socket's endpoint the endpoint's name is stored here. - // Otherwise this is empty. - std::string _endpoint_uri; + // The endpoints of this pipe. + endpoint_uri_pair_t _endpoint_pair; // Disable copying. pipe_t (const pipe_t &); diff --git a/src/session_base.cpp b/src/session_base.cpp index ef7daf95..2de67eca 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -117,7 +117,7 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_, { } -const char *zmq::session_base_t::get_endpoint () const +const zmq::endpoint_uri_pair_t &zmq::session_base_t::get_endpoint () const { return _engine->get_endpoint (); } diff --git a/src/session_base.hpp b/src/session_base.hpp index 002cdf87..c72161ce 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -92,7 +92,7 @@ class session_base_t : public own_t, public io_object_t, public i_pipe_events int write_zap_msg (msg_t *msg_); socket_base_t *get_socket (); - const char *get_endpoint () const; + const endpoint_uri_pair_t &get_endpoint () const; protected: session_base_t (zmq::io_thread_t *io_thread_, diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 1b53c904..bb5458d7 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -31,6 +31,7 @@ #include #include #include +#include #include "macros.hpp" @@ -591,7 +592,10 @@ int zmq::socket_base_t::bind (const char *endpoint_uri_) // Save last endpoint URI paddr->to_string (_last_endpoint); - add_endpoint (endpoint_uri_, static_cast (session), newpipe); + // TODO shouldn't this use _last_endpoint instead of endpoint_uri_? as in the other cases + add_endpoint (endpoint_uri_pair_t (endpoint_uri_, std::string (), + endpoint_type_none), + static_cast (session), newpipe); return 0; } @@ -611,15 +615,16 @@ int zmq::socket_base_t::bind (const char *endpoint_uri_) rc = listener->set_address (address.c_str ()); if (rc != 0) { LIBZMQ_DELETE (listener); - event_bind_failed (address, zmq_errno ()); + event_bind_failed (make_unconnected_bind_endpoint_pair (address), + zmq_errno ()); return -1; } // Save last endpoint URI listener->get_address (_last_endpoint); - add_endpoint (_last_endpoint.c_str (), static_cast (listener), - NULL); + add_endpoint (make_unconnected_bind_endpoint_pair (_last_endpoint), + static_cast (listener), NULL); options.connected = true; return 0; } @@ -633,15 +638,16 @@ int zmq::socket_base_t::bind (const char *endpoint_uri_) int rc = listener->set_address (address.c_str ()); if (rc != 0) { LIBZMQ_DELETE (listener); - event_bind_failed (address, zmq_errno ()); + event_bind_failed (make_unconnected_bind_endpoint_pair (address), + zmq_errno ()); return -1; } // Save last endpoint URI listener->get_address (_last_endpoint); - add_endpoint (_last_endpoint.c_str (), static_cast (listener), - NULL); + add_endpoint (make_unconnected_bind_endpoint_pair (_last_endpoint), + static_cast (listener), NULL); options.connected = true; return 0; } @@ -654,14 +660,17 @@ int zmq::socket_base_t::bind (const char *endpoint_uri_) int rc = listener->set_address (address.c_str ()); if (rc != 0) { LIBZMQ_DELETE (listener); - event_bind_failed (address, zmq_errno ()); + event_bind_failed (make_unconnected_bind_endpoint_pair (address), + zmq_errno ()); return -1; } // Save last endpoint URI listener->get_address (_last_endpoint); - add_endpoint (endpoint_uri_, static_cast (listener), NULL); + // TODO shouldn't this use _last_endpoint as in the other cases? + add_endpoint (make_unconnected_bind_endpoint_pair (endpoint_uri_), + static_cast (listener), NULL); options.connected = true; return 0; } @@ -970,48 +979,49 @@ int zmq::socket_base_t::connect (const char *endpoint_uri_) // Save last endpoint URI paddr->to_string (_last_endpoint); - add_endpoint (endpoint_uri_, static_cast (session), newpipe); + add_endpoint (make_unconnected_connect_endpoint_pair (_last_endpoint), + static_cast (session), newpipe); return 0; } -std::string zmq::socket_base_t::resolve_tcp_addr (std::string endpoint_uri_, - const char *tcp_address_) +std::string +zmq::socket_base_t::resolve_tcp_addr (std::string endpoint_uri_pair_, + const char *tcp_address_) { // The resolved last_endpoint is used as a key in the endpoints map. // The address passed by the user might not match in the TCP case due to // IPv4-in-IPv6 mapping (EG: tcp://[::ffff:127.0.0.1]:9999), so try to // resolve before giving up. Given at this stage we don't know whether a // socket is connected or bound, try with both. - if (_endpoints.find (endpoint_uri_) == _endpoints.end ()) { + if (_endpoints.find (endpoint_uri_pair_) == _endpoints.end ()) { tcp_address_t *tcp_addr = new (std::nothrow) tcp_address_t (); alloc_assert (tcp_addr); int rc = tcp_addr->resolve (tcp_address_, false, options.ipv6); if (rc == 0) { - tcp_addr->to_string (endpoint_uri_); - if (_endpoints.find (endpoint_uri_) == _endpoints.end ()) { + tcp_addr->to_string (endpoint_uri_pair_); + if (_endpoints.find (endpoint_uri_pair_) == _endpoints.end ()) { rc = tcp_addr->resolve (tcp_address_, true, options.ipv6); if (rc == 0) { - tcp_addr->to_string (endpoint_uri_); + tcp_addr->to_string (endpoint_uri_pair_); } } } LIBZMQ_DELETE (tcp_addr); } - return endpoint_uri_; + return endpoint_uri_pair_; } -void zmq::socket_base_t::add_endpoint (const char *endpoint_uri_, - own_t *endpoint_, - pipe_t *pipe_) +void zmq::socket_base_t::add_endpoint ( + const endpoint_uri_pair_t &endpoint_pair_, own_t *endpoint_, pipe_t *pipe_) { // Activate the session. Make it a child of this socket. launch_child (endpoint_); - _endpoints.ZMQ_MAP_INSERT_OR_EMPLACE (std::string (endpoint_uri_), + _endpoints.ZMQ_MAP_INSERT_OR_EMPLACE (endpoint_pair_.identifier (), endpoint_pipe_t (endpoint_, pipe_)); if (pipe_ != NULL) - pipe_->set_endpoint_uri (endpoint_uri_); + pipe_->set_endpoint_pair (endpoint_pair_); } int zmq::socket_base_t::term_endpoint (const char *endpoint_uri_) @@ -1059,7 +1069,7 @@ int zmq::socket_base_t::term_endpoint (const char *endpoint_uri_) ? resolve_tcp_addr (endpoint_uri_str, uri_path.c_str ()) : endpoint_uri_str; - // Find the endpoints range (if any) corresponding to the endpoint_uri_ string. + // Find the endpoints range (if any) corresponding to the endpoint_uri_pair_ string. const std::pair range = _endpoints.equal_range (resolved_endpoint_uri); if (range.first == range.second) { @@ -1560,9 +1570,10 @@ void zmq::socket_base_t::pipe_terminated (pipe_t *pipe_) _pipes.erase (pipe_); // Remove the pipe from _endpoints (set it to NULL). - if (!pipe_->get_endpoint_uri ().empty ()) { + const std::string &identifier = pipe_->get_endpoint_pair ().identifier (); + if (!identifier.empty ()) { std::pair range; - range = _endpoints.equal_range (pipe_->get_endpoint_uri ()); + range = _endpoints.equal_range (identifier); for (endpoints_t::iterator it = range.first; it != range.second; ++it) { if (it->second.second == pipe_) { @@ -1586,7 +1597,9 @@ void zmq::socket_base_t::extract_flags (msg_t *msg_) _rcvmore = (msg_->flags () & msg_t::more) != 0; } -int zmq::socket_base_t::monitor (const char *endpoint_, int events_) +int zmq::socket_base_t::monitor (const char *endpoint_, + uint64_t events_, + int event_version_) { scoped_lock_t lock (_monitor_sync); @@ -1595,6 +1608,12 @@ int zmq::socket_base_t::monitor (const char *endpoint_, int events_) return -1; } + // Event version 1 supports only first 16 events. + if (unlikely (event_version_ == 1 && events_ >> 16 != 0)) { + errno = EINVAL; + return -1; + } + // Support deregistering monitoring endpoints as well if (endpoint_ == NULL) { stop_monitor (); @@ -1617,6 +1636,7 @@ int zmq::socket_base_t::monitor (const char *endpoint_, int events_) } // Register events to monitor _monitor_events = events_; + options.monitor_event_version = event_version_; _monitor_socket = zmq_socket (get_ctx (), ZMQ_PAIR); if (_monitor_socket == NULL) return -1; @@ -1635,137 +1655,174 @@ int zmq::socket_base_t::monitor (const char *endpoint_, int events_) return rc; } -void zmq::socket_base_t::event_connected (const std::string &endpoint_uri_, - zmq::fd_t fd_) +void zmq::socket_base_t::event_connected ( + const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_) { - event (endpoint_uri_, fd_, ZMQ_EVENT_CONNECTED); + event (endpoint_uri_pair_, fd_, ZMQ_EVENT_CONNECTED); } void zmq::socket_base_t::event_connect_delayed ( - const std::string &endpoint_uri_, int err_) + const endpoint_uri_pair_t &endpoint_uri_pair_, int err_) { - event (endpoint_uri_, err_, ZMQ_EVENT_CONNECT_DELAYED); + event (endpoint_uri_pair_, err_, ZMQ_EVENT_CONNECT_DELAYED); } void zmq::socket_base_t::event_connect_retried ( - const std::string &endpoint_uri_, int interval_) + const endpoint_uri_pair_t &endpoint_uri_pair_, int interval_) { - event (endpoint_uri_, interval_, ZMQ_EVENT_CONNECT_RETRIED); + event (endpoint_uri_pair_, interval_, ZMQ_EVENT_CONNECT_RETRIED); } -void zmq::socket_base_t::event_listening (const std::string &endpoint_uri_, - zmq::fd_t fd_) +void zmq::socket_base_t::event_listening ( + const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_) { - event (endpoint_uri_, fd_, ZMQ_EVENT_LISTENING); + event (endpoint_uri_pair_, fd_, ZMQ_EVENT_LISTENING); } -void zmq::socket_base_t::event_bind_failed (const std::string &endpoint_uri_, - int err_) +void zmq::socket_base_t::event_bind_failed ( + const endpoint_uri_pair_t &endpoint_uri_pair_, int err_) { - event (endpoint_uri_, err_, ZMQ_EVENT_BIND_FAILED); + event (endpoint_uri_pair_, err_, ZMQ_EVENT_BIND_FAILED); } -void zmq::socket_base_t::event_accepted (const std::string &endpoint_uri_, - zmq::fd_t fd_) +void zmq::socket_base_t::event_accepted ( + const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_) { - event (endpoint_uri_, fd_, ZMQ_EVENT_ACCEPTED); + event (endpoint_uri_pair_, fd_, ZMQ_EVENT_ACCEPTED); } -void zmq::socket_base_t::event_accept_failed (const std::string &endpoint_uri_, - int err_) +void zmq::socket_base_t::event_accept_failed ( + const endpoint_uri_pair_t &endpoint_uri_pair_, int err_) { - event (endpoint_uri_, err_, ZMQ_EVENT_ACCEPT_FAILED); + event (endpoint_uri_pair_, err_, ZMQ_EVENT_ACCEPT_FAILED); } -void zmq::socket_base_t::event_closed (const std::string &endpoint_uri_, - zmq::fd_t fd_) +void zmq::socket_base_t::event_closed ( + const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_) { - event (endpoint_uri_, fd_, ZMQ_EVENT_CLOSED); + event (endpoint_uri_pair_, fd_, ZMQ_EVENT_CLOSED); } -void zmq::socket_base_t::event_close_failed (const std::string &endpoint_uri_, - int err_) +void zmq::socket_base_t::event_close_failed ( + const endpoint_uri_pair_t &endpoint_uri_pair_, int err_) { - event (endpoint_uri_, err_, ZMQ_EVENT_CLOSE_FAILED); + event (endpoint_uri_pair_, err_, ZMQ_EVENT_CLOSE_FAILED); } -void zmq::socket_base_t::event_disconnected (const std::string &endpoint_uri_, - zmq::fd_t fd_) +void zmq::socket_base_t::event_disconnected ( + const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_) { - event (endpoint_uri_, fd_, ZMQ_EVENT_DISCONNECTED); + event (endpoint_uri_pair_, fd_, ZMQ_EVENT_DISCONNECTED); } void zmq::socket_base_t::event_handshake_failed_no_detail ( - const std::string &endpoint_uri_, int err_) + const endpoint_uri_pair_t &endpoint_uri_pair_, int err_) { - event (endpoint_uri_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL); + event (endpoint_uri_pair_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL); } void zmq::socket_base_t::event_handshake_failed_protocol ( - const std::string &endpoint_uri_, int err_) + const endpoint_uri_pair_t &endpoint_uri_pair_, int err_) { - event (endpoint_uri_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL); + event (endpoint_uri_pair_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL); } void zmq::socket_base_t::event_handshake_failed_auth ( - const std::string &endpoint_uri_, int err_) + const endpoint_uri_pair_t &endpoint_uri_pair_, int err_) { - event (endpoint_uri_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_AUTH); + event (endpoint_uri_pair_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_AUTH); } void zmq::socket_base_t::event_handshake_succeeded ( - const std::string &endpoint_uri_, int err_) + const endpoint_uri_pair_t &endpoint_uri_pair_, int err_) { - event (endpoint_uri_, err_, ZMQ_EVENT_HANDSHAKE_SUCCEEDED); + event (endpoint_uri_pair_, err_, ZMQ_EVENT_HANDSHAKE_SUCCEEDED); } -void zmq::socket_base_t::event (const std::string &endpoint_uri_, - intptr_t value_, - int type_) +void zmq::socket_base_t::event (const endpoint_uri_pair_t &endpoint_uri_pair_, + uint64_t value_, + uint64_t type_) { scoped_lock_t lock (_monitor_sync); if (_monitor_events & type_) { - monitor_event (type_, value_, endpoint_uri_); + monitor_event (type_, value_, endpoint_uri_pair_); } } // Send a monitor event -void zmq::socket_base_t::monitor_event (int event_, - intptr_t value_, - const std::string &endpoint_uri_) const +void zmq::socket_base_t::monitor_event ( + uint64_t event_, + uint64_t value_, + const endpoint_uri_pair_t &endpoint_uri_pair_) const { // this is a private method which is only called from - // contexts where the mutex has been locked before + // contexts where the _monitor_sync mutex has been locked before if (_monitor_socket) { - // Send event in first frame - const uint16_t event = static_cast (event_); - const uint32_t value = static_cast (value_); zmq_msg_t msg; - zmq_msg_init_size (&msg, sizeof (event) + sizeof (value)); - uint8_t *data = static_cast (zmq_msg_data (&msg)); - // Avoid dereferencing uint32_t on unaligned address - memcpy (data + 0, &event, sizeof (event)); - memcpy (data + sizeof (event), &value, sizeof (value)); - zmq_sendmsg (_monitor_socket, &msg, ZMQ_SNDMORE); - // Send address in second frame - zmq_msg_init_size (&msg, endpoint_uri_.size ()); - memcpy (zmq_msg_data (&msg), endpoint_uri_.c_str (), - endpoint_uri_.size ()); - zmq_sendmsg (_monitor_socket, &msg, 0); + switch (options.monitor_event_version) { + case 1: { + // The API should not allow to activate unsupported events + zmq_assert (event_ <= std::numeric_limits::max ()); + zmq_assert (value_ <= std::numeric_limits::max ()); + + // Send event and value in first frame + const uint16_t event = static_cast (event_); + const uint32_t value = static_cast (value_); + zmq_msg_init_size (&msg, sizeof (event) + sizeof (value)); + uint8_t *data = static_cast (zmq_msg_data (&msg)); + // Avoid dereferencing uint32_t on unaligned address + memcpy (data + 0, &event, sizeof (event)); + memcpy (data + sizeof (event), &value, sizeof (value)); + zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE); + + const std::string &endpoint_uri = + endpoint_uri_pair_.identifier (); + + // Send address in second frame + zmq_msg_init_size (&msg, endpoint_uri.size ()); + memcpy (zmq_msg_data (&msg), endpoint_uri.c_str (), + endpoint_uri.size ()); + zmq_msg_send (&msg, _monitor_socket, 0); + } break; + case 2: { + // Send event in first frame (64bit unsigned) + zmq_msg_init_size (&msg, sizeof event_); + memcpy (zmq_msg_data (&msg), &event_, sizeof event_); + zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE); + + // Send value in second frame (64bit unsigned) + zmq_msg_init_size (&msg, sizeof value_); + memcpy (zmq_msg_data (&msg), &value_, sizeof value_); + zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE); + + // Send local endpoint URI in third frame (string) + zmq_msg_init_size (&msg, endpoint_uri_pair_.local.size ()); + memcpy (zmq_msg_data (&msg), endpoint_uri_pair_.local.c_str (), + endpoint_uri_pair_.local.size ()); + zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE); + + // Send remote endpoint URI in fourth frame (string) + zmq_msg_init_size (&msg, endpoint_uri_pair_.remote.size ()); + memcpy (zmq_msg_data (&msg), endpoint_uri_pair_.remote.c_str (), + endpoint_uri_pair_.remote.size ()); + zmq_msg_send (&msg, _monitor_socket, 0); + } break; + } } } void zmq::socket_base_t::stop_monitor (bool send_monitor_stopped_event_) { // this is a private method which is only called from - // contexts where the mutex has been locked before + // contexts where the _monitor_sync mutex has been locked before if (_monitor_socket) { if ((_monitor_events & ZMQ_EVENT_MONITOR_STOPPED) && send_monitor_stopped_event_) - monitor_event (ZMQ_EVENT_MONITOR_STOPPED, 0, ""); + monitor_event (ZMQ_EVENT_MONITOR_STOPPED, 0, + endpoint_uri_pair_t ()); zmq_close (_monitor_socket); _monitor_socket = NULL; _monitor_events = 0; diff --git a/src/socket_base.hpp b/src/socket_base.hpp index f888a022..0751f92e 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -43,6 +43,7 @@ #include "i_mailbox.hpp" #include "clock.hpp" #include "pipe.hpp" +#include "endpoint.hpp" extern "C" { void zmq_free_event (void *data_, void *hint_); @@ -118,26 +119,38 @@ class socket_base_t : public own_t, void lock (); void unlock (); - int monitor (const char *endpoint_, int events_); + int monitor (const char *endpoint_, uint64_t events_, int event_version_); - void event_connected (const std::string &endpoint_uri_, zmq::fd_t fd_); - void event_connect_delayed (const std::string &endpoint_uri_, int err_); - void event_connect_retried (const std::string &endpoint_uri_, + void event_connected (const endpoint_uri_pair_t &endpoint_uri_pair_, + zmq::fd_t fd_); + void event_connect_delayed (const endpoint_uri_pair_t &endpoint_uri_pair_, + int err_); + void event_connect_retried (const endpoint_uri_pair_t &endpoint_uri_pair_, int interval_); - void event_listening (const std::string &endpoint_uri_, zmq::fd_t fd_); - void event_bind_failed (const std::string &endpoint_uri_, int err_); - void event_accepted (const std::string &endpoint_uri_, zmq::fd_t fd_); - void event_accept_failed (const std::string &endpoint_uri_, int err_); - void event_closed (const std::string &endpoint_uri_, zmq::fd_t fd_); - void event_close_failed (const std::string &endpoint_uri_, int err_); - void event_disconnected (const std::string &endpoint_uri_, zmq::fd_t fd_); - void event_handshake_failed_no_detail (const std::string &endpoint_uri_, - int err_); - void event_handshake_failed_protocol (const std::string &endpoint_uri_, - int err_); - void event_handshake_failed_auth (const std::string &endpoint_uri_, - int err_); - void event_handshake_succeeded (const std::string &endpoint_uri_, int err_); + void event_listening (const endpoint_uri_pair_t &endpoint_uri_pair_, + zmq::fd_t fd_); + void event_bind_failed (const endpoint_uri_pair_t &endpoint_uri_pair_, + int err_); + void event_accepted (const endpoint_uri_pair_t &endpoint_uri_pair_, + zmq::fd_t fd_); + void event_accept_failed (const endpoint_uri_pair_t &endpoint_uri_pair_, + int err_); + void event_closed (const endpoint_uri_pair_t &endpoint_uri_pair_, + zmq::fd_t fd_); + void event_close_failed (const endpoint_uri_pair_t &endpoint_uri_pair_, + int err_); + void event_disconnected (const endpoint_uri_pair_t &endpoint_uri_pair_, + zmq::fd_t fd_); + void event_handshake_failed_no_detail ( + const endpoint_uri_pair_t &endpoint_uri_pair_, int err_); + void event_handshake_failed_protocol ( + const endpoint_uri_pair_t &endpoint_uri_pair_, int err_); + void + event_handshake_failed_auth (const endpoint_uri_pair_t &endpoint_uri_pair_, + int err_); + void + event_handshake_succeeded (const endpoint_uri_pair_t &endpoint_uri_pair_, + int err_); // Query the state of a specific peer. The default implementation // always returns an ENOTSUP error. @@ -186,19 +199,22 @@ class socket_base_t : public own_t, private: // test if event should be sent and then dispatch it - void event (const std::string &endpoint_uri_, intptr_t value_, int type_); + void event (const endpoint_uri_pair_t &endpoint_uri_pair_, + uint64_t value_, + uint64_t type_); // Socket event data dispatch - void monitor_event (int event_, - intptr_t value_, - const std::string &endpoint_uri_) const; + void monitor_event (uint64_t event_, + uint64_t value_, + const endpoint_uri_pair_t &endpoint_uri_pair_) const; // Monitor socket cleanup void stop_monitor (bool send_monitor_stopped_event_ = true); // Creates new endpoint ID and adds the endpoint to the map. - void - add_endpoint (const char *endpoint_uri_, own_t *endpoint_, pipe_t *pipe_); + void add_endpoint (const endpoint_uri_pair_t &endpoint_pair_, + own_t *endpoint_, + pipe_t *pipe_); // Map of open endpoints. typedef std::pair endpoint_pipe_t; @@ -295,7 +311,7 @@ class socket_base_t : public own_t, void *_monitor_socket; // Bitmask of events being monitored - int _monitor_events; + int64_t _monitor_events; // Last socket endpoint resolved URI std::string _last_endpoint; diff --git a/src/socks_connecter.cpp b/src/socks_connecter.cpp index ada20726..f7aa507b 100644 --- a/src/socks_connecter.cpp +++ b/src/socks_connecter.cpp @@ -150,15 +150,19 @@ void zmq::socks_connecter_t::in_event () if (rc == -1) error (); else { + // TODO query the remote endpoint and pass it here + endpoint_uri_pair_t endpoint_pair = + make_unconnected_connect_endpoint_pair (_endpoint); + // Create the engine object for this connection. - stream_engine_t *engine = - new (std::nothrow) stream_engine_t (_s, options, _endpoint); + stream_engine_t *engine = new (std::nothrow) + stream_engine_t (_s, options, endpoint_pair); alloc_assert (engine); // Attach the engine to the corresponding session object. send_attach (_session, engine); - _socket->event_connected (_endpoint, _s); + _socket->event_connected (endpoint_pair, _s); rm_fd (_handle); _s = -1; @@ -225,7 +229,8 @@ void zmq::socks_connecter_t::initiate_connect () _handle = add_fd (_s); set_pollout (_handle); _status = waiting_for_proxy_connection; - _socket->event_connect_delayed (_endpoint, zmq_errno ()); + _socket->event_connect_delayed ( + make_unconnected_connect_endpoint_pair (_endpoint), zmq_errno ()); } // Handle any other error condition by eventual reconnect. else { @@ -272,7 +277,8 @@ void zmq::socks_connecter_t::start_timer () const int interval = get_new_reconnect_ivl (); add_timer (interval, reconnect_timer_id); _status = waiting_for_reconnect_time; - _socket->event_connect_retried (_endpoint, interval); + _socket->event_connect_retried ( + make_unconnected_connect_endpoint_pair (_endpoint), interval); } } @@ -443,7 +449,8 @@ void zmq::socks_connecter_t::close () const int rc = ::close (_s); errno_assert (rc == 0); #endif - _socket->event_closed (_endpoint, _s); + _socket->event_closed (make_unconnected_connect_endpoint_pair (_endpoint), + _s); _s = retired_fd; } diff --git a/src/stream_connecter_base.cpp b/src/stream_connecter_base.cpp index acb7ace2..8e4d3cf8 100644 --- a/src/stream_connecter_base.cpp +++ b/src/stream_connecter_base.cpp @@ -94,7 +94,8 @@ void zmq::stream_connecter_base_t::add_reconnect_timer () if (options.reconnect_ivl != -1) { const int interval = get_new_reconnect_ivl (); add_timer (interval, reconnect_timer_id); - _socket->event_connect_retried (_endpoint, interval); + _socket->event_connect_retried ( + make_unconnected_connect_endpoint_pair (_endpoint), interval); _reconnect_timer_started = true; } } @@ -131,7 +132,8 @@ void zmq::stream_connecter_base_t::close () const int rc = ::close (_s); errno_assert (rc == 0); #endif - _socket->event_closed (_endpoint, _s); + _socket->event_closed (make_unconnected_connect_endpoint_pair (_endpoint), + _s); _s = retired_fd; } @@ -145,9 +147,12 @@ void zmq::stream_connecter_base_t::in_event () void zmq::stream_connecter_base_t::create_engine (fd_t fd) { + const endpoint_uri_pair_t endpoint_pair ("TODO query local endpoint", + _endpoint, endpoint_type_connect); + // Create the engine object for this connection. stream_engine_t *engine = - new (std::nothrow) stream_engine_t (fd, options, _endpoint); + new (std::nothrow) stream_engine_t (fd, options, endpoint_pair); alloc_assert (engine); // Attach the engine to the corresponding session object. @@ -156,7 +161,7 @@ void zmq::stream_connecter_base_t::create_engine (fd_t fd) // Shut the connecter down. terminate (); - _socket->event_connected (_endpoint, fd); + _socket->event_connected (endpoint_pair, fd); } void zmq::stream_connecter_base_t::timer_event (int id_) diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 689070ca..9043d805 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -100,9 +100,10 @@ static std::string get_peer_address (zmq::fd_t s_) } -zmq::stream_engine_t::stream_engine_t (fd_t fd_, - const options_t &options_, - const std::string &endpoint_) : +zmq::stream_engine_t::stream_engine_t ( + fd_t fd_, + const options_t &options_, + const endpoint_uri_pair_t &endpoint_uri_pair_) : _s (fd_), _handle (static_cast (NULL)), _inpos (NULL), @@ -117,7 +118,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, _greeting_bytes_read (0), _session (NULL), _options (options_), - _endpoint (endpoint_), + _endpoint_uri_pair (endpoint_uri_pair_), _plugged (false), _next_msg (&stream_engine_t::routing_id_msg), _process_msg (&stream_engine_t::process_routing_id_msg), @@ -894,9 +895,9 @@ void zmq::stream_engine_t::zap_msg_available () restart_output (); } -const char *zmq::stream_engine_t::get_endpoint () const +const zmq::endpoint_uri_pair_t &zmq::stream_engine_t::get_endpoint () const { - return _endpoint.c_str (); + return _endpoint_uri_pair; } void zmq::stream_engine_t::mechanism_ready () @@ -960,7 +961,7 @@ void zmq::stream_engine_t::mechanism_ready () alloc_assert (_metadata); } - _socket->event_handshake_succeeded (_endpoint, 0); + _socket->event_handshake_succeeded (_endpoint_uri_pair, 0); } int zmq::stream_engine_t::pull_msg_from_session (msg_t *msg_) @@ -1081,10 +1082,10 @@ void zmq::stream_engine_t::error (error_reason_t reason_) && (_mechanism == NULL || _mechanism->status () == mechanism_t::handshaking)) { int err = errno; - _socket->event_handshake_failed_no_detail (_endpoint, err); + _socket->event_handshake_failed_no_detail (_endpoint_uri_pair, err); } - _socket->event_disconnected (_endpoint, _s); + _socket->event_disconnected (_endpoint_uri_pair, _s); _session->flush (); _session->engine_error (reason_); unplug (); diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index 1faea837..391ba8d0 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -70,7 +70,7 @@ class stream_engine_t : public io_object_t, public i_engine stream_engine_t (fd_t fd_, const options_t &options_, - const std::string &endpoint_); + const endpoint_uri_pair_t &endpoint_uri_pair_); ~stream_engine_t (); // i_engine interface implementation. @@ -79,7 +79,7 @@ class stream_engine_t : public io_object_t, public i_engine bool restart_input (); void restart_output (); void zap_msg_available (); - const char *get_endpoint () const; + const endpoint_uri_pair_t &get_endpoint () const; // i_poll_events interface implementation. void in_event (); @@ -190,8 +190,8 @@ class stream_engine_t : public io_object_t, public i_engine const options_t _options; - // String representation of endpoint - const std::string _endpoint; + // Representation of the connected endpoints. + const endpoint_uri_pair_t _endpoint_uri_pair; bool _plugged; diff --git a/src/stream_listener_base.cpp b/src/stream_listener_base.cpp index 96abbc59..878f38da 100644 --- a/src/stream_listener_base.cpp +++ b/src/stream_listener_base.cpp @@ -96,7 +96,7 @@ int zmq::stream_listener_base_t::close () const int rc = ::close (_s); errno_assert (rc == 0); #endif - _socket->event_closed (_endpoint, _s); + _socket->event_closed (make_unconnected_bind_endpoint_pair (_endpoint), _s); _s = retired_fd; return 0; @@ -104,8 +104,11 @@ int zmq::stream_listener_base_t::close () void zmq::stream_listener_base_t::create_engine (fd_t fd) { + const endpoint_uri_pair_t endpoint_pair (_endpoint, get_socket_name (fd), + endpoint_type_bind); + stream_engine_t *engine = - new (std::nothrow) stream_engine_t (fd, options, _endpoint); + new (std::nothrow) stream_engine_t (fd, options, endpoint_pair); alloc_assert (engine); // Choose I/O thread to run connecter in. Given that we are already @@ -120,5 +123,6 @@ void zmq::stream_listener_base_t::create_engine (fd_t fd) session->inc_seqnum (); launch_child (session); send_attach (session, engine, false); - _socket->event_accepted (_endpoint, fd); + + _socket->event_accepted (endpoint_pair, fd); } diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index d823a201..67069a99 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -140,7 +140,8 @@ void zmq::tcp_connecter_t::start_connecting () else if (rc == -1 && errno == EINPROGRESS) { _handle = add_fd (_s); set_pollout (_handle); - _socket->event_connect_delayed (_endpoint, zmq_errno ()); + _socket->event_connect_delayed ( + make_unconnected_connect_endpoint_pair (_endpoint), zmq_errno ()); // add userspace connect timeout add_connect_timer (); diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index 565d07a7..f4daf6ea 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -72,7 +72,8 @@ void zmq::tcp_listener_t::in_event () // If connection was reset by the peer in the meantime, just ignore it. // TODO: Handle specific errors like ENFILE/EMFILE etc. if (fd == retired_fd) { - _socket->event_accept_failed (_endpoint, zmq_errno ()); + _socket->event_accept_failed ( + make_unconnected_bind_endpoint_pair (_endpoint), zmq_errno ()); return; } @@ -83,7 +84,8 @@ void zmq::tcp_listener_t::in_event () options.tcp_keepalive_idle, options.tcp_keepalive_intvl); rc = rc | tune_tcp_maxrt (fd, options.tcp_maxrt); if (rc != 0) { - _socket->event_accept_failed (_endpoint, zmq_errno ()); + _socket->event_accept_failed ( + make_unconnected_bind_endpoint_pair (_endpoint), zmq_errno ()); return; } @@ -107,7 +109,8 @@ int zmq::tcp_listener_t::set_address (const char *addr_) if (options.use_fd != -1) { _s = options.use_fd; - _socket->event_listening (_endpoint, _s); + _socket->event_listening ( + make_unconnected_bind_endpoint_pair (_endpoint), _s); return 0; } @@ -194,7 +197,8 @@ int zmq::tcp_listener_t::set_address (const char *addr_) goto error; #endif - _socket->event_listening (_endpoint, _s); + _socket->event_listening (make_unconnected_bind_endpoint_pair (_endpoint), + _s); return 0; error: diff --git a/src/tipc_connecter.cpp b/src/tipc_connecter.cpp index 399fbf5c..418f7a80 100644 --- a/src/tipc_connecter.cpp +++ b/src/tipc_connecter.cpp @@ -94,7 +94,8 @@ void zmq::tipc_connecter_t::start_connecting () else if (rc == -1 && errno == EINPROGRESS) { _handle = add_fd (_s); set_pollout (_handle); - _socket->event_connect_delayed (_endpoint, zmq_errno ()); + _socket->event_connect_delayed ( + make_unconnected_connect_endpoint_pair (_endpoint), zmq_errno ()); } // Handle any other error condition by eventual reconnect. diff --git a/src/tipc_listener.cpp b/src/tipc_listener.cpp index a7c21895..c550ae0e 100644 --- a/src/tipc_listener.cpp +++ b/src/tipc_listener.cpp @@ -68,7 +68,8 @@ void zmq::tipc_listener_t::in_event () // If connection was reset by the peer in the meantime, just ignore it. // TODO: Handle specific errors like ENFILE/EMFILE etc. if (fd == retired_fd) { - _socket->event_accept_failed (_endpoint, zmq_errno ()); + _socket->event_accept_failed ( + make_unconnected_bind_endpoint_pair (_endpoint), zmq_errno ()); return; } @@ -129,7 +130,8 @@ int zmq::tipc_listener_t::set_address (const char *addr_) if (rc != 0) goto error; - _socket->event_listening (_endpoint, _s); + _socket->event_listening (make_unconnected_bind_endpoint_pair (_endpoint), + _s); return 0; error: diff --git a/src/udp_engine.cpp b/src/udp_engine.cpp index 0d5453f7..3bd23f5f 100644 --- a/src/udp_engine.cpp +++ b/src/udp_engine.cpp @@ -444,9 +444,9 @@ void zmq::udp_engine_t::out_event () reset_pollout (_handle); } -const char *zmq::udp_engine_t::get_endpoint () const +const zmq::endpoint_uri_pair_t &zmq::udp_engine_t::get_endpoint () const { - return ""; + return _empty_endpoint; } void zmq::udp_engine_t::restart_output () diff --git a/src/udp_engine.hpp b/src/udp_engine.hpp index 9bf5a533..198b87ad 100644 --- a/src/udp_engine.hpp +++ b/src/udp_engine.hpp @@ -43,12 +43,14 @@ class udp_engine_t : public io_object_t, public i_engine void in_event (); void out_event (); - const char *get_endpoint () const; + const endpoint_uri_pair_t &get_endpoint () const; private: int resolve_raw_address (char *addr_, size_t length_); void sockaddr_to_msg (zmq::msg_t *msg_, sockaddr_in *addr_); + const endpoint_uri_pair_t _empty_endpoint; + bool _plugged; fd_t _fd; diff --git a/src/zmq.cpp b/src/zmq.cpp index 1c9ea1cd..5e32dfd0 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -267,12 +267,20 @@ int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_) return s->getsockopt (option_, optval_, optvallen_); } -int zmq_socket_monitor (void *s_, const char *addr_, int events_) +int zmq_socket_monitor_versioned (void *s_, + const char *addr_, + uint64_t events_, + int event_version_) { zmq::socket_base_t *s = as_socket_base_t (s_); if (!s) return -1; - return s->monitor (addr_, events_); + return s->monitor (addr_, events_, event_version_); +} + +int zmq_socket_monitor (void *s_, const char *addr_, int events_) +{ + return zmq_socket_monitor_versioned (s_, addr_, events_, 1); } int zmq_join (void *s_, const char *group_) diff --git a/src/zmq_draft.h b/src/zmq_draft.h index be1165aa..d31f37da 100644 --- a/src/zmq_draft.h +++ b/src/zmq_draft.h @@ -123,6 +123,11 @@ int zmq_socket_get_peer_state (void *socket_, const void *routing_id_, size_t routing_id_size_); +int zmq_socket_monitor_versioned (void *s_, + const char *addr_, + uint64_t events_, + int event_version_); + #endif // ZMQ_BUILD_DRAFT_API #endif //ifndef __ZMQ_DRAFT_H_INCLUDED__