diff --git a/src/router.cpp b/src/router.cpp index ab4bb8bf..fe02f74b 100644 --- a/src/router.cpp +++ b/src/router.cpp @@ -63,8 +63,6 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) : zmq::router_t::~router_t () { zmq_assert (_anonymous_pipes.empty ()); - ; - zmq_assert (_out_pipes.empty ()); _prefetched_id.close (); _prefetched_msg.close (); } @@ -189,19 +187,19 @@ int zmq::router_t::xsend (msg_t *msg_) // Find the pipe associated with the routing id stored in the prefix. // If there's no such pipe just silently ignore the message, unless - // router_mandatory is set. - blob_t routing_id (static_cast (msg_->data ()), - msg_->size (), zmq::reference_tag_t ()); - out_pipes_t::iterator it = _out_pipes.find (routing_id); + // router_mandatory is set. ; + out_pipe_t *out_pipe = lookup_out_pipe ( + blob_t (static_cast (msg_->data ()), + msg_->size (), zmq::reference_tag_t ())); - if (it != _out_pipes.end ()) { - _current_out = it->second.pipe; + if (out_pipe) { + _current_out = out_pipe->pipe; // Check whether pipe is closed or not if (!_current_out->check_write ()) { // Check whether pipe is full or not bool pipe_full = !_current_out->check_hwm (); - it->second.active = false; + out_pipe->active = false; _current_out = NULL; if (_mandatory) { @@ -398,6 +396,11 @@ bool zmq::router_t::xhas_in () return true; } +static bool check_pipe_hwm (const zmq::pipe_t &pipe) +{ + return pipe.check_hwm (); +} + bool zmq::router_t::xhas_out () { // In theory, ROUTER socket is always ready for writing (except when @@ -407,12 +410,7 @@ bool zmq::router_t::xhas_out () if (!_mandatory) return true; - bool has_out = false; - out_pipes_t::iterator it; - for (it = _out_pipes.begin (); it != _out_pipes.end (); ++it) - has_out |= it->second.pipe->check_hwm (); - - return has_out; + return any_of_out_pipes (check_pipe_hwm); } const zmq::blob_t &zmq::router_t::get_credential () const @@ -426,14 +424,13 @@ int zmq::router_t::get_peer_state (const void *routing_id_, int res = 0; blob_t routing_id_blob ((unsigned char *) routing_id_, routing_id_size_); - out_pipes_t::const_iterator it = _out_pipes.find (routing_id_blob); - if (it == _out_pipes.end ()) { + const out_pipe_t *out_pipe = lookup_out_pipe (routing_id_blob); + if (!out_pipe) { errno = EHOSTUNREACH; return -1; } - const out_pipe_t &outpipe = it->second; - if (outpipe.pipe->check_hwm ()) + if (out_pipe->pipe->check_hwm ()) res |= ZMQ_POLLOUT; /** \todo does it make any sense to check the inpipe as well? */ @@ -444,7 +441,6 @@ int zmq::router_t::get_peer_state (const void *routing_id_, bool zmq::router_t::identify_peer (pipe_t *pipe_) { msg_t msg; - bool ok; blob_t routing_id; const std::string connect_routing_id = extract_connect_routing_id (); @@ -453,7 +449,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) reinterpret_cast (connect_routing_id.c_str ()), connect_routing_id.length ()); // Not allowed to duplicate an existing rid - zmq_assert (0 == _out_pipes.count (routing_id)); + zmq_assert (!has_out_pipe (routing_id)); } else if ( options .raw_socket) { // Always assign an integral routing id for raw-socket @@ -464,7 +460,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) } else if (!options.raw_socket) { // Pick up handshake cases and also case where next integral routing id is set msg.init (); - ok = pipe_->read (&msg); + bool ok = pipe_->read (&msg); if (!ok) return false; @@ -478,10 +474,13 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) } else { routing_id.set (static_cast (msg.data ()), msg.size ()); - out_pipes_t::iterator it = _out_pipes.find (routing_id); msg.close (); - if (it != _out_pipes.end ()) { + // Try to remove an existing routing id entry to allow the new + // connection to take the routing id. + out_pipe_t existing_outpipe = try_erase_out_pipe (routing_id); + + if (existing_outpipe.pipe) { if (!_handover) // Ignore peers with duplicate ID return false; @@ -494,19 +493,10 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) put_uint32 (buf + 1, _next_integral_routing_id++); blob_t new_routing_id (buf, sizeof buf); - it->second.pipe->set_router_socket_routing_id (new_routing_id); - out_pipe_t existing_outpipe = {it->second.pipe, - it->second.active}; + existing_outpipe.pipe->set_router_socket_routing_id ( + new_routing_id); - ok = _out_pipes - .ZMQ_MAP_INSERT_OR_EMPLACE (ZMQ_MOVE (new_routing_id), - existing_outpipe) - .second; - zmq_assert (ok); - - // Remove the existing routing id entry to allow the new - // connection to take the routing id. - _out_pipes.erase (it); + add_out_pipe (ZMQ_MOVE (new_routing_id), existing_outpipe.pipe); if (existing_outpipe.pipe == _current_in) _terminate_current_in = true; @@ -517,11 +507,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) } pipe_->set_router_socket_routing_id (routing_id); - // Add the record into output pipes lookup table - out_pipe_t outpipe = {pipe_, true}; - ok = _out_pipes.ZMQ_MAP_INSERT_OR_EMPLACE (ZMQ_MOVE (routing_id), outpipe) - .second; - zmq_assert (ok); + add_out_pipe (ZMQ_MOVE (routing_id), pipe_); return true; } diff --git a/src/socket_base.cpp b/src/socket_base.cpp index b58306b5..f9bd64ad 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -1813,9 +1813,51 @@ std::string zmq::routing_socket_base_t::extract_connect_routing_id () return res; } +void zmq::routing_socket_base_t::add_out_pipe (blob_t routing_id, pipe_t *pipe_) +{ + // Add the record into output pipes lookup table + const out_pipe_t outpipe = {pipe_, true}; + const bool ok = + _out_pipes.ZMQ_MAP_INSERT_OR_EMPLACE (ZMQ_MOVE (routing_id), outpipe) + .second; + zmq_assert (ok); +} + +bool zmq::routing_socket_base_t::has_out_pipe (const blob_t &routing_id) const +{ + return 0 != _out_pipes.count (routing_id); +} + +zmq::routing_socket_base_t::out_pipe_t * +zmq::routing_socket_base_t::lookup_out_pipe (const blob_t &routing_id) +{ + // TODO we could probably avoid constructor a temporary blob_t to call this function + out_pipes_t::iterator it = _out_pipes.find (routing_id); + return it == _out_pipes.end () ? NULL : &it->second; +} + +const zmq::routing_socket_base_t::out_pipe_t * +zmq::routing_socket_base_t::lookup_out_pipe (const blob_t &routing_id) const +{ + // TODO we could probably avoid constructor a temporary blob_t to call this function + out_pipes_t::const_iterator it = _out_pipes.find (routing_id); + return it == _out_pipes.end () ? NULL : &it->second; +} + void zmq::routing_socket_base_t::erase_out_pipe (pipe_t *pipe_) { - out_pipes_t::iterator it = _out_pipes.find (pipe_->get_routing_id ()); - zmq_assert (it != _out_pipes.end ()); - _out_pipes.erase (it); + const size_t erased = _out_pipes.erase (pipe_->get_routing_id ()); + zmq_assert (erased); +} + +zmq::routing_socket_base_t::out_pipe_t +zmq::routing_socket_base_t::try_erase_out_pipe (const blob_t &routing_id) +{ + const out_pipes_t::iterator it = _out_pipes.find (routing_id); + out_pipe_t res = {NULL, false}; + if (it != _out_pipes.end ()) { + res = it->second; + _out_pipes.erase (it); + } + return res; } diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 05b41898..924b8594 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -312,19 +312,35 @@ class routing_socket_base_t : public socket_base_t // own methods std::string extract_connect_routing_id (); - void erase_out_pipe (pipe_t *pipe_); - struct out_pipe_t { pipe_t *pipe; bool active; }; + void add_out_pipe (blob_t routing_id, pipe_t *pipe_); + bool has_out_pipe (const blob_t &routing_id) const; + out_pipe_t *lookup_out_pipe (const blob_t &routing_id); + const out_pipe_t *lookup_out_pipe (const blob_t &routing_id) const; + void erase_out_pipe (pipe_t *pipe_); + out_pipe_t try_erase_out_pipe (const blob_t &routing_id); + template bool any_of_out_pipes (Func func) + { + bool res = false; + for (out_pipes_t::iterator it = _out_pipes.begin (); + it != _out_pipes.end (); ++it) { + if (res |= func (*it->second.pipe)) + break; + } + + return res; + } + + private: // Outbound pipes indexed by the peer IDs. typedef std::map out_pipes_t; out_pipes_t _out_pipes; - private: // Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types std::string _connect_routing_id; }; diff --git a/src/stream.cpp b/src/stream.cpp index 19f53167..2e888642 100644 --- a/src/stream.cpp +++ b/src/stream.cpp @@ -95,14 +95,15 @@ int zmq::stream_t::xsend (msg_t *msg_) if (msg_->flags () & msg_t::more) { // Find the pipe associated with the routing id stored in the prefix. // If there's no such pipe return an error - blob_t routing_id (static_cast (msg_->data ()), - msg_->size ()); - out_pipes_t::iterator it = _out_pipes.find (routing_id); - if (it != _out_pipes.end ()) { - _current_out = it->second.pipe; + out_pipe_t *out_pipe = lookup_out_pipe ( + blob_t (static_cast (msg_->data ()), + msg_->size (), reference_tag_t ())); + + if (out_pipe) { + _current_out = out_pipe->pipe; if (!_current_out->check_write ()) { - it->second.active = false; + out_pipe->active = false; _current_out = NULL; errno = EAGAIN; return -1; @@ -275,7 +276,7 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_) reinterpret_cast (connect_routing_id.c_str ()), connect_routing_id.length ()); // Not allowed to duplicate an existing rid - zmq_assert (0 == _out_pipes.count (routing_id)); + zmq_assert (!has_out_pipe (routing_id)); } else { put_uint32 (buffer + 1, _next_integral_routing_id++); routing_id.set (buffer, sizeof buffer); @@ -284,10 +285,5 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_) static_cast (routing_id.size ()); } pipe_->set_router_socket_routing_id (routing_id); - // Add the record into output pipes lookup table - out_pipe_t outpipe = {pipe_, true}; - const bool ok = - _out_pipes.ZMQ_MAP_INSERT_OR_EMPLACE (ZMQ_MOVE (routing_id), outpipe) - .second; - zmq_assert (ok); + add_out_pipe (ZMQ_MOVE (routing_id), pipe_); }