diff --git a/src/mechanism.cpp b/src/mechanism.cpp index 42d2b67b..6c6f265b 100644 --- a/src/mechanism.cpp +++ b/src/mechanism.cpp @@ -63,7 +63,7 @@ void zmq::mechanism_t::peer_routing_id (msg_t *msg_) void zmq::mechanism_t::set_user_id (const void *data_, size_t size_) { _user_id.set (static_cast (data_), size_); - zap_properties.ZMQ_MAP_INSERT_OR_EMPLACE ( + _zap_properties.ZMQ_MAP_INSERT_OR_EMPLACE ( std::string (ZMQ_MSG_PROPERTY_USER_ID), std::string (reinterpret_cast (data_), size_)); } @@ -268,7 +268,7 @@ int zmq::mechanism_t::parse_metadata (const unsigned char *ptr_, if (rc == -1) return -1; } - (zap_flag_ ? zap_properties : zmtp_properties) + (zap_flag_ ? _zap_properties : _zmtp_properties) .ZMQ_MAP_INSERT_OR_EMPLACE ( name, std::string (reinterpret_cast (value), value_length)); diff --git a/src/mechanism.hpp b/src/mechanism.hpp index b96a9d8e..edd58a8f 100644 --- a/src/mechanism.hpp +++ b/src/mechanism.hpp @@ -81,9 +81,12 @@ class mechanism_t const blob_t &get_user_id () const; - const metadata_t::dict_t &get_zmtp_properties () { return zmtp_properties; } + const metadata_t::dict_t &get_zmtp_properties () + { + return _zmtp_properties; + } - const metadata_t::dict_t &get_zap_properties () { return zap_properties; } + const metadata_t::dict_t &get_zap_properties () { return _zap_properties; } protected: // Only used to identify the socket for the Socket-Type @@ -123,15 +126,15 @@ class mechanism_t virtual int property (const std::string &name_, const void *value_, size_t length_); - // Properties received from ZMTP peer. - metadata_t::dict_t zmtp_properties; - - // Properties received from ZAP server. - metadata_t::dict_t zap_properties; - const options_t options; private: + // Properties received from ZMTP peer. + metadata_t::dict_t _zmtp_properties; + + // Properties received from ZAP server. + metadata_t::dict_t _zap_properties; + blob_t _routing_id; blob_t _user_id; diff --git a/src/router.cpp b/src/router.cpp index 918679aa..fe02f74b 100644 --- a/src/router.cpp +++ b/src/router.cpp @@ -37,7 +37,7 @@ #include "err.hpp" zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) : - socket_base_t (parent_, tid_, sid_), + routing_socket_base_t (parent_, tid_, sid_), _prefetched (false), _routing_id_sent (false), _current_in (NULL), @@ -63,8 +63,6 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) : zmq::router_t::~router_t () { zmq_assert (_anonymous_pipes.empty ()); - ; - zmq_assert (_out_pipes.empty ()); _prefetched_id.close (); _prefetched_msg.close (); } @@ -99,21 +97,12 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) { - bool is_int = (optvallen_ == sizeof (int)); + const bool is_int = (optvallen_ == sizeof (int)); int value = 0; if (is_int) memcpy (&value, optval_, sizeof (int)); switch (option_) { - case ZMQ_CONNECT_ROUTING_ID: - // TODO why isn't it possible to set an empty connect_routing_id - // (which is the default value) - if (optval_ && optvallen_) { - connect_routing_id.assign ((char *) optval_, optvallen_); - return 0; - } - break; - case ZMQ_ROUTER_RAW: if (is_int && value >= 0) { _raw_socket = (value != 0); @@ -147,7 +136,8 @@ int zmq::router_t::xsetsockopt (int option_, break; default: - break; + return routing_socket_base_t::xsetsockopt (option_, optval_, + optvallen_); } errno = EINVAL; return -1; @@ -160,9 +150,7 @@ void zmq::router_t::xpipe_terminated (pipe_t *pipe_) if (it != _anonymous_pipes.end ()) _anonymous_pipes.erase (it); else { - outpipes_t::iterator iter = _out_pipes.find (pipe_->get_routing_id ()); - zmq_assert (iter != _out_pipes.end ()); - _out_pipes.erase (iter); + erase_out_pipe (pipe_); _fq.pipe_terminated (pipe_); pipe_->rollback (); if (pipe_ == _current_out) @@ -184,18 +172,6 @@ void zmq::router_t::xread_activated (pipe_t *pipe_) } } -void zmq::router_t::xwrite_activated (pipe_t *pipe_) -{ - outpipes_t::iterator it; - for (it = _out_pipes.begin (); it != _out_pipes.end (); ++it) - if (it->second.pipe == pipe_) - break; - - zmq_assert (it != _out_pipes.end ()); - zmq_assert (!it->second.active); - it->second.active = true; -} - int zmq::router_t::xsend (msg_t *msg_) { // If this is the first part of the message it's the ID of the @@ -211,19 +187,19 @@ int zmq::router_t::xsend (msg_t *msg_) // Find the pipe associated with the routing id stored in the prefix. // If there's no such pipe just silently ignore the message, unless - // router_mandatory is set. - blob_t routing_id (static_cast (msg_->data ()), - msg_->size (), zmq::reference_tag_t ()); - outpipes_t::iterator it = _out_pipes.find (routing_id); + // router_mandatory is set. ; + out_pipe_t *out_pipe = lookup_out_pipe ( + blob_t (static_cast (msg_->data ()), + msg_->size (), zmq::reference_tag_t ())); - if (it != _out_pipes.end ()) { - _current_out = it->second.pipe; + if (out_pipe) { + _current_out = out_pipe->pipe; // Check whether pipe is closed or not if (!_current_out->check_write ()) { // Check whether pipe is full or not bool pipe_full = !_current_out->check_hwm (); - it->second.active = false; + out_pipe->active = false; _current_out = NULL; if (_mandatory) { @@ -420,6 +396,11 @@ bool zmq::router_t::xhas_in () return true; } +static bool check_pipe_hwm (const zmq::pipe_t &pipe) +{ + return pipe.check_hwm (); +} + bool zmq::router_t::xhas_out () { // In theory, ROUTER socket is always ready for writing (except when @@ -429,12 +410,7 @@ bool zmq::router_t::xhas_out () if (!_mandatory) return true; - bool has_out = false; - outpipes_t::iterator it; - for (it = _out_pipes.begin (); it != _out_pipes.end (); ++it) - has_out |= it->second.pipe->check_hwm (); - - return has_out; + return any_of_out_pipes (check_pipe_hwm); } const zmq::blob_t &zmq::router_t::get_credential () const @@ -448,14 +424,13 @@ int zmq::router_t::get_peer_state (const void *routing_id_, int res = 0; blob_t routing_id_blob ((unsigned char *) routing_id_, routing_id_size_); - outpipes_t::const_iterator it = _out_pipes.find (routing_id_blob); - if (it == _out_pipes.end ()) { + const out_pipe_t *out_pipe = lookup_out_pipe (routing_id_blob); + if (!out_pipe) { errno = EHOSTUNREACH; return -1; } - const out_pipe_t &outpipe = it->second; - if (outpipe.pipe->check_hwm ()) + if (out_pipe->pipe->check_hwm ()) res |= ZMQ_POLLOUT; /** \todo does it make any sense to check the inpipe as well? */ @@ -466,16 +441,15 @@ int zmq::router_t::get_peer_state (const void *routing_id_, bool zmq::router_t::identify_peer (pipe_t *pipe_) { msg_t msg; - bool ok; blob_t routing_id; - if (connect_routing_id.length ()) { - routing_id.set ((unsigned char *) connect_routing_id.c_str (), - connect_routing_id.length ()); - connect_routing_id.clear (); - outpipes_t::iterator it = _out_pipes.find (routing_id); - if (it != _out_pipes.end ()) - zmq_assert (false); // Not allowed to duplicate an existing rid + const std::string connect_routing_id = extract_connect_routing_id (); + if (!connect_routing_id.empty ()) { + routing_id.set ( + reinterpret_cast (connect_routing_id.c_str ()), + connect_routing_id.length ()); + // Not allowed to duplicate an existing rid + zmq_assert (!has_out_pipe (routing_id)); } else if ( options .raw_socket) { // Always assign an integral routing id for raw-socket @@ -486,7 +460,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) } else if (!options.raw_socket) { // Pick up handshake cases and also case where next integral routing id is set msg.init (); - ok = pipe_->read (&msg); + bool ok = pipe_->read (&msg); if (!ok) return false; @@ -500,10 +474,13 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) } else { routing_id.set (static_cast (msg.data ()), msg.size ()); - outpipes_t::iterator it = _out_pipes.find (routing_id); msg.close (); - if (it != _out_pipes.end ()) { + // Try to remove an existing routing id entry to allow the new + // connection to take the routing id. + out_pipe_t existing_outpipe = try_erase_out_pipe (routing_id); + + if (existing_outpipe.pipe) { if (!_handover) // Ignore peers with duplicate ID return false; @@ -516,19 +493,10 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) put_uint32 (buf + 1, _next_integral_routing_id++); blob_t new_routing_id (buf, sizeof buf); - it->second.pipe->set_router_socket_routing_id (new_routing_id); - out_pipe_t existing_outpipe = {it->second.pipe, - it->second.active}; + existing_outpipe.pipe->set_router_socket_routing_id ( + new_routing_id); - ok = _out_pipes - .ZMQ_MAP_INSERT_OR_EMPLACE (ZMQ_MOVE (new_routing_id), - existing_outpipe) - .second; - zmq_assert (ok); - - // Remove the existing routing id entry to allow the new - // connection to take the routing id. - _out_pipes.erase (it); + add_out_pipe (ZMQ_MOVE (new_routing_id), existing_outpipe.pipe); if (existing_outpipe.pipe == _current_in) _terminate_current_in = true; @@ -539,11 +507,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) } pipe_->set_router_socket_routing_id (routing_id); - // Add the record into output pipes lookup table - out_pipe_t outpipe = {pipe_, true}; - ok = _out_pipes.ZMQ_MAP_INSERT_OR_EMPLACE (ZMQ_MOVE (routing_id), outpipe) - .second; - zmq_assert (ok); + add_out_pipe (ZMQ_MOVE (routing_id), pipe_); return true; } diff --git a/src/router.hpp b/src/router.hpp index 56157d24..370f4dcc 100644 --- a/src/router.hpp +++ b/src/router.hpp @@ -45,7 +45,7 @@ class ctx_t; class pipe_t; // TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm. -class router_t : public socket_base_t +class router_t : public routing_socket_base_t { public: router_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_); @@ -59,7 +59,6 @@ class router_t : public socket_base_t bool xhas_in (); bool xhas_out (); void xread_activated (zmq::pipe_t *pipe_); - void xwrite_activated (zmq::pipe_t *pipe_); void xpipe_terminated (zmq::pipe_t *pipe_); int get_peer_state (const void *identity_, size_t identity_size_) const; @@ -97,19 +96,9 @@ class router_t : public socket_base_t // If true, more incoming message parts are expected. bool _more_in; - struct out_pipe_t - { - zmq::pipe_t *pipe; - bool active; - }; - // We keep a set of pipes that have not been identified yet. std::set _anonymous_pipes; - // Outbound pipes indexed by the peer IDs. - typedef std::map outpipes_t; - outpipes_t _out_pipes; - // The pipe we are currently writing to. zmq::pipe_t *_current_out; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 5e951973..f9bd64ad 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -1762,3 +1762,102 @@ void zmq::socket_base_t::stop_monitor (bool send_monitor_stopped_event_) _monitor_events = 0; } } + +zmq::routing_socket_base_t::routing_socket_base_t (class ctx_t *parent_, + uint32_t tid_, + int sid_) : + socket_base_t (parent_, tid_, sid_) +{ +} + +zmq::routing_socket_base_t::~routing_socket_base_t () +{ + zmq_assert (_out_pipes.empty ()); +} + +int zmq::routing_socket_base_t::xsetsockopt (int option_, + const void *optval_, + size_t optvallen_) +{ + switch (option_) { + case ZMQ_CONNECT_ROUTING_ID: + // TODO why isn't it possible to set an empty connect_routing_id + // (which is the default value) + if (optval_ && optvallen_) { + _connect_routing_id.assign (static_cast (optval_), + optvallen_); + return 0; + } + break; + } + errno = EINVAL; + return -1; +} + +void zmq::routing_socket_base_t::xwrite_activated (pipe_t *pipe_) +{ + out_pipes_t::iterator it; + for (it = _out_pipes.begin (); it != _out_pipes.end (); ++it) + if (it->second.pipe == pipe_) + break; + + zmq_assert (it != _out_pipes.end ()); + zmq_assert (!it->second.active); + it->second.active = true; +} + +std::string zmq::routing_socket_base_t::extract_connect_routing_id () +{ + std::string res = ZMQ_MOVE (_connect_routing_id); + _connect_routing_id.clear (); + return res; +} + +void zmq::routing_socket_base_t::add_out_pipe (blob_t routing_id, pipe_t *pipe_) +{ + // Add the record into output pipes lookup table + const out_pipe_t outpipe = {pipe_, true}; + const bool ok = + _out_pipes.ZMQ_MAP_INSERT_OR_EMPLACE (ZMQ_MOVE (routing_id), outpipe) + .second; + zmq_assert (ok); +} + +bool zmq::routing_socket_base_t::has_out_pipe (const blob_t &routing_id) const +{ + return 0 != _out_pipes.count (routing_id); +} + +zmq::routing_socket_base_t::out_pipe_t * +zmq::routing_socket_base_t::lookup_out_pipe (const blob_t &routing_id) +{ + // TODO we could probably avoid constructor a temporary blob_t to call this function + out_pipes_t::iterator it = _out_pipes.find (routing_id); + return it == _out_pipes.end () ? NULL : &it->second; +} + +const zmq::routing_socket_base_t::out_pipe_t * +zmq::routing_socket_base_t::lookup_out_pipe (const blob_t &routing_id) const +{ + // TODO we could probably avoid constructor a temporary blob_t to call this function + out_pipes_t::const_iterator it = _out_pipes.find (routing_id); + return it == _out_pipes.end () ? NULL : &it->second; +} + +void zmq::routing_socket_base_t::erase_out_pipe (pipe_t *pipe_) +{ + const size_t erased = _out_pipes.erase (pipe_->get_routing_id ()); + zmq_assert (erased); +} + +zmq::routing_socket_base_t::out_pipe_t +zmq::routing_socket_base_t::try_erase_out_pipe (const blob_t &routing_id) +{ + const out_pipes_t::iterator it = _out_pipes.find (routing_id); + out_pipe_t res = {NULL, false}; + if (it != _out_pipes.end ()) { + res = it->second; + _out_pipes.erase (it); + } + return res; +} diff --git a/src/socket_base.hpp b/src/socket_base.hpp index dbc6f85d..924b8594 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -184,9 +184,6 @@ class socket_base_t : public own_t, // Delay actual destruction of the socket. void process_destroy (); - // Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types - std::string connect_routing_id; - private: // test if event should be sent and then dispatch it void event (const std::string &addr_, intptr_t fd_, int type_); @@ -300,6 +297,53 @@ class socket_base_t : public own_t, socket_base_t (const socket_base_t &); const socket_base_t &operator= (const socket_base_t &); }; + +class routing_socket_base_t : public socket_base_t +{ + protected: + routing_socket_base_t (class ctx_t *parent_, uint32_t tid_, int sid_); + ~routing_socket_base_t (); + + // methods from socket_base_t + virtual int + xsetsockopt (int option_, const void *optval_, size_t optvallen_); + virtual void xwrite_activated (pipe_t *pipe_); + + // own methods + std::string extract_connect_routing_id (); + + struct out_pipe_t + { + pipe_t *pipe; + bool active; + }; + + void add_out_pipe (blob_t routing_id, pipe_t *pipe_); + bool has_out_pipe (const blob_t &routing_id) const; + out_pipe_t *lookup_out_pipe (const blob_t &routing_id); + const out_pipe_t *lookup_out_pipe (const blob_t &routing_id) const; + void erase_out_pipe (pipe_t *pipe_); + out_pipe_t try_erase_out_pipe (const blob_t &routing_id); + template bool any_of_out_pipes (Func func) + { + bool res = false; + for (out_pipes_t::iterator it = _out_pipes.begin (); + it != _out_pipes.end (); ++it) { + if (res |= func (*it->second.pipe)) + break; + } + + return res; + } + + private: + // Outbound pipes indexed by the peer IDs. + typedef std::map out_pipes_t; + out_pipes_t _out_pipes; + + // Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types + std::string _connect_routing_id; +}; } #endif diff --git a/src/stream.cpp b/src/stream.cpp index c0f31605..2e888642 100644 --- a/src/stream.cpp +++ b/src/stream.cpp @@ -37,7 +37,7 @@ #include "err.hpp" zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) : - socket_base_t (parent_, tid_, sid_), + routing_socket_base_t (parent_, tid_, sid_), _prefetched (false), _routing_id_sent (false), _current_out (NULL), @@ -53,7 +53,6 @@ zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) : zmq::stream_t::~stream_t () { - zmq_assert (_outpipes.empty ()); _prefetched_routing_id.close (); _prefetched_msg.close (); } @@ -70,10 +69,10 @@ void zmq::stream_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) void zmq::stream_t::xpipe_terminated (pipe_t *pipe_) { - outpipes_t::iterator it = _outpipes.find (pipe_->get_routing_id ()); - zmq_assert (it != _outpipes.end ()); - _outpipes.erase (it); + erase_out_pipe (pipe_); _fq.pipe_terminated (pipe_); + // TODO router_t calls pipe_->rollback() here; should this be done here as + // well? then xpipe_terminated could be pulled up to routing_socket_base_t if (pipe_ == _current_out) _current_out = NULL; } @@ -83,18 +82,6 @@ void zmq::stream_t::xread_activated (pipe_t *pipe_) _fq.activated (pipe_); } -void zmq::stream_t::xwrite_activated (pipe_t *pipe_) -{ - outpipes_t::iterator it; - for (it = _outpipes.begin (); it != _outpipes.end (); ++it) - if (it->second.pipe == pipe_) - break; - - zmq_assert (it != _outpipes.end ()); - zmq_assert (!it->second.active); - it->second.active = true; -} - int zmq::stream_t::xsend (msg_t *msg_) { // If this is the first part of the message it's the ID of the @@ -108,14 +95,15 @@ int zmq::stream_t::xsend (msg_t *msg_) if (msg_->flags () & msg_t::more) { // Find the pipe associated with the routing id stored in the prefix. // If there's no such pipe return an error - blob_t routing_id (static_cast (msg_->data ()), - msg_->size ()); - outpipes_t::iterator it = _outpipes.find (routing_id); - if (it != _outpipes.end ()) { - _current_out = it->second.pipe; + out_pipe_t *out_pipe = lookup_out_pipe ( + blob_t (static_cast (msg_->data ()), + msg_->size (), reference_tag_t ())); + + if (out_pipe) { + _current_out = out_pipe->pipe; if (!_current_out->check_write ()) { - it->second.active = false; + out_pipe->active = false; _current_out = NULL; errno = EAGAIN; return -1; @@ -177,25 +165,14 @@ int zmq::stream_t::xsetsockopt (int option_, size_t optvallen_) { switch (option_) { - case ZMQ_CONNECT_ROUTING_ID: - // TODO why isn't it possible to set an empty connect_routing_id - // (which is the default value) - if (optval_ && optvallen_) { - connect_routing_id.assign ((char *) optval_, optvallen_); - return 0; - } - break; - case ZMQ_STREAM_NOTIFY: return do_setsockopt_int_as_bool_strict (optval_, optvallen_, &options.raw_notify); - break; default: - break; + return routing_socket_base_t::xsetsockopt (option_, optval_, + optvallen_); } - errno = EINVAL; - return -1; } int zmq::stream_t::xrecv (msg_t *msg_) @@ -293,12 +270,13 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_) unsigned char buffer[5]; buffer[0] = 0; blob_t routing_id; - if (connect_routing_id.length ()) { - routing_id.set ((unsigned char *) connect_routing_id.c_str (), - connect_routing_id.length ()); - connect_routing_id.clear (); - outpipes_t::iterator it = _outpipes.find (routing_id); - zmq_assert (it == _outpipes.end ()); + const std::string connect_routing_id = extract_connect_routing_id (); + if (!connect_routing_id.empty ()) { + routing_id.set ( + reinterpret_cast (connect_routing_id.c_str ()), + connect_routing_id.length ()); + // Not allowed to duplicate an existing rid + zmq_assert (!has_out_pipe (routing_id)); } else { put_uint32 (buffer + 1, _next_integral_routing_id++); routing_id.set (buffer, sizeof buffer); @@ -307,10 +285,5 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_) static_cast (routing_id.size ()); } pipe_->set_router_socket_routing_id (routing_id); - // Add the record into output pipes lookup table - outpipe_t outpipe = {pipe_, true}; - const bool ok = - _outpipes.ZMQ_MAP_INSERT_OR_EMPLACE (ZMQ_MOVE (routing_id), outpipe) - .second; - zmq_assert (ok); + add_out_pipe (ZMQ_MOVE (routing_id), pipe_); } diff --git a/src/stream.hpp b/src/stream.hpp index 58da1e32..46829764 100644 --- a/src/stream.hpp +++ b/src/stream.hpp @@ -39,7 +39,7 @@ namespace zmq class ctx_t; class pipe_t; -class stream_t : public socket_base_t +class stream_t : public routing_socket_base_t { public: stream_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_); @@ -52,7 +52,6 @@ class stream_t : public socket_base_t bool xhas_in (); bool xhas_out (); void xread_activated (zmq::pipe_t *pipe_); - void xwrite_activated (zmq::pipe_t *pipe_); void xpipe_terminated (zmq::pipe_t *pipe_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_); @@ -76,16 +75,6 @@ class stream_t : public socket_base_t // Holds the prefetched message. msg_t _prefetched_msg; - struct outpipe_t - { - zmq::pipe_t *pipe; - bool active; - }; - - // Outbound pipes indexed by the peer IDs. - typedef std::map outpipes_t; - outpipes_t _outpipes; - // The pipe we are currently writing to. zmq::pipe_t *_current_out;