From dfe19080089450bf4973a59b387be891fe0ca9ef Mon Sep 17 00:00:00 2001 From: Jens Auer Date: Wed, 10 Jun 2015 22:09:55 +0200 Subject: [PATCH] Fixed wrong buffer end detection in v2_decoder. zero-copy msg_t::init cannot be used when the message exceeds either the buffer end or the last received byte. To detect this, the buffer is now resized to the numnber of received bytes. --- src/decoder.hpp | 9 +++++++++ src/i_decoder.hpp | 3 +++ src/raw_decoder.hpp | 2 +- src/stream_engine.cpp | 3 +++ src/v2_decoder.cpp | 7 ++++++- src/v2_decoder.hpp | 7 +++++++ 6 files changed, 29 insertions(+), 2 deletions(-) diff --git a/src/decoder.hpp b/src/decoder.hpp index b6dc9e90..4d00d45d 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -73,6 +73,10 @@ namespace zmq return bufsize; } + void resize(size_t new_size) + { + bufsize = new_size; + } private: size_t bufsize; unsigned char* buf; @@ -190,6 +194,11 @@ namespace zmq return 0; } + virtual void resize_buffer(size_t new_size) + { + allocator->resize(new_size); + } + protected: // Prototype of state machine action. Action should return false if diff --git a/src/i_decoder.hpp b/src/i_decoder.hpp index b003cf81..038d406e 100644 --- a/src/i_decoder.hpp +++ b/src/i_decoder.hpp @@ -46,6 +46,7 @@ namespace zmq virtual void get_buffer (unsigned char **data_, size_t *size_) = 0; + virtual void resize_buffer(size_t) = 0; // Decodes data pointed to by data_. // When a message is decoded, 1 is returned. // When the decoder needs more data, 0 is returnd. @@ -54,6 +55,8 @@ namespace zmq size_t &processed) = 0; virtual msg_t *msg () = 0; + + }; } diff --git a/src/raw_decoder.hpp b/src/raw_decoder.hpp index fc88add1..31310174 100644 --- a/src/raw_decoder.hpp +++ b/src/raw_decoder.hpp @@ -56,7 +56,7 @@ namespace zmq virtual msg_t *msg () { return &in_progress; } - + virtual void resize_buffer(size_t) {} private: diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 7d750b73..8c8eb8a4 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -295,6 +295,7 @@ void zmq::stream_engine_t::in_event () decoder->get_buffer (&inpos, &bufsize); const int rc = tcp_read (s, inpos, bufsize); + if (rc == 0) { error (connection_error); return; @@ -307,6 +308,8 @@ void zmq::stream_engine_t::in_event () // Adjust input size insize = static_cast (rc); + // Adjust buffer size to received bytes + decoder->resize_buffer(insize); } int rc = 0; diff --git a/src/v2_decoder.cpp b/src/v2_decoder.cpp index 9eb8d12f..cb22d91c 100644 --- a/src/v2_decoder.cpp +++ b/src/v2_decoder.cpp @@ -43,7 +43,8 @@ zmq::shared_message_memory_allocator::shared_message_memory_allocator(size_t bufsize_): buf(NULL), - bufsize( bufsize_ ) + bufsize( 0 ), + maxsize( bufsize_ ) { } @@ -71,6 +72,7 @@ unsigned char* zmq::shared_message_memory_allocator::allocate() new(buf) atomic_counter_t(1); } + bufsize = maxsize; return buf + sizeof( zmq::atomic_counter_t); } @@ -78,12 +80,15 @@ void zmq::shared_message_memory_allocator::deallocate() { free(buf); buf = NULL; + bufsize = 0; } unsigned char* zmq::shared_message_memory_allocator::release() { unsigned char* b = buf; buf = NULL; + bufsize = 0; + return b; } diff --git a/src/v2_decoder.hpp b/src/v2_decoder.hpp index 642b4b78..fff7a188 100644 --- a/src/v2_decoder.hpp +++ b/src/v2_decoder.hpp @@ -80,9 +80,16 @@ namespace zmq return buf; } + void resize(size_t new_size) + { + bufsize = new_size; + } + private: unsigned char* buf; size_t bufsize; + size_t maxsize; + zmq::atomic_counter_t* msg_refcnt; }; // Decoder for ZMTP/2.x framing protocol. Converts data stream into messages.