From a747f72450f6e025effcc879fbafbe79c16ffecd Mon Sep 17 00:00:00 2001 From: Doron Somech Date: Thu, 5 May 2016 12:53:55 +0300 Subject: [PATCH] problem:closed poller still associated with socket --- src/mailbox_safe.cpp | 9 +++++++-- src/mailbox_safe.hpp | 5 +++-- src/socket_base.cpp | 7 +++++++ src/socket_poller.cpp | 15 +++++---------- tests/test_poller.cpp | 6 +++--- 5 files changed, 25 insertions(+), 17 deletions(-) diff --git a/src/mailbox_safe.cpp b/src/mailbox_safe.cpp index 4c593109..a22d7b84 100644 --- a/src/mailbox_safe.cpp +++ b/src/mailbox_safe.cpp @@ -52,12 +52,12 @@ zmq::mailbox_safe_t::~mailbox_safe_t () sync->unlock (); } -void zmq::mailbox_safe_t::add_signaler(signaler_t* signaler) +void zmq::mailbox_safe_t::add_signaler (signaler_t* signaler) { signalers.push_back(signaler); } -void zmq::mailbox_safe_t::remove_signaler(signaler_t* signaler) +void zmq::mailbox_safe_t::remove_signaler (signaler_t* signaler) { std::vector::iterator it = signalers.begin(); @@ -71,6 +71,11 @@ void zmq::mailbox_safe_t::remove_signaler(signaler_t* signaler) signalers.erase(it); } +void zmq::mailbox_safe_t::clear_signalers () +{ + signalers.clear (); +} + void zmq::mailbox_safe_t::send (const command_t &cmd_) { sync->lock (); diff --git a/src/mailbox_safe.hpp b/src/mailbox_safe.hpp index a909e90f..5fc05730 100644 --- a/src/mailbox_safe.hpp +++ b/src/mailbox_safe.hpp @@ -57,8 +57,9 @@ namespace zmq int recv (command_t *cmd_, int timeout_); // Add signaler to mailbox which will be called when a message is ready - void add_signaler(signaler_t* signaler); - void remove_signaler(signaler_t* signaler); + void add_signaler (signaler_t* signaler); + void remove_signaler (signaler_t* signaler); + void clear_signalers (); #ifdef HAVE_FORK // close the file descriptors in the signaller. This is used in a forked diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 5c3dbefd..4253256e 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -1281,6 +1281,12 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) int zmq::socket_base_t::close () { + ENTER_MUTEX (); + + // Remove all existing signalers for thread safe sockets + if (thread_safe) + ((mailbox_safe_t*)mailbox)->clear_signalers(); + // Mark the socket as dead tag = 0xdeadbeef; @@ -1289,6 +1295,7 @@ int zmq::socket_base_t::close () // process. send_reap (this); + EXIT_MUTEX (); return 0; } diff --git a/src/socket_poller.cpp b/src/socket_poller.cpp index fa5a7f99..d694df3a 100644 --- a/src/socket_poller.cpp +++ b/src/socket_poller.cpp @@ -186,19 +186,14 @@ int zmq::socket_poller_t::remove (socket_base_t *socket_) return -1; } + items.erase(it); + need_rebuild = true; + int thread_safe; size_t thread_safe_size = sizeof(int); - if (socket_->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1) - return -1; - - if (thread_safe) { - if (socket_->remove_signaler (&signaler) == -1) - return -1; - } - - items.erase (it); - need_rebuild = true; + if (socket_->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == 0 && thread_safe) + socket_->remove_signaler (&signaler); return 0; } diff --git a/tests/test_poller.cpp b/tests/test_poller.cpp index 52c20581..654bdb09 100644 --- a/tests/test_poller.cpp +++ b/tests/test_poller.cpp @@ -143,9 +143,7 @@ int main (void) assert (event.events == ZMQ_POLLOUT); #endif - // Destory poller, sockets and ctx - rc = zmq_poller_destroy (&poller); - assert (rc == 0); + // Destory sockets, poller and ctx rc = zmq_close (sink); assert (rc == 0); rc = zmq_close (vent); @@ -158,6 +156,8 @@ int main (void) rc = zmq_close (client); assert (rc == 0); #endif + rc = zmq_poller_destroy(&poller); + assert(rc == 0); rc = zmq_ctx_term (ctx); assert (rc == 0);