From 74ae19ac1f31d1264e425f8fd9caa4d277d65c49 Mon Sep 17 00:00:00 2001 From: Sergey KHripchenko Date: Sat, 21 Apr 2012 18:36:20 +0400 Subject: [PATCH 1/3] spaces deleted --- src/pipe.cpp | 2 +- src/session_base.cpp | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/pipe.cpp b/src/pipe.cpp index 2a7e6ef4..b7c04dc4 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -359,7 +359,7 @@ void zmq::pipe_t::terminate (bool delay_) // active state. else if (state == delimited) { send_pipe_term (peer); - state = terminated; + state = terminated; } // There are no other states. diff --git a/src/session_base.cpp b/src/session_base.cpp index b99e3786..6dc6799d 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -357,6 +357,7 @@ void zmq::session_base_t::proceed_with_term () void zmq::session_base_t::timer_event (int id_) { + // Linger period expired. We can proceed with termination even though // there are still pending messages to be sent. zmq_assert (id_ == linger_timer_id); @@ -376,13 +377,13 @@ void zmq::session_base_t::detached () } // Reconnect. - if (options.reconnect_ivl != -1) - start_connecting (true); + if (options.reconnect_ivl != -1) + start_connecting (true); // For subscriber sockets we hiccup the inbound pipe, which will cause // the socket object to resend all the subscriptions. if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB)) - pipe->hiccup (); + pipe->hiccup (); } void zmq::session_base_t::start_connecting (bool wait_) From 4f668ad60a7fa6c6000a5f7d94731a42ea295513 Mon Sep 17 00:00:00 2001 From: Sergey KHripchenko Date: Sat, 21 Apr 2012 18:39:19 +0400 Subject: [PATCH 2/3] added zmq_unbind() / zmq_disconnect() test script. it works but rises very serious questions. Please add license header by your choice. This file for 99% resemble crossroads-io/tests/shutdown.cpp --- tests/Makefile.am | 4 +- tests/test_term_endpoint.cpp | 97 ++++++++++++++++++++++++++++++++++++ 2 files changed, 100 insertions(+), 1 deletion(-) create mode 100644 tests/test_term_endpoint.cpp diff --git a/tests/Makefile.am b/tests/Makefile.am index 6c270163..81bedeee 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -13,7 +13,8 @@ noinst_PROGRAMS = test_pair_inproc \ test_invalid_rep \ test_msg_flags \ test_connect_resolve \ - test_last_endpoint + test_last_endpoint \ + test_term_endpoint if !ON_MINGW noinst_PROGRAMS += test_shutdown_stress \ @@ -33,6 +34,7 @@ test_invalid_rep_SOURCES = test_invalid_rep.cpp test_msg_flags_SOURCES = test_msg_flags.cpp test_connect_resolve_SOURCES = test_connect_resolve.cpp test_last_endpoint_SOURCES = test_last_endpoint.cpp +test_term_endpoint_SOURCES = test_term_endpoint.cpp if !ON_MINGW test_shutdown_stress_SOURCES = test_shutdown_stress.cpp diff --git a/tests/test_term_endpoint.cpp b/tests/test_term_endpoint.cpp new file mode 100644 index 00000000..6e501d73 --- /dev/null +++ b/tests/test_term_endpoint.cpp @@ -0,0 +1,97 @@ +#include +#include +#include + +#include "../include/zmq.h" +#include "../include/zmq_utils.h" + + +int main (int argc, char *argv []) +{ + int rc; + char buf[32]; + const char *ep = "tcp://127.0.0.1:5560"; + + fprintf (stderr, "unbind endpoint test running...\n"); + + // Create infrastructure. + void *ctx = zmq_init (1); + assert (ctx); + void *push = zmq_socket (ctx, ZMQ_PUSH); + assert (push); + rc = zmq_bind (push, ep); + assert (rc == 0); + void *pull = zmq_socket (ctx, ZMQ_PULL); + assert (pull); + rc = zmq_connect (pull, ep); + assert (rc == 0); + + // Pass one message through to ensure the connection is established. + rc = zmq_send (push, "ABC", 3, 0); + assert (rc == 3); + rc = zmq_recv (pull, buf, sizeof (buf), 0); + assert (rc == 3); + + // Unbind the lisnening endpoint + rc = zmq_unbind (push, ep); + assert (rc == 0); + + // Let events some time + zmq_sleep (1); + + // Check that sending would block (there's no outbound connection). + rc = zmq_send (push, "ABC", 3, ZMQ_DONTWAIT); + assert (rc == -1 && zmq_errno () == EAGAIN); + + // Clean up. + rc = zmq_close (pull); + assert (rc == 0); + rc = zmq_close (push); + assert (rc == 0); + rc = zmq_term (ctx); + assert (rc == 0); + + + // Now the other way round. + fprintf (stderr, "disconnect endpoint test running...\n"); + + + // Create infrastructure. + ctx = zmq_init (1); + assert (ctx); + push = zmq_socket (ctx, ZMQ_PUSH); + assert (push); + rc = zmq_connect (push, ep); + assert (rc == 0); + pull = zmq_socket (ctx, ZMQ_PULL); + assert (pull); + rc = zmq_bind (pull, ep); + assert (rc == 0); + + // Pass one message through to ensure the connection is established. + rc = zmq_send (push, "ABC", 3, 0); + assert (rc == 3); + rc = zmq_recv (pull, buf, sizeof (buf), 0); + assert (rc == 3); + + // Disconnect the bound endpoint + rc = zmq_disconnect (push, ep); + assert (rc == 0); + + // Let events some time + zmq_sleep (1); + + // Check that sending would block (there's no inbound connections). + rc = zmq_send (push, "ABC", 3, ZMQ_DONTWAIT); + assert (rc == -1 && zmq_errno () == EAGAIN); + + // Clean up. + rc = zmq_close (pull); + assert (rc == 0); + rc = zmq_close (push); + assert (rc == 0); + rc = zmq_term (ctx); + assert (rc == 0); + + return 0; +} From 057fab09a80728db0445313e0ed0123be128e046 Mon Sep 17 00:00:00 2001 From: Sergey KHripchenko Date: Sat, 21 Apr 2012 18:56:10 +0400 Subject: [PATCH 3/3] fixes for zmq_unbind() / zmq_disconnect() usage corner cases 1. when we call zmq_bind()/zmq_connect() to create endpoint we send ourselfs(through launch_child()) command to process_own(endpoint) (and add it to own_t::owned) in the application thread we could call zmq_unbind() / zmq_disconnect() _BEFORE_ we run process_own() in ZMQ thread and in this situation we will be unable to find it in own_t::owned. in other words own_t::owned.find(endpoint) will not be deleted but it will be deleted from socket_base_t::endpoints. 2. when you zmq_unbind() the lisnening TCP/IPC socket was terminated only in destructor... so the whole ZMQ_LINGER time listening TCP/IPC socket was able to accept() new connections but unable to handle them. this all geting even worse since unfortunately zmq has a bug and '*_listener_t' object not terminated untill the socket's zmq_close(). AT LEAST FOR PUSH SOCKETS. Everything is ok for SUB sockets. Easy to reproduce without my fix: zmq_socket(PUSH) zmq_bind(tcp); // connect to it from PULL socket zmq_unbind(tcp); sleep(forever) // netstat -anp | grep 'tcp listening socket' With my fix you could see that after zmq_unbind(tcp) all previously connected tcp sessions will not be finished untill the zmq_close(socket) regardless of ZMQ_LINGER value. (*_listener_t terminates all owned session_base_t(connect=false) and they call pipe_t::terminate() which in turn should call session_base_t::terminated() but this never happens) --- src/ipc_listener.cpp | 2 +- src/socket_base.cpp | 6 ++++++ src/tcp_listener.cpp | 1 + 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/src/ipc_listener.cpp b/src/ipc_listener.cpp index d2e0dfc1..310d6f30 100644 --- a/src/ipc_listener.cpp +++ b/src/ipc_listener.cpp @@ -65,6 +65,7 @@ void zmq::ipc_listener_t::process_plug () void zmq::ipc_listener_t::process_term (int linger_) { rm_fd (handle); + close (); own_t::process_term (linger_); } @@ -182,4 +183,3 @@ zmq::fd_t zmq::ipc_listener_t::accept () } #endif - diff --git a/src/socket_base.cpp b/src/socket_base.cpp index d25cec35..b00b42bf 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -553,6 +553,12 @@ int zmq::socket_base_t::term_endpoint (const char *addr_) return -1; } + // Process pending commands, if any, since there could be pending unprocessed process_own()'s + // (from launch_child() for example) we're asked to terminate now. + int rc = process_commands (0, false); + if (unlikely (rc != 0)) + return -1; + // Find the endpoints range (if any) corresponding to the addr_ string. std::pair range = endpoints.equal_range (std::string (addr_)); if (range.first == range.second) diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index aef2f46d..67df66d7 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -73,6 +73,7 @@ void zmq::tcp_listener_t::process_plug () void zmq::tcp_listener_t::process_term (int linger_) { rm_fd (handle); + close (); own_t::process_term (linger_); }