0
0
mirror of https://github.com/zeromq/libzmq.git synced 2025-01-17 04:50:57 +08:00

fixes for zmq_unbind() / zmq_disconnect() usage corner cases

1. when we call zmq_bind()/zmq_connect() to create endpoint
we send ourselfs(through launch_child()) command to process_own(endpoint)
(and add it to own_t::owned)
in the application thread we could call zmq_unbind() / zmq_disconnect() _BEFORE_
we run process_own() in ZMQ thread and in this situation we will be unable to find it in
own_t::owned. in other words own_t::owned.find(endpoint) will not be deleted but it will be deleted from
socket_base_t::endpoints.

2. when you zmq_unbind() the lisnening TCP/IPC socket was terminated only in destructor...
so the whole ZMQ_LINGER time listening TCP/IPC socket was able to accept() new connections
but unable to handle them.

this all geting even worse since unfortunately zmq has a bug and '*_listener_t' object not terminated
untill the socket's zmq_close().
AT LEAST FOR PUSH SOCKETS.
Everything is ok for SUB sockets.

Easy to reproduce without my fix:

zmq_socket(PUSH)
zmq_bind(tcp);
// connect to  it from PULL socket
zmq_unbind(tcp);

sleep(forever)

// netstat -anp | grep 'tcp listening socket'

With my fix you could see that after zmq_unbind(tcp) all previously connected tcp sessions
will not be finished untill the zmq_close(socket) regardless of ZMQ_LINGER value.

(*_listener_t terminates all owned session_base_t(connect=false) and they call pipe_t::terminate()
which in turn should call session_base_t::terminated() but this never happens)
This commit is contained in:
Sergey KHripchenko 2012-04-21 18:56:10 +04:00
parent 4f668ad60a
commit 057fab09a8
3 changed files with 8 additions and 1 deletions

View File

@ -65,6 +65,7 @@ void zmq::ipc_listener_t::process_plug ()
void zmq::ipc_listener_t::process_term (int linger_)
{
rm_fd (handle);
close ();
own_t::process_term (linger_);
}
@ -182,4 +183,3 @@ zmq::fd_t zmq::ipc_listener_t::accept ()
}
#endif

View File

@ -553,6 +553,12 @@ int zmq::socket_base_t::term_endpoint (const char *addr_)
return -1;
}
// Process pending commands, if any, since there could be pending unprocessed process_own()'s
// (from launch_child() for example) we're asked to terminate now.
int rc = process_commands (0, false);
if (unlikely (rc != 0))
return -1;
// Find the endpoints range (if any) corresponding to the addr_ string.
std::pair <endpoints_t::iterator, endpoints_t::iterator> range = endpoints.equal_range (std::string (addr_));
if (range.first == range.second)

View File

@ -73,6 +73,7 @@ void zmq::tcp_listener_t::process_plug ()
void zmq::tcp_listener_t::process_term (int linger_)
{
rm_fd (handle);
close ();
own_t::process_term (linger_);
}