From a5f7300da6aa2224638fc932fe5ca3624189b1c1 Mon Sep 17 00:00:00 2001 From: Ian Barber Date: Sun, 10 Jun 2012 19:57:02 +0100 Subject: [PATCH] As Martin pointed out, there is a race condition in the old code where a pipe could start shutting down after disconnection, but the new one could connect first. This connection would not get a pipe created for it, so the messages could never flow. The simplest way round this would be a flag, but it is possibly for a very bouncy but fast connection to go up and down twice I imagine, so instead I have added a counter. This starts at zero, and will null out the pipe if terminate is called while it is zero. On a disconnect situation the counter is incremented, and the pipe is the not nulled if the value is non zero. In the terminated function it is decremented for each pipe that is shut down, and the assertion that the terminated pipe == the current pipe is skipped while it is non-zero. This should deal with the race condition and not allow any extra terminated() calls without hitting the assertion. --- src/session_base.cpp | 15 +++++++++++---- src/session_base.hpp | 3 +++ 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/src/session_base.cpp b/src/session_base.cpp index 172aaaa4..6f7deace 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -111,6 +111,7 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_, io_object_t (io_thread_), connect (connect_), pipe (NULL), + incomplete_detach (0), incomplete_in (false), pending (false), engine (NULL), @@ -229,9 +230,14 @@ void zmq::session_base_t::clean_pipes () void zmq::session_base_t::terminated (pipe_t *pipe_) { - // Drop the reference to the deallocated pipe. - zmq_assert (pipe == pipe_); - pipe = NULL; + // Drop the reference to the deallocated pipe if required. + zmq_assert (pipe == pipe_ || incomplete_detach > 0); + + if (incomplete_detach > 0) + incomplete_detach --; + + if ( incomplete_detach == 0 ) + pipe = NULL; // If we are waiting for pending messages to be sent, at this point // we are sure that there will be no more messages and we can proceed @@ -291,7 +297,7 @@ void zmq::session_base_t::process_attach (i_engine *engine_) zmq_assert (engine_ != NULL); // Create the pipe if it does not exist yet. - if (!pipe && !is_terminating ()) { + if ((!pipe || incomplete_detach > 0) && !is_terminating ()) { object_t *parents [2] = {this, socket}; pipe_t *pipes [2] = {NULL, NULL}; int hwms [2] = {options.rcvhwm, options.sndhwm}; @@ -401,6 +407,7 @@ void zmq::session_base_t::detached () && addr->protocol != "pgm" && addr->protocol != "epgm") { pipe->hiccup (); pipe->terminate (false); + incomplete_detach ++; } reset (); diff --git a/src/session_base.hpp b/src/session_base.hpp index 8244cb59..73b5cf41 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -103,6 +103,9 @@ namespace zmq // Pipe connecting the session to its socket. zmq::pipe_t *pipe; + + // This flag is set if we are disconnecting, but haven't yet completed + int incomplete_detach; // This flag is true if the remainder of the message being processed // is still in the in pipe.