diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 0df3ac0a..67fceb3d 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -660,6 +660,12 @@ void zmq::stream_engine_t::mechanism_ready () msg_t identity; mechanism->peer_identity (&identity); const int rc = session->push_msg (&identity); + if (rc == -1 && errno == EAGAIN) { + // If the write is failing at this stage with + // an EAGAIN the pipe must be being shut down, + // so we can just bail out of the identity set. + return; + } errno_assert (rc == 0); } diff --git a/tests/test_monitor.cpp b/tests/test_monitor.cpp index 6554b1b4..86e82c5f 100644 --- a/tests/test_monitor.cpp +++ b/tests/test_monitor.cpp @@ -41,12 +41,14 @@ static bool read_msg(void* s, zmq_event_t& event, std::string& ep) zmq_msg_init (&msg2); rc = zmq_msg_recv (&msg1, s, 0); if (rc == -1 && zmq_errno() == ETERM) - return true ; + return true ; + assert (rc != -1); assert (zmq_msg_more(&msg1) != 0); rc = zmq_msg_recv (&msg2, s, 0); if (rc == -1 && zmq_errno() == ETERM) - return true; + return true; + assert (rc != -1); assert (zmq_msg_more(&msg2) == 0); // copy binary data to event struct @@ -72,7 +74,7 @@ static void *req_socket_monitor (void *ctx) rc = zmq_connect (s, "inproc://monitor.req"); assert (rc == 0); while (!read_msg(s, event, ep)) { - assert (ep == addr); + assert (ep == addr); switch (event.event) { case ZMQ_EVENT_CONNECTED: assert (event.value > 0); @@ -114,7 +116,7 @@ static void *req2_socket_monitor (void *ctx) rc = zmq_connect (s, "inproc://monitor.req2"); assert (rc == 0); while (!read_msg(s, event, ep)) { - assert (ep == addr); + assert (ep == addr); switch (event.event) { case ZMQ_EVENT_CONNECTED: assert (event.value > 0); @@ -143,7 +145,7 @@ static void *rep_socket_monitor (void *ctx) rc = zmq_connect (s, "inproc://monitor.rep"); assert (rc == 0); while (!read_msg(s, event, ep)) { - assert (ep == addr); + assert (ep == addr); switch (event.event) { case ZMQ_EVENT_LISTENING: assert (event.value > 0); @@ -203,10 +205,7 @@ int main (void) assert (rc == 0); rc = pthread_create (&threads [0], NULL, rep_socket_monitor, ctx); assert (rc == 0); - - rc = zmq_bind (rep, addr.c_str()); - assert (rc == 0); - + // REQ socket req = zmq_socket (ctx, ZMQ_REQ); assert (req); @@ -216,6 +215,11 @@ int main (void) assert (rc == 0); rc = pthread_create (&threads [1], NULL, req_socket_monitor, ctx); assert (rc == 0); + sleep(1); + + // Bind REQ and REP + rc = zmq_bind (rep, addr.c_str()); + assert (rc == 0); rc = zmq_connect (req, addr.c_str()); assert (rc == 0); @@ -271,4 +275,3 @@ int main (void) return 0 ; } -