diff --git a/src/atomic_counter.hpp b/src/atomic_counter.hpp index 01d7d195..bceb8347 100644 --- a/src/atomic_counter.hpp +++ b/src/atomic_counter.hpp @@ -183,7 +183,7 @@ namespace zmq #endif } - inline integer_t get () + inline integer_t get () const { return value; } diff --git a/src/decoder.hpp b/src/decoder.hpp index fc4b177d..5458992b 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -42,6 +42,45 @@ namespace zmq { + // Static buffer policy. + class c_single_allocator + { + public: + c_single_allocator(size_t bufsize_): + buf((unsigned char*) malloc (bufsize) ), + bufsize(bufsize_) + { + alloc_assert (buf); + } + + ~c_single_allocator() + { + free(buf); + } + + unsigned char* allocate() + { + return buf; + } + + void deallocate() + { + + } + + size_t size() const + { + return bufsize; + } + + private: + unsigned char* buf; + size_t bufsize; + + c_single_allocator( c_single_allocator const& ); + c_single_allocator& operator=(c_single_allocator const&); + }; + // Helper base class for decoders that know the amount of data to read // in advance at any moment. Knowing the amount in advance is a property // of the protocol used. 0MQ framing protocol is based size-prefixed @@ -52,31 +91,34 @@ namespace zmq // // This class implements the state machine that parses the incoming buffer. // Derived class should implement individual state machine actions. - - template class decoder_base_t : public i_decoder + // + // Buffer managment is done by an allocator policy. + template + class decoder_base_t : public i_decoder { public: - inline decoder_base_t (size_t bufsize_) : - next (NULL), - read_pos (NULL), - to_read (0), - bufsize (bufsize_) + inline decoder_base_t (A* allocator_) : + next (NULL), + read_pos (NULL), + to_read (0), + allocator( allocator_ ) { - buf = (unsigned char*) malloc (bufsize_); - alloc_assert (buf); + buf = allocator->allocate(); } // The destructor doesn't have to be virtual. It is mad virtual // just to keep ICC and code checking tools from complaining. inline virtual ~decoder_base_t () { - free (buf); + allocator->deallocate(); } // Returns a buffer to be filled with binary data. inline void get_buffer (unsigned char **data_, size_t *size_) { + buf = allocator->allocate(); + // If we are expected to read large message, we'll opt for zero- // copy, i.e. we'll ask caller to fill the data directly to the // message. Note that subsequent read(s) are non-blocking, thus @@ -85,14 +127,14 @@ namespace zmq // As a consequence, large messages being received won't block // other engines running in the same I/O thread for excessive // amounts of time. - if (to_read >= bufsize) { + if (to_read >= allocator->size()) { *data_ = read_pos; *size_ = to_read; return; } *data_ = buf; - *size_ = bufsize; + *size_ = allocator->size(); } // Processes the data in the buffer previously allocated using @@ -116,7 +158,7 @@ namespace zmq bytes_used_ = size_; while (!to_read) { - const int rc = (static_cast (this)->*next) (); + const int rc = (static_cast (this)->*next) (data_ + bytes_used_); if (rc != 0) return rc; } @@ -126,14 +168,20 @@ namespace zmq while (bytes_used_ < size_) { // Copy the data from buffer to the message. const size_t to_copy = std::min (to_read, size_ - bytes_used_); - memcpy (read_pos, data_ + bytes_used_, to_copy); + // only copy when the destination address is different from the + // current address in the buffer + if (read_pos != data_ + bytes_used_) { + memcpy(read_pos, data_ + bytes_used_, to_copy); + } + read_pos += to_copy; to_read -= to_copy; bytes_used_ += to_copy; // Try to get more space in the message to fill in. // If none is available, return. while (to_read == 0) { - const int rc = (static_cast (this)->*next) (); + // pass current address in the buffer + const int rc = (static_cast (this)->*next) (data_ + bytes_used_); if (rc != 0) return rc; } @@ -146,7 +194,7 @@ namespace zmq // Prototype of state machine action. Action should return false if // it is unable to push the data to the system. - typedef int (T::*step_t) (); + typedef int (T::*step_t) (unsigned char const*); // This function should be called from derived class to read data // from the buffer and schedule next state machine action. @@ -171,8 +219,8 @@ namespace zmq size_t to_read; // The duffer for data to decode. - size_t bufsize; - unsigned char *buf; + A* allocator; + unsigned char* buf; decoder_base_t (const decoder_base_t&); const decoder_base_t &operator = (const decoder_base_t&); diff --git a/src/v1_decoder.cpp b/src/v1_decoder.cpp index a21bb2f5..508f3210 100644 --- a/src/v1_decoder.cpp +++ b/src/v1_decoder.cpp @@ -43,7 +43,8 @@ #include "err.hpp" zmq::v1_decoder_t::v1_decoder_t (size_t bufsize_, int64_t maxmsgsize_) : - decoder_base_t (bufsize_), + c_single_allocator(bufsize_), + decoder_base_t (this), maxmsgsize (maxmsgsize_) { int rc = in_progress.init (); @@ -59,7 +60,7 @@ zmq::v1_decoder_t::~v1_decoder_t () errno_assert (rc == 0); } -int zmq::v1_decoder_t::one_byte_size_ready () +int zmq::v1_decoder_t::one_byte_size_ready (unsigned char const*) { // First byte of size is read. If it is 0xff read 8-byte size. // Otherwise allocate the buffer for message data and read the @@ -96,7 +97,7 @@ int zmq::v1_decoder_t::one_byte_size_ready () return 0; } -int zmq::v1_decoder_t::eight_byte_size_ready () +int zmq::v1_decoder_t::eight_byte_size_ready (unsigned char const*) { // 8-byte payload length is read. Allocate the buffer // for message body and read the message data into it. @@ -138,7 +139,7 @@ int zmq::v1_decoder_t::eight_byte_size_ready () return 0; } -int zmq::v1_decoder_t::flags_ready () +int zmq::v1_decoder_t::flags_ready (unsigned char const*) { // Store the flags from the wire into the message structure. in_progress.set_flags (tmpbuf [0] & msg_t::more); @@ -149,7 +150,7 @@ int zmq::v1_decoder_t::flags_ready () return 0; } -int zmq::v1_decoder_t::message_ready () +int zmq::v1_decoder_t::message_ready (unsigned char const*) { // Message is completely read. Push it further and start reading // new message. (in_progress is a 0-byte message after this point.) diff --git a/src/v1_decoder.hpp b/src/v1_decoder.hpp index 69207236..e1abdae2 100644 --- a/src/v1_decoder.hpp +++ b/src/v1_decoder.hpp @@ -36,7 +36,9 @@ namespace zmq { // Decoder for ZMTP/1.0 protocol. Converts data batches into messages. - class v1_decoder_t : public decoder_base_t + class v1_decoder_t : + public zmq::c_single_allocator, + public decoder_base_t { public: @@ -47,10 +49,10 @@ namespace zmq private: - int one_byte_size_ready (); - int eight_byte_size_ready (); - int flags_ready (); - int message_ready (); + int one_byte_size_ready (unsigned char const*); + int eight_byte_size_ready (unsigned char const*); + int flags_ready (unsigned char const*); + int message_ready (unsigned char const*); unsigned char tmpbuf [8]; msg_t in_progress; diff --git a/src/v2_decoder.cpp b/src/v2_decoder.cpp index 394b1e9f..9eb8d12f 100644 --- a/src/v2_decoder.cpp +++ b/src/v2_decoder.cpp @@ -41,8 +41,96 @@ #include "wire.hpp" #include "err.hpp" +zmq::shared_message_memory_allocator::shared_message_memory_allocator(size_t bufsize_): + buf(NULL), + bufsize( bufsize_ ) +{ + +} + +zmq::shared_message_memory_allocator::~shared_message_memory_allocator() +{ + deallocate(); +} + +unsigned char* zmq::shared_message_memory_allocator::allocate() +{ + if (buf) + { + // release reference count to couple lifetime to messages + call_dec_ref(NULL, buf); + // release pointer because we are going to create a new buffer + release(); + } + + // @todo aligmnet padding may be needed + if (!buf) + { + buf = (unsigned char *) malloc(bufsize + sizeof(zmq::atomic_counter_t)); + alloc_assert (buf); + new(buf) atomic_counter_t(1); + } + + return buf + sizeof( zmq::atomic_counter_t); +} + +void zmq::shared_message_memory_allocator::deallocate() +{ + free(buf); + buf = NULL; +} + +unsigned char* zmq::shared_message_memory_allocator::release() +{ + unsigned char* b = buf; + buf = NULL; + return b; +} + +void zmq::shared_message_memory_allocator::reset(unsigned char* b) +{ + deallocate(); + buf = b; +} + +void zmq::shared_message_memory_allocator::inc_ref() +{ + ((zmq::atomic_counter_t*)buf)->add(1); +} + +void zmq::shared_message_memory_allocator::call_dec_ref(void*, void* hint) { + zmq_assert( hint ); + zmq::atomic_counter_t *c = reinterpret_cast(hint); + + if (!c->sub(1)) { + c->~atomic_counter_t(); + free(hint); + } +} + + +size_t zmq::shared_message_memory_allocator::size() const +{ + if (buf) + { + return bufsize; + } + else + { + return 0; + } +} + +unsigned char* zmq::shared_message_memory_allocator::data() +{ + zmq_assert(buf); + + return buf + sizeof(zmq::atomic_counter_t); +} + zmq::v2_decoder_t::v2_decoder_t (size_t bufsize_, int64_t maxmsgsize_) : - decoder_base_t (bufsize_), + shared_message_memory_allocator( bufsize_), + decoder_base_t (this), msg_flags (0), maxmsgsize (maxmsgsize_) { @@ -59,7 +147,7 @@ zmq::v2_decoder_t::~v2_decoder_t () errno_assert (rc == 0); } -int zmq::v2_decoder_t::flags_ready () +int zmq::v2_decoder_t::flags_ready (unsigned char const*) { msg_flags = 0; if (tmpbuf [0] & v2_protocol_t::more_flag) @@ -77,40 +165,20 @@ int zmq::v2_decoder_t::flags_ready () return 0; } -int zmq::v2_decoder_t::one_byte_size_ready () +int zmq::v2_decoder_t::one_byte_size_ready (unsigned char const* read_from) { - // Message size must not exceed the maximum allowed size. - if (maxmsgsize >= 0) - if (unlikely (tmpbuf [0] > static_cast (maxmsgsize))) { - errno = EMSGSIZE; - return -1; - } - - // in_progress is initialised at this point so in theory we should - // close it before calling zmq_msg_init_size, however, it's a 0-byte - // message and thus we can treat it as uninitialised... - int rc = in_progress.init_size (tmpbuf [0]); - if (unlikely (rc)) { - errno_assert (errno == ENOMEM); - rc = in_progress.init (); - errno_assert (rc == 0); - errno = ENOMEM; - return -1; - } - - in_progress.set_flags (msg_flags); - next_step (in_progress.data (), in_progress.size (), - &v2_decoder_t::message_ready); - - return 0; + return size_ready(tmpbuf[0], read_from); } -int zmq::v2_decoder_t::eight_byte_size_ready () -{ +int zmq::v2_decoder_t::eight_byte_size_ready (unsigned char const* read_from) { // The payload size is encoded as 64-bit unsigned integer. // The most significant byte comes first. - const uint64_t msg_size = get_uint64 (tmpbuf); + const uint64_t msg_size = get_uint64(tmpbuf); + return size_ready(msg_size, read_from); +} + +int zmq::v2_decoder_t::size_ready(uint64_t msg_size, unsigned char const* read_pos) { // Message size must not exceed the maximum allowed size. if (maxmsgsize >= 0) if (unlikely (msg_size > static_cast (maxmsgsize))) { @@ -127,7 +195,31 @@ int zmq::v2_decoder_t::eight_byte_size_ready () // in_progress is initialised at this point so in theory we should // close it before calling init_size, however, it's a 0-byte // message and thus we can treat it as uninitialised. - int rc = in_progress.init_size (static_cast (msg_size)); + int rc = -1; + + // the current message can exceed the current buffer. We have to copy the buffer + // data into a new message and complete it in the next receive. + if (unlikely ((unsigned char*)read_pos + msg_size > (data() + size()))) + { + // a new message has started, but the size would exceed the pre-allocated arena + // this happens everytime when a message does not fit completely into the buffer + rc = in_progress.init_size (static_cast (msg_size)); + } + else + { + // construct message using n bytes from the buffer as storage + // increase buffer ref count + rc = in_progress.init( (unsigned char*)read_pos, + msg_size, shared_message_memory_allocator::call_dec_ref, + buffer() ); + + // For small messages, data has been copied and refcount does not have to be increased + if (in_progress.is_lmsg()) + { + inc_ref(); + } + } + if (unlikely (rc)) { errno_assert (errno == ENOMEM); rc = in_progress.init (); @@ -137,13 +229,19 @@ int zmq::v2_decoder_t::eight_byte_size_ready () } in_progress.set_flags (msg_flags); + // this sets read_pos to + // the message data address if the data needs to be copied + // for small message / messages exceeding the current buffer + // or + // to the current start address in the buffer because the message + // was constructed to use n bytes from the address passed as argument next_step (in_progress.data (), in_progress.size (), &v2_decoder_t::message_ready); return 0; } -int zmq::v2_decoder_t::message_ready () +int zmq::v2_decoder_t::message_ready (unsigned char const*) { // Message is completely read. Signal this to the caller // and prepare to decode next message. diff --git a/src/v2_decoder.hpp b/src/v2_decoder.hpp index 14c48e4e..642b4b78 100644 --- a/src/v2_decoder.hpp +++ b/src/v2_decoder.hpp @@ -34,11 +34,66 @@ namespace zmq { - // Decoder for ZMTP/2.x framing protocol. Converts data stream into messages. - class v2_decoder_t : public decoder_base_t + // This allocater allocates a reference counted buffer which is used by v2_decoder_t + // to use zero-copy msg::init_data to create messages with memory from this buffer as + // data storage. + // + // The buffer is allocated with a reference count of 1 to make sure that is is alive while + // decoding messages. Otherwise, it is possible that e.g. the first message increases the count + // from zero to one, gets passed to the user application, processed in the user thread and deleted + // which would then deallocate the buffer. The drawback is that the buffer may be allocated longer + // than necessary because it is only deleted when allocate is called the next time. + class shared_message_memory_allocator { public: + shared_message_memory_allocator(size_t bufsize_); + ~shared_message_memory_allocator(); + + // Allocate a new buffer + // + // This releases the current buffer to be bound to the lifetime of the messages + // created on this bufer. + unsigned char* allocate(); + + // force deallocation of buffer. + void deallocate(); + + // Give up ownership of the buffer. The buffer's lifetime is now coupled to + // the messages constructed on top of it. + unsigned char* release(); + + void reset(unsigned char* b); + + void inc_ref(); + + static void call_dec_ref(void*, void* buffer); + + size_t size() const; + + // Return pointer to the first message data byte. + unsigned char* data(); + + // Return pointer to the first byte of the buffer. + unsigned char* buffer() + { + return buf; + } + + private: + unsigned char* buf; + size_t bufsize; + }; + + // Decoder for ZMTP/2.x framing protocol. Converts data stream into messages. + // The class has to inherit from shared_message_memory_allocator because + // the base class calls allocate in its constructor. + class v2_decoder_t : + // inherit first from allocator to ensure that it is constructed before decoder_base_t + public shared_message_memory_allocator, + public decoder_base_t + { + public: v2_decoder_t (size_t bufsize_, int64_t maxmsgsize_); virtual ~v2_decoder_t (); @@ -47,10 +102,12 @@ namespace zmq private: - int flags_ready (); - int one_byte_size_ready (); - int eight_byte_size_ready (); - int message_ready (); + int flags_ready (unsigned char const*); + int one_byte_size_ready (unsigned char const*); + int eight_byte_size_ready (unsigned char const*); + int message_ready (unsigned char const*); + + int size_ready(uint64_t size_, unsigned char const*); unsigned char tmpbuf [8]; unsigned char msg_flags;