From 7aec4be368558d7f920dd864fc420477e97fbe9e Mon Sep 17 00:00:00 2001 From: Jens Auer Date: Tue, 11 Oct 2016 12:24:55 +0000 Subject: [PATCH] Add mutex to socket monitor access The monitor socket is used concurrently from different threads and needs protection. --- src/socket_base.cpp | 66 ++++++++++++++++++++++++++------------------- src/socket_base.hpp | 11 +++++--- 2 files changed, 46 insertions(+), 31 deletions(-) diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 8ef8176a..60e655ac 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -150,7 +150,10 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) : rcvmore (false), file_desc(-1), monitor_socket (NULL), - monitor_events (0) + monitor_events (0), + last_endpoint(), + sync(), + monitor_sync() { options.socket_id = sid_; options.ipv6 = (parent_->get (ZMQ_IPV6) != 0); @@ -158,7 +161,10 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) : zmq::socket_base_t::~socket_base_t () { - stop_monitor (); + { + scoped_lock_t lock(monitor_sync); + stop_monitor (); + } zmq_assert (destroyed); } @@ -1069,7 +1075,9 @@ void zmq::socket_base_t::process_stop () // We'll remember the fact so that any blocking call is interrupted and any // further attempt to use the socket will return ETERM. The user is still // responsible for calling zmq_close on the socket though! + scoped_lock_t lock(monitor_sync); stop_monitor (); + ctx_terminated = true; } @@ -1235,10 +1243,13 @@ void zmq::socket_base_t::extract_flags (msg_t *msg_) int zmq::socket_base_t::monitor (const char *addr_, int events_) { + scoped_lock_t lock(monitor_sync); + if (unlikely (ctx_terminated)) { errno = ETERM; return -1; } + // Support deregistering monitoring endpoints as well if (addr_ == NULL) { stop_monitor (); @@ -1284,64 +1295,62 @@ zmq::fd_t zmq::socket_base_t::fd() return file_desc; } -void zmq::socket_base_t::event_connected (const std::string &addr_, int fd_) + +void zmq::socket_base_t::event_connected (const std::string &addr_, zmq::fd_t fd_) { - if (monitor_events & ZMQ_EVENT_CONNECTED) - monitor_event (ZMQ_EVENT_CONNECTED, fd_, addr_); + event(addr_, fd_, ZMQ_EVENT_CONNECTED); } void zmq::socket_base_t::event_connect_delayed (const std::string &addr_, int err_) { - if (monitor_events & ZMQ_EVENT_CONNECT_DELAYED) - monitor_event (ZMQ_EVENT_CONNECT_DELAYED, err_, addr_); + event(addr_, err_, ZMQ_EVENT_CONNECT_DELAYED); } void zmq::socket_base_t::event_connect_retried (const std::string &addr_, int interval_) { - if (monitor_events & ZMQ_EVENT_CONNECT_RETRIED) - monitor_event (ZMQ_EVENT_CONNECT_RETRIED, interval_, addr_); + event(addr_, interval_, ZMQ_EVENT_CONNECT_RETRIED); } -void zmq::socket_base_t::event_listening (const std::string &addr_, int fd_) +void zmq::socket_base_t::event_listening (const std::string &addr_, zmq::fd_t fd_) { - if (monitor_events & ZMQ_EVENT_LISTENING) - monitor_event (ZMQ_EVENT_LISTENING, fd_, addr_); + event(addr_, fd_, ZMQ_EVENT_LISTENING); } void zmq::socket_base_t::event_bind_failed (const std::string &addr_, int err_) { - if (monitor_events & ZMQ_EVENT_BIND_FAILED) - monitor_event (ZMQ_EVENT_BIND_FAILED, err_, addr_); + event(addr_, err_, ZMQ_EVENT_BIND_FAILED); } -void zmq::socket_base_t::event_accepted (const std::string &addr_, int fd_) +void zmq::socket_base_t::event_accepted (const std::string &addr_, zmq::fd_t fd_) { - if (monitor_events & ZMQ_EVENT_ACCEPTED) - monitor_event (ZMQ_EVENT_ACCEPTED, fd_, addr_); + event(addr_, fd_, ZMQ_EVENT_ACCEPTED); } void zmq::socket_base_t::event_accept_failed (const std::string &addr_, int err_) { - if (monitor_events & ZMQ_EVENT_ACCEPT_FAILED) - monitor_event (ZMQ_EVENT_ACCEPT_FAILED, err_, addr_); + event(addr_, err_, ZMQ_EVENT_ACCEPT_FAILED); } -void zmq::socket_base_t::event_closed (const std::string &addr_, int fd_) +void zmq::socket_base_t::event_closed (const std::string &addr_, zmq::fd_t fd_) { - if (monitor_events & ZMQ_EVENT_CLOSED) - monitor_event (ZMQ_EVENT_CLOSED, fd_, addr_); + event(addr_, fd_, ZMQ_EVENT_CLOSED); } void zmq::socket_base_t::event_close_failed (const std::string &addr_, int err_) { - if (monitor_events & ZMQ_EVENT_CLOSE_FAILED) - monitor_event (ZMQ_EVENT_CLOSE_FAILED, err_, addr_); + event(addr_, err_, ZMQ_EVENT_CLOSE_FAILED); } -void zmq::socket_base_t::event_disconnected (const std::string &addr_, int fd_) +void zmq::socket_base_t::event_disconnected (const std::string &addr_, zmq::fd_t fd_) { - if (monitor_events & ZMQ_EVENT_DISCONNECTED) - monitor_event (ZMQ_EVENT_DISCONNECTED, fd_, addr_); + event(addr_, fd_, ZMQ_EVENT_DISCONNECTED); +} + +void zmq::socket_base_t::event(const std::string &addr_, intptr_t fd_, int type_) +{ + scoped_lock_t lock(monitor_sync); + if (monitor_events & type_) + monitor_event (type_, fd_, addr_); } // Send a monitor event @@ -1365,6 +1374,9 @@ void zmq::socket_base_t::monitor_event (int event_, int value_, const std::strin void zmq::socket_base_t::stop_monitor (bool send_monitor_stopped_event_) { + // this is a private method which is only called from + // contexts where the mutex has been locked before + if (monitor_socket) { if ((monitor_events & ZMQ_EVENT_MONITOR_STOPPED) && send_monitor_stopped_event_) monitor_event (ZMQ_EVENT_MONITOR_STOPPED, 0, ""); diff --git a/src/socket_base.hpp b/src/socket_base.hpp index d5db9ea0..5a0d9540 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -169,16 +169,18 @@ namespace zmq // Delay actual destruction of the socket. void process_destroy (); + // Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types + std::string connect_rid; + + private: + void event(const std::string &addr_, intptr_t fd_, int type_); + // Socket event data dispath void monitor_event (int event_, int value_, const std::string& addr_); // Monitor socket cleanup void stop_monitor (bool send_monitor_stopped_event_ = true); - // Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types - std::string connect_rid; - - private: // Creates new endpoint ID and adds the endpoint to the map. void add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe); @@ -270,6 +272,7 @@ namespace zmq socket_base_t (const socket_base_t&); const socket_base_t &operator = (const socket_base_t&); mutex_t sync; + mutex_t monitor_sync; }; }