From 03ebd39d1ffaa4332f96ebab5b3d3b8e280ddaec Mon Sep 17 00:00:00 2001 From: Doron Somech Date: Wed, 13 May 2020 17:32:06 +0300 Subject: [PATCH 1/4] problem: zeromq connects peer before handshake is completed Solution: delay connecting the peer pipe until the handshake is completed (cherry picked from commit e7f0090b161ce6344f6bd35009816a925c070b09) Conflicts: src/i_engine.hpp src/norm_engine.hpp src/pgm_receiver.hpp src/pgm_sender.hpp src/raw_engine.cpp src/session_base.cpp src/session_base.hpp src/stream_engine_base.cpp src/stream_engine_base.hpp src/udp_engine.hpp src/ws_engine.cpp src/zmtp_engine.cpp tests/test_mock_pub_sub.cpp --- src/i_engine.hpp | 4 ++++ src/ipc_connecter.cpp | 2 +- src/ipc_listener.cpp | 2 +- src/norm_engine.hpp | 2 ++ src/pgm_receiver.hpp | 1 + src/pgm_sender.hpp | 1 + src/session_base.cpp | 16 +++++++++++----- src/session_base.hpp | 1 + src/socks_connecter.cpp | 2 +- src/stream_engine.cpp | 12 ++++++++++-- src/stream_engine.hpp | 8 +++++++- src/tcp_connecter.cpp | 2 +- src/tcp_listener.cpp | 2 +- src/tipc_connecter.cpp | 2 +- src/tipc_listener.cpp | 2 +- 15 files changed, 44 insertions(+), 15 deletions(-) diff --git a/src/i_engine.hpp b/src/i_engine.hpp index 7a61e8e9..09eb0ec8 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -41,6 +41,10 @@ namespace zmq { virtual ~i_engine () {} + // Indicates whether the engine has a handshake stage. + // If the engine has a handshake stage, it must call session.engine_ready when the handshake is complete. + virtual bool has_handshake_stage () = 0; + // Plug the engine to the session. virtual void plug (zmq::io_thread_t *io_thread_, class session_base_t *session_) = 0; diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp index 34abb568..fbe53c6e 100644 --- a/src/ipc_connecter.cpp +++ b/src/ipc_connecter.cpp @@ -123,7 +123,7 @@ void zmq::ipc_connecter_t::out_event () } // Create the engine object for this connection. 
stream_engine_t *engine = new (std::nothrow) - stream_engine_t (fd, options, endpoint); + stream_engine_t (fd, options, endpoint, !options.raw_sock); alloc_assert (engine); // Attach the engine to the corresponding session object. diff --git a/src/ipc_listener.cpp b/src/ipc_listener.cpp index 5c2a028f..9c973971 100644 --- a/src/ipc_listener.cpp +++ b/src/ipc_listener.cpp @@ -102,7 +102,7 @@ void zmq::ipc_listener_t::in_event () // Create the engine object for this connection. stream_engine_t *engine = new (std::nothrow) - stream_engine_t (fd, options, endpoint); + stream_engine_t (fd, options, endpoint, !options.raw_sock); alloc_assert (engine); // Choose I/O thread to run connecter in. Given that we are already diff --git a/src/norm_engine.hpp b/src/norm_engine.hpp index 72542e19..04d3e3a7 100644 --- a/src/norm_engine.hpp +++ b/src/norm_engine.hpp @@ -26,6 +26,8 @@ namespace zmq // create NORM instance, session, etc int init(const char* network_, bool send, bool recv); void shutdown(); + + bool has_handshake_stage () { return false; }; // i_engine interface implementation. // Plug the engine to the session. diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index 4594ab46..dadace5f 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -64,6 +64,7 @@ namespace zmq int init (bool udp_encapsulation_, const char *network_); // i_engine interface implementation. + bool has_handshake_stage () { return false; }; void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_); void terminate (); diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index bed05f75..c83e28ed 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -63,6 +63,7 @@ namespace zmq int init (bool udp_encapsulation_, const char *network_); // i_engine interface implementation. 
+ bool has_handshake_stage () { return false; }; void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_); void terminate (); diff --git a/src/session_base.cpp b/src/session_base.cpp index e8f90764..460cafe4 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -352,7 +352,18 @@ bool zmq::session_base_t::zap_enabled () void zmq::session_base_t::process_attach (i_engine *engine_) { zmq_assert (engine_ != NULL); + zmq_assert (!engine); + engine = engine_; + if (!engine_->has_handshake_stage ()) + engine_ready (); + + // Plug in the engine. + engine->plug (io_thread, this); +} + +void zmq::session_base_t::engine_ready () +{ // Create the pipe if it does not exist yet. if (!pipe && !is_terminating ()) { object_t *parents [2] = {this, socket}; @@ -381,11 +392,6 @@ void zmq::session_base_t::process_attach (i_engine *engine_) // Ask socket to plug into the remote end of the pipe. send_bind (socket, pipes [1]); } - - // Plug in the engine. - zmq_assert (!engine); - engine = engine_; - engine->plug (io_thread, this); } void zmq::session_base_t::engine_error ( diff --git a/src/session_base.hpp b/src/session_base.hpp index 8730c271..ff4a899b 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -67,6 +67,7 @@ namespace zmq virtual void reset (); void flush (); void engine_error (zmq::stream_engine_t::error_reason_t reason); + void engine_ready (); // i_pipe_events interface implementation. void read_activated (zmq::pipe_t *pipe_); diff --git a/src/socks_connecter.cpp b/src/socks_connecter.cpp index a3b70436..a4c764c3 100644 --- a/src/socks_connecter.cpp +++ b/src/socks_connecter.cpp @@ -152,7 +152,7 @@ void zmq::socks_connecter_t::in_event () // Create the engine object for this connection. stream_engine_t *engine = new (std::nothrow) - stream_engine_t (s, options, endpoint); + stream_engine_t (s, options, endpoint, !options.raw_sock); alloc_assert (engine); // Attach the engine to the corresponding session object. 
diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index e31bb0df..f5ec6dfb 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -72,7 +72,8 @@ #include "wire.hpp" zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, - const std::string &endpoint_) : + const std::string &endpoint_, + bool has_handshake_stage_) : s (fd_), inpos (NULL), insize (0), @@ -85,6 +86,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, greeting_size (v2_greeting_size), greeting_bytes_read (0), session (NULL), + _has_handshake_stage (has_handshake_stage_), options (options_), endpoint (endpoint_), plugged (false), @@ -272,9 +274,12 @@ void zmq::stream_engine_t::in_event () zmq_assert (!io_error); // If still handshaking, receive and process the greeting message. - if (unlikely (handshaking)) + if (unlikely (handshaking)) { if (!handshake ()) return; + else if (mechanism == NULL && _has_handshake_stage) + session->engine_ready (); + } zmq_assert (decoder); @@ -800,6 +805,9 @@ void zmq::stream_engine_t::zap_msg_available () void zmq::stream_engine_t::mechanism_ready () { + if (_has_handshake_stage) + session->engine_ready (); + if (options.recv_identity) { msg_t identity; mechanism->peer_identity (&identity); diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index e9635f6f..718c6598 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -70,10 +70,12 @@ namespace zmq }; stream_engine_t (fd_t fd_, const options_t &options_, - const std::string &endpoint); + const std::string &endpoint, + bool has_handshake_stage_); ~stream_engine_t (); // i_engine interface implementation. + bool has_handshake_stage () { return _has_handshake_stage; }; void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_); void terminate (); @@ -170,6 +172,10 @@ namespace zmq // The session this engine is attached to. 
zmq::session_base_t *session; + // Indicates whether the engine has a handshake stage; if it does, the engine must call session.engine_ready + // when the handshake is completed. + bool _has_handshake_stage; + options_t options; // String representation of endpoint diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index 21ba15f6..5cd2d470 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -139,7 +139,7 @@ void zmq::tcp_connecter_t::out_event () // Create the engine object for this connection. stream_engine_t *engine = new (std::nothrow) - stream_engine_t (fd, options, endpoint); + stream_engine_t (fd, options, endpoint, !options.raw_sock); alloc_assert (engine); // Attach the engine to the corresponding session object. diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index a16fb550..e83c9282 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -106,7 +106,7 @@ void zmq::tcp_listener_t::in_event () // Create the engine object for this connection. stream_engine_t *engine = new (std::nothrow) - stream_engine_t (fd, options, endpoint); + stream_engine_t (fd, options, endpoint, !options.raw_sock); alloc_assert (engine); // Choose I/O thread to run connecter in. Given that we are already diff --git a/src/tipc_connecter.cpp b/src/tipc_connecter.cpp index 11b53c50..4c6f7b17 100644 --- a/src/tipc_connecter.cpp +++ b/src/tipc_connecter.cpp @@ -121,7 +121,7 @@ void zmq::tipc_connecter_t::out_event () return; } // Create the engine object for this connection. - stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint); + stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint, !options.raw_sock); alloc_assert (engine); // Attach the engine to the corresponding session object. 
diff --git a/src/tipc_listener.cpp b/src/tipc_listener.cpp index fb8df6c3..7003630d 100644 --- a/src/tipc_listener.cpp +++ b/src/tipc_listener.cpp @@ -89,7 +89,7 @@ void zmq::tipc_listener_t::in_event () } // Create the engine object for this connection. - stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint); + stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint, !options.raw_sock); alloc_assert (engine); // Choose I/O thread to run connecter in. Given that we are already From 7006df0ac1d79895e1c86d709dd5d1bc5c46b29a Mon Sep 17 00:00:00 2001 From: Luca Boccassi Date: Sun, 28 Jun 2020 13:36:57 +0100 Subject: [PATCH 2/4] Problem: test_security_zap occasionally segfaults Solution: check if a session's _pipe has been allocated before using it, since as a consequence of creating the pipes after the handshake it's no longer guaranteed to be there. (cherry picked from commit 350b4b34f460b91b8fa8f692cf6bc30d561a5711) Conflicts: src/session_base.cpp --- src/session_base.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/session_base.cpp b/src/session_base.cpp index 460cafe4..9c69c875 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -256,7 +256,8 @@ void zmq::session_base_t::read_activated (pipe_t *pipe_) } if (unlikely (engine == NULL)) { - pipe->check_read (); + if (pipe) + pipe->check_read (); return; } From 1a2022fedc0e738d01477bbc2fdef5aef7c7a178 Mon Sep 17 00:00:00 2001 From: Luca Boccassi Date: Fri, 15 May 2020 17:07:48 +0100 Subject: [PATCH 3/4] Problem: unfinished message can be leaked by client pipe When a pipe processes a delimiter and is already not in active state but still has an unfinished message, the message is leaked. Solution: issue a rollback before losing the reference to the pipe. 
(cherry picked from commit 6815138501b9f2a69e807bc3527d93583e633233) Conflicts: src/pipe.cpp --- src/pipe.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/pipe.cpp b/src/pipe.cpp index e588215e..37935c7a 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -470,6 +470,7 @@ void zmq::pipe_t::process_delimiter () if (state == active) state = delimiter_received; else { + rollback (); outpipe = NULL; send_pipe_term_ack (peer); state = term_ack_sent; From 2fb9c40ca05c78756ad04ea4c343f1323504dc18 Mon Sep 17 00:00:00 2001 From: Luca Boccassi Date: Sat, 13 Jun 2020 15:09:39 +0100 Subject: [PATCH 4/4] Problem: mtrie use of non-tail recursion leads to stack overflow Solution: convert add and rm functions to iterative algorithms --- src/mtrie.cpp | 708 ++++++++++++++++++++++++++++++-------------------- src/mtrie.hpp | 22 +- 2 files changed, 438 insertions(+), 292 deletions(-) diff --git a/src/mtrie.cpp b/src/mtrie.cpp index 8a838905..5f057e77 100644 --- a/src/mtrie.cpp +++ b/src/mtrie.cpp @@ -31,6 +31,7 @@ #include #include +#include #include "platform.hpp" #if defined ZMQ_HAVE_WINDOWS @@ -71,319 +72,460 @@ zmq::mtrie_t::~mtrie_t () bool zmq::mtrie_t::add (unsigned char *prefix_, size_t size_, pipe_t *pipe_) { - return add_helper (prefix_, size_, pipe_); -} + class mtrie_t *it = this; + + while (size_) { + const unsigned char c = *prefix_; + + if (c < it->min || c >= it->min + it->count) { + // The character is out of range of currently handled + // characters. We have to extend the table. + if (!it->count) { + it->min = c; + it->count = 1; + it->next.node = NULL; + } else if (it->count == 1) { + const unsigned char oldc = it->min; + class mtrie_t *oldp = it->next.node; + it->count = (it->min < c ? 
c - it->min : it->min - c) + 1; + it->next.table = static_cast ( + malloc (sizeof (class mtrie_t *) * it->count)); + alloc_assert (it->next.table); + for (unsigned short i = 0; i != it->count; ++i) + it->next.table[i] = 0; + it->min = std::min (it->min, c); + it->next.table[oldc - it->min] = oldp; + } else if (it->min < c) { + // The new character is above the current character range. + const unsigned short oldcount = it->count; + it->count = c - it->min + 1; + it->next.table = static_cast (realloc ( + it->next.table, sizeof (class mtrie_t *) * it->count)); + alloc_assert (it->next.table); + for (unsigned short i = oldcount; i != it->count; i++) + it->next.table[i] = NULL; + } else { + // The new character is below the current character range. + const unsigned short oldcount = it->count; + it->count = (it->min + oldcount) - c; + it->next.table = static_cast (realloc ( + it->next.table, sizeof (class mtrie_t *) * it->count)); + alloc_assert (it->next.table); + memmove (it->next.table + it->min - c, it->next.table, + oldcount * sizeof (class mtrie_t *)); + for (unsigned short i = 0; i != it->min - c; i++) + it->next.table[i] = NULL; + it->min = c; + } + } + + // If next node does not exist, create one. + if (it->count == 1) { + if (!it->next.node) { + it->next.node = new (std::nothrow) class mtrie_t; + alloc_assert (it->next.node); + ++(it->live_nodes); + } + + ++prefix_; + --size_; + it = it->next.node; + } else { + if (!it->next.table[c - it->min]) { + it->next.table[c - it->min] = + new (std::nothrow) class mtrie_t; + alloc_assert (it->next.table[c - it->min]); + ++(it->live_nodes); + } + + ++prefix_; + --size_; + it = it->next.table[c - it->min]; + } + } -bool zmq::mtrie_t::add_helper (unsigned char *prefix_, size_t size_, - pipe_t *pipe_) -{ // We are at the node corresponding to the prefix. We are done. 
- if (!size_) { - bool result = !pipes; - if (!pipes) { - pipes = new (std::nothrow) pipes_t; - alloc_assert (pipes); - } - pipes->insert (pipe_); - return result; + const bool result = !it->pipes; + if (!it->pipes) { + it->pipes = new (std::nothrow) pipes_t; + alloc_assert (it->pipes); } + it->pipes->insert (pipe_); - unsigned char c = *prefix_; - if (c < min || c >= min + count) { - - // The character is out of range of currently handled - // charcters. We have to extend the table. - if (!count) { - min = c; - count = 1; - next.node = NULL; - } - else - if (count == 1) { - unsigned char oldc = min; - mtrie_t *oldp = next.node; - count = (min < c ? c - min : min - c) + 1; - next.table = (mtrie_t**) - malloc (sizeof (mtrie_t*) * count); - alloc_assert (next.table); - for (unsigned short i = 0; i != count; ++i) - next.table [i] = 0; - min = std::min (min, c); - next.table [oldc - min] = oldp; - } - else - if (min < c) { - // The new character is above the current character range. - unsigned short old_count = count; - count = c - min + 1; - next.table = (mtrie_t**) realloc (next.table, - sizeof (mtrie_t*) * count); - alloc_assert (next.table); - for (unsigned short i = old_count; i != count; i++) - next.table [i] = NULL; - } - else { - // The new character is below the current character range. - unsigned short old_count = count; - count = (min + old_count) - c; - next.table = (mtrie_t**) realloc (next.table, - sizeof (mtrie_t*) * count); - alloc_assert (next.table); - memmove (next.table + min - c, next.table, - old_count * sizeof (mtrie_t*)); - for (unsigned short i = 0; i != min - c; i++) - next.table [i] = NULL; - min = c; - } - } - - // If next node does not exist, create one. 
- if (count == 1) { - if (!next.node) { - next.node = new (std::nothrow) mtrie_t; - alloc_assert (next.node); - ++live_nodes; - } - return next.node->add_helper (prefix_ + 1, size_ - 1, pipe_); - } - else { - if (!next.table [c - min]) { - next.table [c - min] = new (std::nothrow) mtrie_t; - alloc_assert (next.table [c - min]); - ++live_nodes; - } - return next.table [c - min]->add_helper (prefix_ + 1, size_ - 1, pipe_); - } + return result; } - void zmq::mtrie_t::rm (pipe_t *pipe_, void (*func_) (unsigned char *data_, size_t size_, void *arg_), void *arg_) { + // This used to be implemented as a non-tail recursive traversal of the trie, + // which means remote clients controlled the depth of the recursion and the + // stack size. + // To simulate the non-tail recursion, with post-recursion changes depending on + // the result of the recursive call, a stack is used to re-visit the same node + // and operate on it again after children have been visited. + // A boolean is used to record whether the node had already been visited and to + // determine if the pre- or post- children visit actions have to be taken. + // In the case of a node with (N > 1) children, the node has to be re-visited + // N times, in the correct order after each child visit. + std::list stack; unsigned char *buff = NULL; - rm_helper (pipe_, &buff, 0, 0, func_, arg_); + size_t maxbuffsize = 0; + struct iter it = {this, NULL, NULL, 0, 0, 0, false}; + stack.push_back (it); + + while (!stack.empty ()) { + it = stack.back (); + stack.pop_back (); + + if (!it.processed_for_removal) { + // Remove the subscription from this node. + if (it.node->pipes && it.node->pipes->erase (pipe_)) { + if (it.node->pipes->empty ()) { + func_ (buff, it.size, arg_); + } + + if (it.node->pipes->empty ()) { + delete it.node->pipes; + it.node->pipes = NULL; + } + } + + // Adjust the buffer. 
+ if (it.size >= maxbuffsize) { + maxbuffsize = it.size + 256; + buff = + static_cast (realloc (buff, maxbuffsize)); + alloc_assert (buff); + } + + switch (it.node->count) { + case 0: + // If there are no subnodes in the trie, we are done with this node + // pre-processing. + break; + case 1: { + // If there's one subnode (optimisation). + + buff[it.size] = it.node->min; + // Mark this node as pre-processed and push it, so that the next + // visit after the operation on the child can do the removals. + it.processed_for_removal = true; + stack.push_back (it); + struct iter next = { + it.node->next.node, NULL, NULL, ++it.size, 0, 0, false}; + stack.push_back (next); + break; + } + default: { + // If there are multiple subnodes. + // When first visiting this node, initialize the new_min/max parameters + // which will then be used after each child has been processed, on the + // post-children iterations. + if (it.current_child == 0) { + // New min non-null character in the node table after the removal + it.new_min = it.node->min + it.node->count - 1; + // New max non-null character in the node table after the removal + it.new_max = it.node->min; + } + + // Mark this node as pre-processed and push it, so that the next + // visit after the operation on the child can do the removals. + buff[it.size] = it.node->min + it.current_child; + it.processed_for_removal = true; + stack.push_back (it); + if (it.node->next.table[it.current_child]) { + struct iter next = { + it.node->next.table[it.current_child], + NULL, + NULL, + it.size + 1, + 0, + 0, + false}; + stack.push_back (next); + } + } + } + } else { + // Reset back for the next time, in case this node doesn't get deleted. + // This is done unconditionally, unlike when setting this variable to true. + it.processed_for_removal = false; + + switch (it.node->count) { + case 0: + // If there are no subnodes in the trie, we are done with this node + // post-processing. 
+ break; + case 1: + // If there's one subnode (optimisation). + + // Prune the node if it was made redundant by the removal + if (it.node->next.node->is_redundant ()) { + delete it.node->next.node; + it.node->next.node = NULL; + it.node->count = 0; + --it.node->live_nodes; + zmq_assert (it.node->live_nodes == 0); + } + break; + default: + // If there are multiple subnodes. + { + if (it.node->next.table[it.current_child]) { + // Prune redundant nodes from the mtrie + if (it.node->next.table[it.current_child] + ->is_redundant ()) { + delete it.node->next.table[it.current_child]; + it.node->next.table[it.current_child] = NULL; + + zmq_assert (it.node->live_nodes > 0); + --it.node->live_nodes; + } else { + // The node is not redundant, so it's a candidate for being + // the new min/max node. + // + // We loop through the node array from left to right, so the + // first non-null, non-redundant node encountered is the new + // minimum index. Conversely, the last non-redundant, non-null + // node encountered is the new maximum index. + if (it.current_child + it.node->min + < it.new_min) + it.new_min = + it.current_child + it.node->min; + if (it.current_child + it.node->min + > it.new_max) + it.new_max = + it.current_child + it.node->min; + } + } + + // If there are more children to visit, push again the current + // node, so that pre-processing can happen on the next child. + // If we are done, reset the child index so that the ::rm is + // fully idempotent. + ++it.current_child; + if (it.current_child >= it.node->count) + it.current_child = 0; + else { + stack.push_back (it); + continue; + } + + // All children have been visited and removed if needed, and + // all pre- and post-visit operations have been carried. + // Resize/free the node table if needed. + zmq_assert (it.node->count > 1); + + // Free the node table if it's no longer used. 
+ switch (it.node->live_nodes) { + case 0: + free (it.node->next.table); + it.node->next.table = NULL; + it.node->count = 0; + break; + case 1: + // Compact the node table if possible + + // If there's only one live node in the table we can + // switch to using the more compact single-node + // representation + zmq_assert (it.new_min == it.new_max); + zmq_assert (it.new_min >= it.node->min); + zmq_assert (it.new_min + < it.node->min + it.node->count); + { + class mtrie_t *node = + it.node->next + .table[it.new_min - it.node->min]; + zmq_assert (node); + free (it.node->next.table); + it.node->next.node = node; + } + it.node->count = 1; + it.node->min = it.new_min; + break; + default: + if (it.new_min > it.node->min + || it.new_max < it.node->min + + it.node->count - 1) { + zmq_assert (it.new_max - it.new_min + 1 + > 1); + + class mtrie_t **old_table = + it.node->next.table; + zmq_assert (it.new_min > it.node->min + || it.new_max + < it.node->min + + it.node->count - 1); + zmq_assert (it.new_min >= it.node->min); + zmq_assert (it.new_max + <= it.node->min + + it.node->count - 1); + zmq_assert (it.new_max - it.new_min + 1 + < it.node->count); + + it.node->count = + it.new_max - it.new_min + 1; + it.node->next.table = + static_cast ( + malloc (sizeof (class mtrie_t *) + * it.node->count)); + alloc_assert (it.node->next.table); + + memmove (it.node->next.table, + old_table + + (it.new_min - it.node->min), + sizeof (class mtrie_t *) + * it.node->count); + free (old_table); + + it.node->min = it.new_min; + } + } + } + } + } + } + free (buff); } -void zmq::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_, - size_t buffsize_, size_t maxbuffsize_, - void (*func_) (unsigned char *data_, size_t size_, void *arg_), - void *arg_) +bool zmq::mtrie_t::rm (unsigned char *prefix_, size_t size_, zmq::pipe_t *pipe_) { - // Remove the subscription from this node. 
- if (pipes && pipes->erase (pipe_) && pipes->empty ()) { - func_ (*buff_, buffsize_, arg_); - delete pipes; - pipes = 0; - } + // This used to be implemented as a non-tail recursive traversal of the trie, + // which means remote clients controlled the depth of the recursion and the + // stack size. + // To simulate the non-tail recursion, with post-recursion changes depending on + // the result of the recursive call, a stack is used to re-visit the same node + // and operate on it again after children have been visited. + // A boolean is used to record whether the node had already been visited and to + // determine if the pre- or post- children visit actions have to be taken. + bool ret = false; + std::list stack; + struct iter it = {this, NULL, prefix_, size_, 0, 0, 0, false}; + stack.push_back (it); - // Adjust the buffer. - if (buffsize_ >= maxbuffsize_) { - maxbuffsize_ = buffsize_ + 256; - *buff_ = (unsigned char*) realloc (*buff_, maxbuffsize_); - alloc_assert (*buff_); - } + while (!stack.empty ()) { + it = stack.back (); + stack.pop_back (); - // If there are no subnodes in the trie, return. - if (count == 0) - return; + if (!it.processed_for_removal) { + if (!it.size) { + if (!it.node->pipes) { + ret = false; + continue; + } - // If there's one subnode (optimisation). - if (count == 1) { - (*buff_) [buffsize_] = min; - buffsize_++; - next.node->rm_helper (pipe_, buff_, buffsize_, maxbuffsize_, - func_, arg_); + typename pipes_t::size_type erased = + it.node->pipes->erase (pipe_); + if (it.node->pipes->empty ()) { + zmq_assert (erased == 1); + delete it.node->pipes; + it.node->pipes = NULL; + ret = true; + continue; + } - // Prune the node if it was made redundant by the removal - if (next.node->is_redundant ()) { - delete next.node; - next.node = 0; - count = 0; - --live_nodes; - zmq_assert (live_nodes == 0); - } - return; - } - - // If there are multiple subnodes. 
- // - // New min non-null character in the node table after the removal - unsigned char new_min = min + count - 1; - // New max non-null character in the node table after the removal - unsigned char new_max = min; - for (unsigned short c = 0; c != count; c++) { - (*buff_) [buffsize_] = min + c; - if (next.table [c]) { - next.table [c]->rm_helper (pipe_, buff_, buffsize_ + 1, - maxbuffsize_, func_, arg_); - - // Prune redundant nodes from the mtrie - if (next.table [c]->is_redundant ()) { - delete next.table [c]; - next.table [c] = 0; - - zmq_assert (live_nodes > 0); - --live_nodes; + ret = (erased == 1); + continue; } - else { - // The node is not redundant, so it's a candidate for being - // the new min/max node. - // - // We loop through the node array from left to right, so the - // first non-null, non-redundant node encountered is the new - // minimum index. Conversely, the last non-redundant, non-null - // node encountered is the new maximum index. - if (c + min < new_min) - new_min = c + min; - if (c + min > new_max) - new_max = c + min; + + it.current_child = *it.prefix; + if (!it.node->count || it.current_child < it.node->min + || it.current_child >= it.node->min + it.node->count) { + ret = false; + continue; } - } - } - zmq_assert (count > 1); - - // Free the node table if it's no longer used. 
- if (live_nodes == 0) { - free (next.table); - next.table = NULL; - count = 0; - } - // Compact the node table if possible - else - if (live_nodes == 1) { - // If there's only one live node in the table we can - // switch to using the more compact single-node - // representation - zmq_assert (new_min == new_max); - zmq_assert (new_min >= min && new_min < min + count); - mtrie_t *node = next.table [new_min - min]; - zmq_assert (node); - free (next.table); - next.node = node; - count = 1; - min = new_min; - } - else - if (new_min > min || new_max < min + count - 1) { - zmq_assert (new_max - new_min + 1 > 1); - - mtrie_t **old_table = next.table; - zmq_assert (new_min > min || new_max < min + count - 1); - zmq_assert (new_min >= min); - zmq_assert (new_max <= min + count - 1); - zmq_assert (new_max - new_min + 1 < count); - - count = new_max - new_min + 1; - next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count); - alloc_assert (next.table); - - memmove (next.table, old_table + (new_min - min), - sizeof (mtrie_t*) * count); - free (old_table); - - min = new_min; - } -} - -bool zmq::mtrie_t::rm (unsigned char *prefix_, size_t size_, pipe_t *pipe_) -{ - return rm_helper (prefix_, size_, pipe_); -} - -bool zmq::mtrie_t::rm_helper (unsigned char *prefix_, size_t size_, - pipe_t *pipe_) -{ - if (!size_) { - if (pipes) { - pipes_t::size_type erased = pipes->erase (pipe_); - zmq_assert (erased == 1); - if (pipes->empty ()) { - delete pipes; - pipes = 0; + it.next_node = + it.node->count == 1 + ? 
it.node->next.node + : it.node->next.table[it.current_child - it.node->min]; + if (!it.next_node) { + ret = false; + continue; } - } - return !pipes; - } - unsigned char c = *prefix_; - if (!count || c < min || c >= min + count) - return false; + it.processed_for_removal = true; + stack.push_back (it); + struct iter next = { + it.next_node, NULL, it.prefix + 1, it.size - 1, 0, 0, 0, false}; + stack.push_back (next); + } else { + it.processed_for_removal = false; - mtrie_t *next_node = - count == 1 ? next.node : next.table [c - min]; + if (it.next_node->is_redundant ()) { + delete it.next_node; + it.next_node = NULL; + zmq_assert (it.node->count > 0); - if (!next_node) - return false; + if (it.node->count == 1) { + it.node->next.node = NULL; + it.node->count = 0; + --it.node->live_nodes; + zmq_assert (it.node->live_nodes == 0); + } else { + it.node->next.table[it.current_child - it.node->min] = 0; + zmq_assert (it.node->live_nodes > 1); + --it.node->live_nodes; - bool ret = next_node->rm_helper (prefix_ + 1, size_ - 1, pipe_); + // Compact the table if possible + if (it.node->live_nodes == 1) { + // If there's only one live node in the table we can + // switch to using the more compact single-node + // representation + unsigned short i; + for (i = 0; i < it.node->count; ++i) + if (it.node->next.table[i]) + break; - if (next_node->is_redundant ()) { - delete next_node; - zmq_assert (count > 0); + zmq_assert (i < it.node->count); + it.node->min += i; + it.node->count = 1; + class mtrie_t *oldp = it.node->next.table[i]; + free (it.node->next.table); + it.node->next.table = NULL; + it.node->next.node = oldp; + } else if (it.current_child == it.node->min) { + // We can compact the table "from the left" + unsigned short i; + for (i = 1; i < it.node->count; ++i) + if (it.node->next.table[i]) + break; - if (count == 1) { - next.node = 0; - count = 0; - --live_nodes; - zmq_assert (live_nodes == 0); - } - else { - next.table [c - min] = 0; - zmq_assert (live_nodes > 1); - 
--live_nodes; + zmq_assert (i < it.node->count); + it.node->min += i; + it.node->count -= i; + class mtrie_t **old_table = it.node->next.table; + it.node->next.table = + static_cast (malloc ( + sizeof (class mtrie_t *) * it.node->count)); + alloc_assert (it.node->next.table); + memmove (it.node->next.table, old_table + i, + sizeof (class mtrie_t *) * it.node->count); + free (old_table); + } else if (it.current_child + == it.node->min + it.node->count - 1) { + // We can compact the table "from the right" + unsigned short i; + for (i = 1; i < it.node->count; ++i) + if (it.node->next.table[it.node->count - 1 - i]) + break; - // Compact the table if possible - if (live_nodes == 1) { - // If there's only one live node in the table we can - // switch to using the more compact single-node - // representation - unsigned short i; - for (i = 0; i < count; ++i) - if (next.table [i]) - break; - - zmq_assert (i < count); - min += i; - count = 1; - mtrie_t *oldp = next.table [i]; - free (next.table); - next.node = oldp; - } - else - if (c == min) { - // We can compact the table "from the left" - unsigned short i; - for (i = 1; i < count; ++i) - if (next.table [i]) - break; - - zmq_assert (i < count); - min += i; - count -= i; - mtrie_t **old_table = next.table; - next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count); - alloc_assert (next.table); - memmove (next.table, old_table + i, sizeof (mtrie_t*) * count); - free (old_table); - } - else - if (c == min + count - 1) { - // We can compact the table "from the right" - unsigned short i; - for (i = 1; i < count; ++i) - if (next.table [count - 1 - i]) - break; - - zmq_assert (i < count); - count -= i; - mtrie_t **old_table = next.table; - next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count); - alloc_assert (next.table); - memmove (next.table, old_table, sizeof (mtrie_t*) * count); - free (old_table); + zmq_assert (i < it.node->count); + it.node->count -= i; + class mtrie_t **old_table = it.node->next.table; + 
it.node->next.table = + static_cast (malloc ( + sizeof (class mtrie_t *) * it.node->count)); + alloc_assert (it.node->next.table); + memmove (it.node->next.table, old_table, + sizeof (class mtrie_t *) * it.node->count); + free (old_table); + } + } } } } diff --git a/src/mtrie.hpp b/src/mtrie.hpp index 6d49380c..9a07d504 100644 --- a/src/mtrie.hpp +++ b/src/mtrie.hpp @@ -70,14 +70,6 @@ namespace zmq private: - bool add_helper (unsigned char *prefix_, size_t size_, - zmq::pipe_t *pipe_); - void rm_helper (zmq::pipe_t *pipe_, unsigned char **buff_, - size_t buffsize_, size_t maxbuffsize_, - void (*func_) (unsigned char *data_, size_t size_, void *arg_), - void *arg_); - bool rm_helper (unsigned char *prefix_, size_t size_, - zmq::pipe_t *pipe_); bool is_redundant () const; typedef std::set pipes_t; @@ -86,13 +78,25 @@ namespace zmq unsigned char min; unsigned short count; unsigned short live_nodes; - union { + union _next_t { class mtrie_t *node; class mtrie_t **table; } next; mtrie_t (const mtrie_t&); const mtrie_t &operator = (const mtrie_t&); + + struct iter + { + class mtrie_t *node; + class mtrie_t *next_node; + unsigned char *prefix; + size_t size; + unsigned short current_child; + unsigned char new_min; + unsigned char new_max; + bool processed_for_removal; + }; }; }