diff --git a/CMakeLists.txt b/CMakeLists.txt index 1a3877fa..279033cd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -435,7 +435,8 @@ set(cxx-sources xpub.cpp xsub.cpp zmq.cpp - zmq_utils.cpp) + zmq_utils.cpp + decoder_allocators.cpp) set(rc-sources version.rc) diff --git a/Makefile.am b/Makefile.am index 86b1b4bd..da9c2bad 100644 --- a/Makefile.am +++ b/Makefile.am @@ -207,7 +207,10 @@ src_libzmq_la_SOURCES = \ src/ypipe_conflate.hpp \ src/yqueue.hpp \ src/zmq.cpp \ - src/zmq_utils.cpp + src/zmq_utils.cpp \ + src/decoder_allocators.hpp \ + src/decoder_allocators.cpp + if ON_MINGW src_libzmq_la_LDFLAGS = \ diff --git a/src/decoder.hpp b/src/decoder.hpp index 4d00d45d..2190f980 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -39,52 +39,10 @@ #include "msg.hpp" #include "i_decoder.hpp" #include "stdint.hpp" +#include "decoder_allocators.hpp" namespace zmq { - // Static buffer policy. - class c_single_allocator - { - public: - c_single_allocator(size_t bufsize_): - bufsize(bufsize_), - buf((unsigned char*) malloc (bufsize)) - { - alloc_assert (buf); - } - - ~c_single_allocator() - { - free(buf); - } - - unsigned char* allocate() - { - return buf; - } - - void deallocate() - { - - } - - size_t size() const - { - return bufsize; - } - - void resize(size_t new_size) - { - bufsize = new_size; - } - private: - size_t bufsize; - unsigned char* buf; - - c_single_allocator( c_single_allocator const& ); - c_single_allocator& operator=(c_single_allocator const&); - }; - // Helper base class for decoders that know the amount of data to read // in advance at any moment. Knowing the amount in advance is a property // of the protocol used. 0MQ framing protocol is based size-prefixed diff --git a/src/decoder_allocators.cpp b/src/decoder_allocators.cpp new file mode 100644 index 00000000..4068ed6a --- /dev/null +++ b/src/decoder_allocators.cpp @@ -0,0 +1,146 @@ +/* + Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "decoder_allocators.hpp" + +#include + +#include "msg.hpp" + +zmq::shared_message_memory_allocator::shared_message_memory_allocator(size_t bufsize_): + buf(NULL), + bufsize( 0 ), + max_size( bufsize_ ), + msg_refcnt( NULL ), + maxCounters( std::ceil( (double)max_size / (double)msg_t::max_vsm_size) ) +{ + +} + +zmq::shared_message_memory_allocator::shared_message_memory_allocator(size_t bufsize_, size_t maxMessages): + buf(NULL), + bufsize( 0 ), + max_size( bufsize_ ), + msg_refcnt( NULL ), + maxCounters( maxMessages ) +{ + +} + +zmq::shared_message_memory_allocator::~shared_message_memory_allocator() +{ + if (buf) { + deallocate(); + } +} + +unsigned char* zmq::shared_message_memory_allocator::allocate() +{ + if (buf) + { + // release reference count to couple lifetime to messages + zmq::atomic_counter_t *c = reinterpret_cast(buf); + + // if refcnt drops to 0, there are no message using the buffer + // because either all messages have been closed or only vsm-messages + // were created + if (c->sub(1)) { + // buffer is still in use as message data. "Release" it and create a new one + // release pointer because we are going to create a new buffer + release(); + } + } + + // if buf != NULL it is not used by any message so we can re-use it for the next run + if (!buf) { + // allocate memory for reference counters together with reception buffer + size_t const allocationsize = max_size + sizeof(zmq::atomic_counter_t) + maxCounters * sizeof(zmq::atomic_counter_t); + + buf = (unsigned char *) malloc(allocationsize); + alloc_assert (buf); + + new(buf) atomic_counter_t(1); + } + else + { + // release reference count to couple lifetime to messages + zmq::atomic_counter_t *c = reinterpret_cast(buf); + c->set(1); + } + + bufsize = max_size; + msg_refcnt = reinterpret_cast( buf + sizeof(atomic_counter_t) + max_size ); + return buf + sizeof( zmq::atomic_counter_t); +} + +void zmq::shared_message_memory_allocator::deallocate() +{ + free(buf); + buf = NULL; + bufsize = 0; + msg_refcnt = NULL; +} + +unsigned char* zmq::shared_message_memory_allocator::release() +{ + unsigned char* b = buf; + buf = NULL; + bufsize = 0; + msg_refcnt = NULL; + + return b; +} + +void zmq::shared_message_memory_allocator::inc_ref() +{ + ((zmq::atomic_counter_t*)buf)->add(1); +} + +void zmq::shared_message_memory_allocator::call_dec_ref(void*, void* hint) { + zmq_assert( hint ); + unsigned char* buf = static_cast(hint); + zmq::atomic_counter_t *c = reinterpret_cast(buf); + + if (!c->sub(1)) { + c->~atomic_counter_t(); + free(buf); + buf = NULL; + } +} + + +size_t zmq::shared_message_memory_allocator::size() const +{ + return bufsize; +} + +unsigned char* zmq::shared_message_memory_allocator::data() +{ + return buf + sizeof(zmq::atomic_counter_t); +} \ No newline at end of file diff --git a/src/decoder_allocators.hpp b/src/decoder_allocators.hpp new file mode 100644 index 00000000..28edc543 --- /dev/null +++ b/src/decoder_allocators.hpp @@ -0,0 +1,149 @@ +/* + Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef ZEROMQ_DECODER_ALLOCATORS_HPP +#define ZEROMQ_DECODER_ALLOCATORS_HPP + +#include + +#include "err.hpp" +#include "atomic_counter.hpp" + +namespace zmq +{ + // Static buffer policy. + class c_single_allocator + { + public: + c_single_allocator(size_t bufsize_): + bufsize(bufsize_), + buf((unsigned char*) malloc (bufsize)) + { + alloc_assert (buf); + } + + ~c_single_allocator() + { + free(buf); + } + + unsigned char* allocate() + { + return buf; + } + + void deallocate() + { + + } + + size_t size() const + { + return bufsize; + } + + void resize(size_t new_size) + { + bufsize = new_size; + } + private: + size_t bufsize; + unsigned char* buf; + + c_single_allocator( c_single_allocator const& ); + c_single_allocator& operator=(c_single_allocator const&); + }; + + // This allocater allocates a reference counted buffer which is used by v2_decoder_t + // to use zero-copy msg::init_data to create messages with memory from this buffer as + // data storage. + // + // The buffer is allocated with a reference count of 1 to make sure that is is alive while + // decoding messages. Otherwise, it is possible that e.g. the first message increases the count + // from zero to one, gets passed to the user application, processed in the user thread and deleted + // which would then deallocate the buffer. The drawback is that the buffer may be allocated longer + // than necessary because it is only deleted when allocate is called the next time. + class shared_message_memory_allocator + { + public: + shared_message_memory_allocator(size_t bufsize_); + + // Create an allocator for a maximum number of messages + shared_message_memory_allocator(size_t bufsize_, size_t maxMessages); + + ~shared_message_memory_allocator(); + + // Allocate a new buffer + // + // This releases the current buffer to be bound to the lifetime of the messages + // created on this bufer. + unsigned char* allocate(); + + // force deallocation of buffer. + void deallocate(); + + // Give up ownership of the buffer. The buffer's lifetime is now coupled to + // the messages constructed on top of it. + unsigned char* release(); + + void inc_ref(); + + static void call_dec_ref(void*, void* buffer); + + size_t size() const; + + // Return pointer to the first message data byte. + unsigned char* data(); + + // Return pointer to the first byte of the buffer. + unsigned char* buffer() + { + return buf; + } + + void resize(size_t new_size) + { + bufsize = new_size; + } + + // + zmq::atomic_counter_t* create_refcnt() + { + return msg_refcnt++; + } + + private: + unsigned char* buf; + size_t bufsize; + size_t max_size; + zmq::atomic_counter_t* msg_refcnt; + size_t maxCounters; + }; +} +#endif //ZEROMQ_DECODER_ALLOCATORS_HPP diff --git a/src/raw_decoder.cpp b/src/raw_decoder.cpp index b98826cb..ceffd383 100644 --- a/src/raw_decoder.cpp +++ b/src/raw_decoder.cpp @@ -39,35 +39,38 @@ #include "err.hpp" zmq::raw_decoder_t::raw_decoder_t (size_t bufsize_) : - bufsize (bufsize_) + allocator( bufsize_, 1 ) { int rc = in_progress.init (); errno_assert (rc == 0); - - buffer = (unsigned char *) malloc (bufsize); - alloc_assert (buffer); } zmq::raw_decoder_t::~raw_decoder_t () { int rc = in_progress.close (); errno_assert (rc == 0); - - free (buffer); } void zmq::raw_decoder_t::get_buffer (unsigned char **data_, size_t *size_) { - *data_ = buffer; - *size_ = bufsize; + *data_ = allocator.allocate(); + *size_ = allocator.size(); } int zmq::raw_decoder_t::decode (const uint8_t *data_, size_t size_, - size_t &bytes_used_) + size_t &bytes_used_) { - int rc = in_progress.init_size (size_); + int rc = in_progress.init ((unsigned char*)data_, size_, + shared_message_memory_allocator::call_dec_ref, + allocator.create_refcnt() ); + + // if the buffer serves as memory for a zero-copy message, release it + // and allocate a new buffer in get_buffer for the next decode + if (in_progress.is_zcmsg()) { + allocator.release(); + } + errno_assert (rc != -1); - memcpy (in_progress.data (), data_, size_); bytes_used_ = size_; return 1; } diff --git a/src/raw_decoder.hpp b/src/raw_decoder.hpp index 31310174..b119edcb 100644 --- a/src/raw_decoder.hpp +++ b/src/raw_decoder.hpp @@ -34,6 +34,7 @@ #include "msg.hpp" #include "i_decoder.hpp" #include "stdint.hpp" +#include "decoder_allocators.hpp" namespace zmq { @@ -57,14 +58,11 @@ namespace zmq virtual msg_t *msg () { return &in_progress; } virtual void resize_buffer(size_t) {} + private: - - msg_t in_progress; - const size_t bufsize; - - unsigned char *buffer; + shared_message_memory_allocator allocator; raw_decoder_t (const raw_decoder_t&); void operator = (const raw_decoder_t&); diff --git a/src/v2_decoder.cpp b/src/v2_decoder.cpp index b4873ceb..557fb7d3 100644 --- a/src/v2_decoder.cpp +++ b/src/v2_decoder.cpp @@ -42,107 +42,7 @@ #include "wire.hpp" #include "err.hpp" -zmq::shared_message_memory_allocator::shared_message_memory_allocator(size_t bufsize_): - buf(NULL), - bufsize( 0 ), - max_size( bufsize_ ), - msg_refcnt( NULL ) -{ -} - -zmq::shared_message_memory_allocator::~shared_message_memory_allocator() -{ - if (buf) { - deallocate(); - } -} - -unsigned char* zmq::shared_message_memory_allocator::allocate() -{ - if (buf) - { - // release reference count to couple lifetime to messages - zmq::atomic_counter_t *c = reinterpret_cast(buf); - - // if refcnt drops to 0, there are no message using the buffer - // because either all messages have been closed or only vsm-messages - // were created - if (c->sub(1)) { - // buffer is still in use as message data. "Release" it and create a new one - // release pointer because we are going to create a new buffer - release(); - } - } - - // if buf != NULL it is not used by any message so we can re-use it for the next run - if (!buf) { - // allocate memory for reference counters together with reception buffer - size_t const maxCounters = std::ceil( (double)max_size / (double)msg_t::max_vsm_size); - size_t const allocationsize = max_size + sizeof(zmq::atomic_counter_t) + maxCounters * sizeof(zmq::atomic_counter_t); - - buf = (unsigned char *) malloc(allocationsize); - alloc_assert (buf); - - new(buf) atomic_counter_t(1); - } - else - { - // release reference count to couple lifetime to messages - zmq::atomic_counter_t *c = reinterpret_cast(buf); - c->set(1); - } - - bufsize = max_size; - msg_refcnt = reinterpret_cast( buf + sizeof(atomic_counter_t) + max_size ); - return buf + sizeof( zmq::atomic_counter_t); -} - -void zmq::shared_message_memory_allocator::deallocate() -{ - free(buf); - buf = NULL; - bufsize = 0; - msg_refcnt = NULL; -} - -unsigned char* zmq::shared_message_memory_allocator::release() -{ - unsigned char* b = buf; - buf = NULL; - bufsize = 0; - msg_refcnt = NULL; - - return b; -} - -void zmq::shared_message_memory_allocator::inc_ref() -{ - ((zmq::atomic_counter_t*)buf)->add(1); -} - -void zmq::shared_message_memory_allocator::call_dec_ref(void*, void* hint) { - zmq_assert( hint ); - unsigned char* buf = static_cast(hint); - zmq::atomic_counter_t *c = reinterpret_cast(buf); - - if (!c->sub(1)) { - c->~atomic_counter_t(); - free(buf); - buf = NULL; - } -} - - -size_t zmq::shared_message_memory_allocator::size() const -{ - return bufsize; -} - -unsigned char* zmq::shared_message_memory_allocator::data() -{ - return buf + sizeof(zmq::atomic_counter_t); -} zmq::v2_decoder_t::v2_decoder_t (size_t bufsize_, int64_t maxmsgsize_) : shared_message_memory_allocator( bufsize_), diff --git a/src/v2_decoder.hpp b/src/v2_decoder.hpp index 44c9fe05..229d1cc0 100644 --- a/src/v2_decoder.hpp +++ b/src/v2_decoder.hpp @@ -31,71 +31,10 @@ #define __ZMQ_V2_DECODER_HPP_INCLUDED__ #include "decoder.hpp" +#include "decoder_allocators.hpp" namespace zmq { - // This allocater allocates a reference counted buffer which is used by v2_decoder_t - // to use zero-copy msg::init_data to create messages with memory from this buffer as - // data storage. - // - // The buffer is allocated with a reference count of 1 to make sure that is is alive while - // decoding messages. Otherwise, it is possible that e.g. the first message increases the count - // from zero to one, gets passed to the user application, processed in the user thread and deleted - // which would then deallocate the buffer. The drawback is that the buffer may be allocated longer - // than necessary because it is only deleted when allocate is called the next time. - class shared_message_memory_allocator - { - public: - shared_message_memory_allocator(size_t bufsize_); - - ~shared_message_memory_allocator(); - - // Allocate a new buffer - // - // This releases the current buffer to be bound to the lifetime of the messages - // created on this bufer. - unsigned char* allocate(); - - // force deallocation of buffer. - void deallocate(); - - // Give up ownership of the buffer. The buffer's lifetime is now coupled to - // the messages constructed on top of it. - unsigned char* release(); - - void inc_ref(); - - static void call_dec_ref(void*, void* buffer); - - size_t size() const; - - // Return pointer to the first message data byte. - unsigned char* data(); - - // Return pointer to the first byte of the buffer. - unsigned char* buffer() - { - return buf; - } - - void resize(size_t new_size) - { - bufsize = new_size; - } - - // - zmq::atomic_counter_t* create_refcnt() - { - return msg_refcnt++; - } - - private: - unsigned char* buf; - size_t bufsize; - size_t max_size; - zmq::atomic_counter_t* msg_refcnt; - }; - // Decoder for ZMTP/2.x framing protocol. Converts data stream into messages. // The class has to inherit from shared_message_memory_allocator because // the base class calls allocate in its constructor.