diff --git a/src/v2_decoder.cpp b/src/v2_decoder.cpp index 57cbb681..b4873ceb 100644 --- a/src/v2_decoder.cpp +++ b/src/v2_decoder.cpp @@ -45,7 +45,7 @@ zmq::shared_message_memory_allocator::shared_message_memory_allocator(size_t bufsize_): buf(NULL), bufsize( 0 ), - maxsize( bufsize_ ), + max_size( bufsize_ ), msg_refcnt( NULL ) { @@ -53,7 +53,9 @@ zmq::shared_message_memory_allocator::shared_message_memory_allocator(size_t buf zmq::shared_message_memory_allocator::~shared_message_memory_allocator() { - deallocate(); + if (buf) { + deallocate(); + } } unsigned char* zmq::shared_message_memory_allocator::allocate() @@ -61,25 +63,38 @@ unsigned char* zmq::shared_message_memory_allocator::allocate() if (buf) { // release reference count to couple lifetime to messages - call_dec_ref(NULL, buf); - // release pointer because we are going to create a new buffer - release(); + zmq::atomic_counter_t *c = reinterpret_cast(buf); + + // if refcnt drops to 0, there are no message using the buffer + // because either all messages have been closed or only vsm-messages + // were created + if (c->sub(1)) { + // buffer is still in use as message data. "Release" it and create a new one + // release pointer because we are going to create a new buffer + release(); + } } - // @todo aligmnet padding may be needed - if (!buf) - { + // if buf != NULL it is not used by any message so we can re-use it for the next run + if (!buf) { // allocate memory for reference counters together with reception buffer size_t const maxCounters = std::ceil( (double)max_size / (double)msg_t::max_vsm_size); - size_t const allocationsize = maxsize + sizeof(zmq::atomic_counter_t) + maxCounters * sizeof(zmq::atomic_counter_t); + size_t const allocationsize = max_size + sizeof(zmq::atomic_counter_t) + maxCounters * sizeof(zmq::atomic_counter_t); buf = (unsigned char *) malloc(allocationsize); alloc_assert (buf); + new(buf) atomic_counter_t(1); - msg_refcnt = reinterpret_cast( buf + sizeof(atomic_counter_t) + maxsize ); + } + else + { + // release reference count to couple lifetime to messages + zmq::atomic_counter_t *c = reinterpret_cast(buf); + c->set(1); } - bufsize = maxsize; + bufsize = max_size; + msg_refcnt = reinterpret_cast( buf + sizeof(atomic_counter_t) + max_size ); return buf + sizeof( zmq::atomic_counter_t); } @@ -108,31 +123,24 @@ void zmq::shared_message_memory_allocator::inc_ref() void zmq::shared_message_memory_allocator::call_dec_ref(void*, void* hint) { zmq_assert( hint ); - zmq::atomic_counter_t *c = reinterpret_cast(hint); + unsigned char* buf = static_cast(hint); + zmq::atomic_counter_t *c = reinterpret_cast(buf); if (!c->sub(1)) { c->~atomic_counter_t(); - free(hint); + free(buf); + buf = NULL; } } size_t zmq::shared_message_memory_allocator::size() const { - if (buf) - { - return bufsize; - } - else - { - return 0; - } + return bufsize; } unsigned char* zmq::shared_message_memory_allocator::data() { - zmq_assert(buf); - return buf + sizeof(zmq::atomic_counter_t); } diff --git a/src/v2_decoder.hpp b/src/v2_decoder.hpp index 8cb892bd..44c9fe05 100644 --- a/src/v2_decoder.hpp +++ b/src/v2_decoder.hpp @@ -92,7 +92,7 @@ namespace zmq private: unsigned char* buf; size_t bufsize; - size_t maxsize; + size_t max_size; zmq::atomic_counter_t* msg_refcnt; };