diff --git a/src/encoder.hpp b/src/encoder.hpp index 4c299f1b..e30f7d9c 100644 --- a/src/encoder.hpp +++ b/src/encoder.hpp @@ -126,6 +126,11 @@ namespace zmq *size_ = pos; } + inline bool has_data () + { + return to_write > 0; + } + protected: // Prototype of state machine action. diff --git a/src/i_encoder.hpp b/src/i_encoder.hpp index a2551cb6..ae491e25 100644 --- a/src/i_encoder.hpp +++ b/src/i_encoder.hpp @@ -47,6 +47,7 @@ namespace zmq virtual void get_data (unsigned char **data_, size_t *size_, int *offset_ = NULL) = 0; + virtual bool has_data () = 0; }; } diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index a160ced0..8121469b 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -63,6 +63,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons options (options_), endpoint (endpoint_), plugged (false), + terminating (false), socket (NULL) { // Put the socket into non-blocking mode. @@ -172,6 +173,11 @@ void zmq::stream_engine_t::unplug () void zmq::stream_engine_t::terminate () { + if (!terminating && encoder && encoder->has_data ()) { + // Give io_thread a chance to send in the buffer + terminating = true; + return; + } unplug (); delete this; } @@ -273,6 +279,8 @@ void zmq::stream_engine_t::out_event () // this is necessary to prevent losing incomming messages. if (nbytes == -1) { reset_pollout (handle); + if (unlikely (terminating)) + terminate (); return; } @@ -284,6 +292,10 @@ void zmq::stream_engine_t::out_event () if (unlikely (handshaking)) if (outsize == 0) reset_pollout (handle); + + if (unlikely (terminating)) + if (outsize == 0) + terminate (); } void zmq::stream_engine_t::activate_out () diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index 2d83995c..e74dc2b9 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -135,6 +135,7 @@ namespace zmq std::string endpoint; bool plugged; + bool terminating; // Socket zmq::socket_base_t *socket;