mirror of
https://github.com/zeromq/libzmq.git
synced 2025-03-10 07:56:09 +00:00
problem:closed poller still associated with socket
This commit is contained in:
parent
e15da4b38c
commit
a747f72450
@ -71,6 +71,11 @@ void zmq::mailbox_safe_t::remove_signaler(signaler_t* signaler)
|
|||||||
signalers.erase(it);
|
signalers.erase(it);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void zmq::mailbox_safe_t::clear_signalers ()
|
||||||
|
{
|
||||||
|
signalers.clear ();
|
||||||
|
}
|
||||||
|
|
||||||
void zmq::mailbox_safe_t::send (const command_t &cmd_)
|
void zmq::mailbox_safe_t::send (const command_t &cmd_)
|
||||||
{
|
{
|
||||||
sync->lock ();
|
sync->lock ();
|
||||||
|
@ -59,6 +59,7 @@ namespace zmq
|
|||||||
// Add signaler to mailbox which will be called when a message is ready
|
// Add signaler to mailbox which will be called when a message is ready
|
||||||
void add_signaler (signaler_t* signaler);
|
void add_signaler (signaler_t* signaler);
|
||||||
void remove_signaler (signaler_t* signaler);
|
void remove_signaler (signaler_t* signaler);
|
||||||
|
void clear_signalers ();
|
||||||
|
|
||||||
#ifdef HAVE_FORK
|
#ifdef HAVE_FORK
|
||||||
// close the file descriptors in the signaller. This is used in a forked
|
// close the file descriptors in the signaller. This is used in a forked
|
||||||
|
@ -1281,6 +1281,12 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
|
|||||||
|
|
||||||
int zmq::socket_base_t::close ()
|
int zmq::socket_base_t::close ()
|
||||||
{
|
{
|
||||||
|
ENTER_MUTEX ();
|
||||||
|
|
||||||
|
// Remove all existing signalers for thread safe sockets
|
||||||
|
if (thread_safe)
|
||||||
|
((mailbox_safe_t*)mailbox)->clear_signalers();
|
||||||
|
|
||||||
// Mark the socket as dead
|
// Mark the socket as dead
|
||||||
tag = 0xdeadbeef;
|
tag = 0xdeadbeef;
|
||||||
|
|
||||||
@ -1289,6 +1295,7 @@ int zmq::socket_base_t::close ()
|
|||||||
// process.
|
// process.
|
||||||
send_reap (this);
|
send_reap (this);
|
||||||
|
|
||||||
|
EXIT_MUTEX ();
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -186,19 +186,14 @@ int zmq::socket_poller_t::remove (socket_base_t *socket_)
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
items.erase(it);
|
||||||
|
need_rebuild = true;
|
||||||
|
|
||||||
int thread_safe;
|
int thread_safe;
|
||||||
size_t thread_safe_size = sizeof(int);
|
size_t thread_safe_size = sizeof(int);
|
||||||
|
|
||||||
if (socket_->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
|
if (socket_->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == 0 && thread_safe)
|
||||||
return -1;
|
socket_->remove_signaler (&signaler);
|
||||||
|
|
||||||
if (thread_safe) {
|
|
||||||
if (socket_->remove_signaler (&signaler) == -1)
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
items.erase (it);
|
|
||||||
need_rebuild = true;
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -143,9 +143,7 @@ int main (void)
|
|||||||
assert (event.events == ZMQ_POLLOUT);
|
assert (event.events == ZMQ_POLLOUT);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
// Destory poller, sockets and ctx
|
// Destory sockets, poller and ctx
|
||||||
rc = zmq_poller_destroy (&poller);
|
|
||||||
assert (rc == 0);
|
|
||||||
rc = zmq_close (sink);
|
rc = zmq_close (sink);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
rc = zmq_close (vent);
|
rc = zmq_close (vent);
|
||||||
@ -158,6 +156,8 @@ int main (void)
|
|||||||
rc = zmq_close (client);
|
rc = zmq_close (client);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
#endif
|
#endif
|
||||||
|
rc = zmq_poller_destroy(&poller);
|
||||||
|
assert(rc == 0);
|
||||||
rc = zmq_ctx_term (ctx);
|
rc = zmq_ctx_term (ctx);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user