diff --git a/src/decoder.hpp b/src/decoder.hpp index b6dc9e90..4d00d45d 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -73,6 +73,10 @@ namespace zmq return bufsize; } + void resize(size_t new_size) + { + bufsize = new_size; + } private: size_t bufsize; unsigned char* buf; @@ -190,6 +194,11 @@ namespace zmq return 0; } + virtual void resize_buffer(size_t new_size) + { + allocator->resize(new_size); + } + protected: // Prototype of state machine action. Action should return false if diff --git a/src/i_decoder.hpp b/src/i_decoder.hpp index b003cf81..038d406e 100644 --- a/src/i_decoder.hpp +++ b/src/i_decoder.hpp @@ -46,6 +46,7 @@ namespace zmq virtual void get_buffer (unsigned char **data_, size_t *size_) = 0; + virtual void resize_buffer(size_t) = 0; // Decodes data pointed to by data_. // When a message is decoded, 1 is returned. // When the decoder needs more data, 0 is returnd. @@ -54,6 +55,8 @@ namespace zmq size_t &processed) = 0; virtual msg_t *msg () = 0; + + }; } diff --git a/src/raw_decoder.hpp b/src/raw_decoder.hpp index fc88add1..31310174 100644 --- a/src/raw_decoder.hpp +++ b/src/raw_decoder.hpp @@ -56,7 +56,7 @@ namespace zmq virtual msg_t *msg () { return &in_progress; } - + virtual void resize_buffer(size_t) {} private: diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 7d750b73..8c8eb8a4 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -295,6 +295,7 @@ void zmq::stream_engine_t::in_event () decoder->get_buffer (&inpos, &bufsize); const int rc = tcp_read (s, inpos, bufsize); + if (rc == 0) { error (connection_error); return; @@ -307,6 +308,8 @@ void zmq::stream_engine_t::in_event () // Adjust input size insize = static_cast (rc); + // Adjust buffer size to received bytes + decoder->resize_buffer(insize); } int rc = 0; diff --git a/src/v2_decoder.cpp b/src/v2_decoder.cpp index 9eb8d12f..cb22d91c 100644 --- a/src/v2_decoder.cpp +++ b/src/v2_decoder.cpp @@ -43,7 +43,8 @@ zmq::shared_message_memory_allocator::shared_message_memory_allocator(size_t bufsize_): buf(NULL), - bufsize( bufsize_ ) + bufsize( 0 ), + maxsize( bufsize_ ) { } @@ -71,6 +72,7 @@ unsigned char* zmq::shared_message_memory_allocator::allocate() new(buf) atomic_counter_t(1); } + bufsize = maxsize; return buf + sizeof( zmq::atomic_counter_t); } @@ -78,12 +80,15 @@ void zmq::shared_message_memory_allocator::deallocate() { free(buf); buf = NULL; + bufsize = 0; } unsigned char* zmq::shared_message_memory_allocator::release() { unsigned char* b = buf; buf = NULL; + bufsize = 0; + return b; } diff --git a/src/v2_decoder.hpp b/src/v2_decoder.hpp index 642b4b78..fff7a188 100644 --- a/src/v2_decoder.hpp +++ b/src/v2_decoder.hpp @@ -80,9 +80,16 @@ namespace zmq return buf; } + void resize(size_t new_size) + { + bufsize = new_size; + } + private: unsigned char* buf; size_t bufsize; + size_t maxsize; + zmq::atomic_counter_t* msg_refcnt; }; // Decoder for ZMTP/2.x framing protocol. Converts data stream into messages.