diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index 122d1109..99e882bd 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -117,8 +117,15 @@ void zmq::pgm_receiver_t::activate_in () // processed the whole buffer but failed to write // the last message into the pipe. if (pending_bytes == 0) { - if (mru_decoder != NULL) + if (mru_decoder != NULL) { mru_decoder->process_buffer (NULL, 0); + session->flush (); + } + + // Resume polling. + set_pollin (pipe_handle); + set_pollin (socket_handle); + return; } @@ -128,6 +135,7 @@ void zmq::pgm_receiver_t::activate_in () // Ask the decoder to process remaining data. size_t n = mru_decoder->process_buffer (pending_ptr, pending_bytes); pending_bytes -= n; + session->flush (); if (pending_bytes > 0) return; @@ -145,7 +153,8 @@ void zmq::pgm_receiver_t::in_event () unsigned char *data = NULL; const pgm_tsi_t *tsi = NULL; - zmq_assert (pending_bytes == 0); + if (pending_bytes > 0) + return; if (has_rx_timer) { cancel_timer (rx_timer_id);