diff --git a/src/mtrie.cpp b/src/mtrie.cpp index 8a838905..5f057e77 100644 --- a/src/mtrie.cpp +++ b/src/mtrie.cpp @@ -31,6 +31,7 @@ #include #include +#include #include "platform.hpp" #if defined ZMQ_HAVE_WINDOWS @@ -71,319 +72,460 @@ zmq::mtrie_t::~mtrie_t () bool zmq::mtrie_t::add (unsigned char *prefix_, size_t size_, pipe_t *pipe_) { - return add_helper (prefix_, size_, pipe_); -} + class mtrie_t *it = this; + + while (size_) { + const unsigned char c = *prefix_; + + if (c < it->min || c >= it->min + it->count) { + // The character is out of range of currently handled + // characters. We have to extend the table. + if (!it->count) { + it->min = c; + it->count = 1; + it->next.node = NULL; + } else if (it->count == 1) { + const unsigned char oldc = it->min; + class mtrie_t *oldp = it->next.node; + it->count = (it->min < c ? c - it->min : it->min - c) + 1; + it->next.table = static_cast ( + malloc (sizeof (class mtrie_t *) * it->count)); + alloc_assert (it->next.table); + for (unsigned short i = 0; i != it->count; ++i) + it->next.table[i] = 0; + it->min = std::min (it->min, c); + it->next.table[oldc - it->min] = oldp; + } else if (it->min < c) { + // The new character is above the current character range. + const unsigned short oldcount = it->count; + it->count = c - it->min + 1; + it->next.table = static_cast (realloc ( + it->next.table, sizeof (class mtrie_t *) * it->count)); + alloc_assert (it->next.table); + for (unsigned short i = oldcount; i != it->count; i++) + it->next.table[i] = NULL; + } else { + // The new character is below the current character range. + const unsigned short oldcount = it->count; + it->count = (it->min + oldcount) - c; + it->next.table = static_cast (realloc ( + it->next.table, sizeof (class mtrie_t *) * it->count)); + alloc_assert (it->next.table); + memmove (it->next.table + it->min - c, it->next.table, + oldcount * sizeof (class mtrie_t *)); + for (unsigned short i = 0; i != it->min - c; i++) + it->next.table[i] = NULL; + it->min = c; + } + } + + // If next node does not exist, create one. + if (it->count == 1) { + if (!it->next.node) { + it->next.node = new (std::nothrow) class mtrie_t; + alloc_assert (it->next.node); + ++(it->live_nodes); + } + + ++prefix_; + --size_; + it = it->next.node; + } else { + if (!it->next.table[c - it->min]) { + it->next.table[c - it->min] = + new (std::nothrow) class mtrie_t; + alloc_assert (it->next.table[c - it->min]); + ++(it->live_nodes); + } + + ++prefix_; + --size_; + it = it->next.table[c - it->min]; + } + } -bool zmq::mtrie_t::add_helper (unsigned char *prefix_, size_t size_, - pipe_t *pipe_) -{ // We are at the node corresponding to the prefix. We are done. - if (!size_) { - bool result = !pipes; - if (!pipes) { - pipes = new (std::nothrow) pipes_t; - alloc_assert (pipes); - } - pipes->insert (pipe_); - return result; + const bool result = !it->pipes; + if (!it->pipes) { + it->pipes = new (std::nothrow) pipes_t; + alloc_assert (it->pipes); } + it->pipes->insert (pipe_); - unsigned char c = *prefix_; - if (c < min || c >= min + count) { - - // The character is out of range of currently handled - // charcters. We have to extend the table. - if (!count) { - min = c; - count = 1; - next.node = NULL; - } - else - if (count == 1) { - unsigned char oldc = min; - mtrie_t *oldp = next.node; - count = (min < c ? c - min : min - c) + 1; - next.table = (mtrie_t**) - malloc (sizeof (mtrie_t*) * count); - alloc_assert (next.table); - for (unsigned short i = 0; i != count; ++i) - next.table [i] = 0; - min = std::min (min, c); - next.table [oldc - min] = oldp; - } - else - if (min < c) { - // The new character is above the current character range. - unsigned short old_count = count; - count = c - min + 1; - next.table = (mtrie_t**) realloc (next.table, - sizeof (mtrie_t*) * count); - alloc_assert (next.table); - for (unsigned short i = old_count; i != count; i++) - next.table [i] = NULL; - } - else { - // The new character is below the current character range. - unsigned short old_count = count; - count = (min + old_count) - c; - next.table = (mtrie_t**) realloc (next.table, - sizeof (mtrie_t*) * count); - alloc_assert (next.table); - memmove (next.table + min - c, next.table, - old_count * sizeof (mtrie_t*)); - for (unsigned short i = 0; i != min - c; i++) - next.table [i] = NULL; - min = c; - } - } - - // If next node does not exist, create one. - if (count == 1) { - if (!next.node) { - next.node = new (std::nothrow) mtrie_t; - alloc_assert (next.node); - ++live_nodes; - } - return next.node->add_helper (prefix_ + 1, size_ - 1, pipe_); - } - else { - if (!next.table [c - min]) { - next.table [c - min] = new (std::nothrow) mtrie_t; - alloc_assert (next.table [c - min]); - ++live_nodes; - } - return next.table [c - min]->add_helper (prefix_ + 1, size_ - 1, pipe_); - } + return result; } - void zmq::mtrie_t::rm (pipe_t *pipe_, void (*func_) (unsigned char *data_, size_t size_, void *arg_), void *arg_) { + // This used to be implemented as a non-tail recursive travesal of the trie, + // which means remote clients controlled the depth of the recursion and the + // stack size. + // To simulate the non-tail recursion, with post-recursion changes depending on + // the result of the recursive call, a stack is used to re-visit the same node + // and operate on it again after children have been visisted. + // A boolean is used to record whether the node had already been visited and to + // determine if the pre- or post- children visit actions have to be taken. + // In the case of a node with (N > 1) children, the node has to be re-visited + // N times, in the correct order after each child visit. + std::list stack; unsigned char *buff = NULL; - rm_helper (pipe_, &buff, 0, 0, func_, arg_); + size_t maxbuffsize = 0; + struct iter it = {this, NULL, NULL, 0, 0, 0, false}; + stack.push_back (it); + + while (!stack.empty ()) { + it = stack.back (); + stack.pop_back (); + + if (!it.processed_for_removal) { + // Remove the subscription from this node. + if (it.node->pipes && it.node->pipes->erase (pipe_)) { + if (it.node->pipes->empty ()) { + func_ (buff, it.size, arg_); + } + + if (it.node->pipes->empty ()) { + delete it.node->pipes; + it.node->pipes = NULL; + } + } + + // Adjust the buffer. + if (it.size >= maxbuffsize) { + maxbuffsize = it.size + 256; + buff = + static_cast (realloc (buff, maxbuffsize)); + alloc_assert (buff); + } + + switch (it.node->count) { + case 0: + // If there are no subnodes in the trie, we are done with this node + // pre-processing. + break; + case 1: { + // If there's one subnode (optimisation). + + buff[it.size] = it.node->min; + // Mark this node as pre-processed and push it, so that the next + // visit after the operation on the child can do the removals. + it.processed_for_removal = true; + stack.push_back (it); + struct iter next = { + it.node->next.node, NULL, NULL, ++it.size, 0, 0, false}; + stack.push_back (next); + break; + } + default: { + // If there are multiple subnodes. + // When first visiting this node, initialize the new_min/max parameters + // which will then be used after each child has been processed, on the + // post-children iterations. + if (it.current_child == 0) { + // New min non-null character in the node table after the removal + it.new_min = it.node->min + it.node->count - 1; + // New max non-null character in the node table after the removal + it.new_max = it.node->min; + } + + // Mark this node as pre-processed and push it, so that the next + // visit after the operation on the child can do the removals. + buff[it.size] = it.node->min + it.current_child; + it.processed_for_removal = true; + stack.push_back (it); + if (it.node->next.table[it.current_child]) { + struct iter next = { + it.node->next.table[it.current_child], + NULL, + NULL, + it.size + 1, + 0, + 0, + false}; + stack.push_back (next); + } + } + } + } else { + // Reset back for the next time, in case this node doesn't get deleted. + // This is done unconditionally, unlike when setting this variable to true. + it.processed_for_removal = false; + + switch (it.node->count) { + case 0: + // If there are no subnodes in the trie, we are done with this node + // post-processing. + break; + case 1: + // If there's one subnode (optimisation). + + // Prune the node if it was made redundant by the removal + if (it.node->next.node->is_redundant ()) { + delete it.node->next.node; + it.node->next.node = NULL; + it.node->count = 0; + --it.node->live_nodes; + zmq_assert (it.node->live_nodes == 0); + } + break; + default: + // If there are multiple subnodes. + { + if (it.node->next.table[it.current_child]) { + // Prune redundant nodes from the mtrie + if (it.node->next.table[it.current_child] + ->is_redundant ()) { + delete it.node->next.table[it.current_child]; + it.node->next.table[it.current_child] = NULL; + + zmq_assert (it.node->live_nodes > 0); + --it.node->live_nodes; + } else { + // The node is not redundant, so it's a candidate for being + // the new min/max node. + // + // We loop through the node array from left to right, so the + // first non-null, non-redundant node encountered is the new + // minimum index. Conversely, the last non-redundant, non-null + // node encountered is the new maximum index. + if (it.current_child + it.node->min + < it.new_min) + it.new_min = + it.current_child + it.node->min; + if (it.current_child + it.node->min + > it.new_max) + it.new_max = + it.current_child + it.node->min; + } + } + + // If there are more children to visit, push again the current + // node, so that pre-processing can happen on the next child. + // If we are done, reset the child index so that the ::rm is + // fully idempotent. + ++it.current_child; + if (it.current_child >= it.node->count) + it.current_child = 0; + else { + stack.push_back (it); + continue; + } + + // All children have been visited and removed if needed, and + // all pre- and post-visit operations have been carried. + // Resize/free the node table if needed. + zmq_assert (it.node->count > 1); + + // Free the node table if it's no longer used. + switch (it.node->live_nodes) { + case 0: + free (it.node->next.table); + it.node->next.table = NULL; + it.node->count = 0; + break; + case 1: + // Compact the node table if possible + + // If there's only one live node in the table we can + // switch to using the more compact single-node + // representation + zmq_assert (it.new_min == it.new_max); + zmq_assert (it.new_min >= it.node->min); + zmq_assert (it.new_min + < it.node->min + it.node->count); + { + class mtrie_t *node = + it.node->next + .table[it.new_min - it.node->min]; + zmq_assert (node); + free (it.node->next.table); + it.node->next.node = node; + } + it.node->count = 1; + it.node->min = it.new_min; + break; + default: + if (it.new_min > it.node->min + || it.new_max < it.node->min + + it.node->count - 1) { + zmq_assert (it.new_max - it.new_min + 1 + > 1); + + class mtrie_t **old_table = + it.node->next.table; + zmq_assert (it.new_min > it.node->min + || it.new_max + < it.node->min + + it.node->count - 1); + zmq_assert (it.new_min >= it.node->min); + zmq_assert (it.new_max + <= it.node->min + + it.node->count - 1); + zmq_assert (it.new_max - it.new_min + 1 + < it.node->count); + + it.node->count = + it.new_max - it.new_min + 1; + it.node->next.table = + static_cast ( + malloc (sizeof (class mtrie_t *) + * it.node->count)); + alloc_assert (it.node->next.table); + + memmove (it.node->next.table, + old_table + + (it.new_min - it.node->min), + sizeof (class mtrie_t *) + * it.node->count); + free (old_table); + + it.node->min = it.new_min; + } + } + } + } + } + } + free (buff); } -void zmq::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_, - size_t buffsize_, size_t maxbuffsize_, - void (*func_) (unsigned char *data_, size_t size_, void *arg_), - void *arg_) +bool zmq::mtrie_t::rm (unsigned char *prefix_, size_t size_, zmq::pipe_t *pipe_) { - // Remove the subscription from this node. - if (pipes && pipes->erase (pipe_) && pipes->empty ()) { - func_ (*buff_, buffsize_, arg_); - delete pipes; - pipes = 0; - } + // This used to be implemented as a non-tail recursive travesal of the trie, + // which means remote clients controlled the depth of the recursion and the + // stack size. + // To simulate the non-tail recursion, with post-recursion changes depending on + // the result of the recursive call, a stack is used to re-visit the same node + // and operate on it again after children have been visisted. + // A boolean is used to record whether the node had already been visited and to + // determine if the pre- or post- children visit actions have to be taken. + bool ret = false; + std::list stack; + struct iter it = {this, NULL, prefix_, size_, 0, 0, 0, false}; + stack.push_back (it); - // Adjust the buffer. - if (buffsize_ >= maxbuffsize_) { - maxbuffsize_ = buffsize_ + 256; - *buff_ = (unsigned char*) realloc (*buff_, maxbuffsize_); - alloc_assert (*buff_); - } + while (!stack.empty ()) { + it = stack.back (); + stack.pop_back (); - // If there are no subnodes in the trie, return. - if (count == 0) - return; + if (!it.processed_for_removal) { + if (!it.size) { + if (!it.node->pipes) { + ret = false; + continue; + } - // If there's one subnode (optimisation). - if (count == 1) { - (*buff_) [buffsize_] = min; - buffsize_++; - next.node->rm_helper (pipe_, buff_, buffsize_, maxbuffsize_, - func_, arg_); + typename pipes_t::size_type erased = + it.node->pipes->erase (pipe_); + if (it.node->pipes->empty ()) { + zmq_assert (erased == 1); + delete it.node->pipes; + it.node->pipes = NULL; + ret = true; + continue; + } - // Prune the node if it was made redundant by the removal - if (next.node->is_redundant ()) { - delete next.node; - next.node = 0; - count = 0; - --live_nodes; - zmq_assert (live_nodes == 0); - } - return; - } - - // If there are multiple subnodes. - // - // New min non-null character in the node table after the removal - unsigned char new_min = min + count - 1; - // New max non-null character in the node table after the removal - unsigned char new_max = min; - for (unsigned short c = 0; c != count; c++) { - (*buff_) [buffsize_] = min + c; - if (next.table [c]) { - next.table [c]->rm_helper (pipe_, buff_, buffsize_ + 1, - maxbuffsize_, func_, arg_); - - // Prune redundant nodes from the mtrie - if (next.table [c]->is_redundant ()) { - delete next.table [c]; - next.table [c] = 0; - - zmq_assert (live_nodes > 0); - --live_nodes; + ret = (erased == 1); + continue; } - else { - // The node is not redundant, so it's a candidate for being - // the new min/max node. - // - // We loop through the node array from left to right, so the - // first non-null, non-redundant node encountered is the new - // minimum index. Conversely, the last non-redundant, non-null - // node encountered is the new maximum index. - if (c + min < new_min) - new_min = c + min; - if (c + min > new_max) - new_max = c + min; + + it.current_child = *it.prefix; + if (!it.node->count || it.current_child < it.node->min + || it.current_child >= it.node->min + it.node->count) { + ret = false; + continue; } - } - } - zmq_assert (count > 1); - - // Free the node table if it's no longer used. - if (live_nodes == 0) { - free (next.table); - next.table = NULL; - count = 0; - } - // Compact the node table if possible - else - if (live_nodes == 1) { - // If there's only one live node in the table we can - // switch to using the more compact single-node - // representation - zmq_assert (new_min == new_max); - zmq_assert (new_min >= min && new_min < min + count); - mtrie_t *node = next.table [new_min - min]; - zmq_assert (node); - free (next.table); - next.node = node; - count = 1; - min = new_min; - } - else - if (new_min > min || new_max < min + count - 1) { - zmq_assert (new_max - new_min + 1 > 1); - - mtrie_t **old_table = next.table; - zmq_assert (new_min > min || new_max < min + count - 1); - zmq_assert (new_min >= min); - zmq_assert (new_max <= min + count - 1); - zmq_assert (new_max - new_min + 1 < count); - - count = new_max - new_min + 1; - next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count); - alloc_assert (next.table); - - memmove (next.table, old_table + (new_min - min), - sizeof (mtrie_t*) * count); - free (old_table); - - min = new_min; - } -} - -bool zmq::mtrie_t::rm (unsigned char *prefix_, size_t size_, pipe_t *pipe_) -{ - return rm_helper (prefix_, size_, pipe_); -} - -bool zmq::mtrie_t::rm_helper (unsigned char *prefix_, size_t size_, - pipe_t *pipe_) -{ - if (!size_) { - if (pipes) { - pipes_t::size_type erased = pipes->erase (pipe_); - zmq_assert (erased == 1); - if (pipes->empty ()) { - delete pipes; - pipes = 0; + it.next_node = + it.node->count == 1 + ? it.node->next.node + : it.node->next.table[it.current_child - it.node->min]; + if (!it.next_node) { + ret = false; + continue; } - } - return !pipes; - } - unsigned char c = *prefix_; - if (!count || c < min || c >= min + count) - return false; + it.processed_for_removal = true; + stack.push_back (it); + struct iter next = { + it.next_node, NULL, it.prefix + 1, it.size - 1, 0, 0, 0, false}; + stack.push_back (next); + } else { + it.processed_for_removal = false; - mtrie_t *next_node = - count == 1 ? next.node : next.table [c - min]; + if (it.next_node->is_redundant ()) { + delete it.next_node; + it.next_node = NULL; + zmq_assert (it.node->count > 0); - if (!next_node) - return false; + if (it.node->count == 1) { + it.node->next.node = NULL; + it.node->count = 0; + --it.node->live_nodes; + zmq_assert (it.node->live_nodes == 0); + } else { + it.node->next.table[it.current_child - it.node->min] = 0; + zmq_assert (it.node->live_nodes > 1); + --it.node->live_nodes; - bool ret = next_node->rm_helper (prefix_ + 1, size_ - 1, pipe_); + // Compact the table if possible + if (it.node->live_nodes == 1) { + // If there's only one live node in the table we can + // switch to using the more compact single-node + // representation + unsigned short i; + for (i = 0; i < it.node->count; ++i) + if (it.node->next.table[i]) + break; - if (next_node->is_redundant ()) { - delete next_node; - zmq_assert (count > 0); + zmq_assert (i < it.node->count); + it.node->min += i; + it.node->count = 1; + class mtrie_t *oldp = it.node->next.table[i]; + free (it.node->next.table); + it.node->next.table = NULL; + it.node->next.node = oldp; + } else if (it.current_child == it.node->min) { + // We can compact the table "from the left" + unsigned short i; + for (i = 1; i < it.node->count; ++i) + if (it.node->next.table[i]) + break; - if (count == 1) { - next.node = 0; - count = 0; - --live_nodes; - zmq_assert (live_nodes == 0); - } - else { - next.table [c - min] = 0; - zmq_assert (live_nodes > 1); - --live_nodes; + zmq_assert (i < it.node->count); + it.node->min += i; + it.node->count -= i; + class mtrie_t **old_table = it.node->next.table; + it.node->next.table = + static_cast (malloc ( + sizeof (class mtrie_t *) * it.node->count)); + alloc_assert (it.node->next.table); + memmove (it.node->next.table, old_table + i, + sizeof (class mtrie_t *) * it.node->count); + free (old_table); + } else if (it.current_child + == it.node->min + it.node->count - 1) { + // We can compact the table "from the right" + unsigned short i; + for (i = 1; i < it.node->count; ++i) + if (it.node->next.table[it.node->count - 1 - i]) + break; - // Compact the table if possible - if (live_nodes == 1) { - // If there's only one live node in the table we can - // switch to using the more compact single-node - // representation - unsigned short i; - for (i = 0; i < count; ++i) - if (next.table [i]) - break; - - zmq_assert (i < count); - min += i; - count = 1; - mtrie_t *oldp = next.table [i]; - free (next.table); - next.node = oldp; - } - else - if (c == min) { - // We can compact the table "from the left" - unsigned short i; - for (i = 1; i < count; ++i) - if (next.table [i]) - break; - - zmq_assert (i < count); - min += i; - count -= i; - mtrie_t **old_table = next.table; - next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count); - alloc_assert (next.table); - memmove (next.table, old_table + i, sizeof (mtrie_t*) * count); - free (old_table); - } - else - if (c == min + count - 1) { - // We can compact the table "from the right" - unsigned short i; - for (i = 1; i < count; ++i) - if (next.table [count - 1 - i]) - break; - - zmq_assert (i < count); - count -= i; - mtrie_t **old_table = next.table; - next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count); - alloc_assert (next.table); - memmove (next.table, old_table, sizeof (mtrie_t*) * count); - free (old_table); + zmq_assert (i < it.node->count); + it.node->count -= i; + class mtrie_t **old_table = it.node->next.table; + it.node->next.table = + static_cast (malloc ( + sizeof (class mtrie_t *) * it.node->count)); + alloc_assert (it.node->next.table); + memmove (it.node->next.table, old_table, + sizeof (class mtrie_t *) * it.node->count); + free (old_table); + } + } } } } diff --git a/src/mtrie.hpp b/src/mtrie.hpp index 6d49380c..9a07d504 100644 --- a/src/mtrie.hpp +++ b/src/mtrie.hpp @@ -70,14 +70,6 @@ namespace zmq private: - bool add_helper (unsigned char *prefix_, size_t size_, - zmq::pipe_t *pipe_); - void rm_helper (zmq::pipe_t *pipe_, unsigned char **buff_, - size_t buffsize_, size_t maxbuffsize_, - void (*func_) (unsigned char *data_, size_t size_, void *arg_), - void *arg_); - bool rm_helper (unsigned char *prefix_, size_t size_, - zmq::pipe_t *pipe_); bool is_redundant () const; typedef std::set pipes_t; @@ -86,13 +78,25 @@ namespace zmq unsigned char min; unsigned short count; unsigned short live_nodes; - union { + union _next_t { class mtrie_t *node; class mtrie_t **table; } next; mtrie_t (const mtrie_t&); const mtrie_t &operator = (const mtrie_t&); + + struct iter + { + class mtrie_t *node; + class mtrie_t *next_node; + unsigned char *prefix; + size_t size; + unsigned short current_child; + unsigned char new_min; + unsigned char new_max; + bool processed_for_removal; + }; }; }