mirror of
https://github.com/zeromq/libzmq.git
synced 2025-03-23 21:37:46 +00:00
Merge pull request #149 from jens-auer/2158
Add mutex to socket monitor access
This commit is contained in:
commit
107266b77d
@ -150,7 +150,10 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) :
|
|||||||
rcvmore (false),
|
rcvmore (false),
|
||||||
file_desc(-1),
|
file_desc(-1),
|
||||||
monitor_socket (NULL),
|
monitor_socket (NULL),
|
||||||
monitor_events (0)
|
monitor_events (0),
|
||||||
|
last_endpoint(),
|
||||||
|
sync(),
|
||||||
|
monitor_sync()
|
||||||
{
|
{
|
||||||
options.socket_id = sid_;
|
options.socket_id = sid_;
|
||||||
options.ipv6 = (parent_->get (ZMQ_IPV6) != 0);
|
options.ipv6 = (parent_->get (ZMQ_IPV6) != 0);
|
||||||
@ -158,7 +161,10 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) :
|
|||||||
|
|
||||||
zmq::socket_base_t::~socket_base_t ()
|
zmq::socket_base_t::~socket_base_t ()
|
||||||
{
|
{
|
||||||
stop_monitor ();
|
{
|
||||||
|
scoped_lock_t lock(monitor_sync);
|
||||||
|
stop_monitor ();
|
||||||
|
}
|
||||||
zmq_assert (destroyed);
|
zmq_assert (destroyed);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1069,7 +1075,9 @@ void zmq::socket_base_t::process_stop ()
|
|||||||
// We'll remember the fact so that any blocking call is interrupted and any
|
// We'll remember the fact so that any blocking call is interrupted and any
|
||||||
// further attempt to use the socket will return ETERM. The user is still
|
// further attempt to use the socket will return ETERM. The user is still
|
||||||
// responsible for calling zmq_close on the socket though!
|
// responsible for calling zmq_close on the socket though!
|
||||||
|
scoped_lock_t lock(monitor_sync);
|
||||||
stop_monitor ();
|
stop_monitor ();
|
||||||
|
|
||||||
ctx_terminated = true;
|
ctx_terminated = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1235,10 +1243,13 @@ void zmq::socket_base_t::extract_flags (msg_t *msg_)
|
|||||||
|
|
||||||
int zmq::socket_base_t::monitor (const char *addr_, int events_)
|
int zmq::socket_base_t::monitor (const char *addr_, int events_)
|
||||||
{
|
{
|
||||||
|
scoped_lock_t lock(monitor_sync);
|
||||||
|
|
||||||
if (unlikely (ctx_terminated)) {
|
if (unlikely (ctx_terminated)) {
|
||||||
errno = ETERM;
|
errno = ETERM;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Support deregistering monitoring endpoints as well
|
// Support deregistering monitoring endpoints as well
|
||||||
if (addr_ == NULL) {
|
if (addr_ == NULL) {
|
||||||
stop_monitor ();
|
stop_monitor ();
|
||||||
@ -1284,64 +1295,62 @@ zmq::fd_t zmq::socket_base_t::fd()
|
|||||||
return file_desc;
|
return file_desc;
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::socket_base_t::event_connected (const std::string &addr_, int fd_)
|
|
||||||
|
void zmq::socket_base_t::event_connected (const std::string &addr_, zmq::fd_t fd_)
|
||||||
{
|
{
|
||||||
if (monitor_events & ZMQ_EVENT_CONNECTED)
|
event(addr_, fd_, ZMQ_EVENT_CONNECTED);
|
||||||
monitor_event (ZMQ_EVENT_CONNECTED, fd_, addr_);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::socket_base_t::event_connect_delayed (const std::string &addr_, int err_)
|
void zmq::socket_base_t::event_connect_delayed (const std::string &addr_, int err_)
|
||||||
{
|
{
|
||||||
if (monitor_events & ZMQ_EVENT_CONNECT_DELAYED)
|
event(addr_, err_, ZMQ_EVENT_CONNECT_DELAYED);
|
||||||
monitor_event (ZMQ_EVENT_CONNECT_DELAYED, err_, addr_);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::socket_base_t::event_connect_retried (const std::string &addr_, int interval_)
|
void zmq::socket_base_t::event_connect_retried (const std::string &addr_, int interval_)
|
||||||
{
|
{
|
||||||
if (monitor_events & ZMQ_EVENT_CONNECT_RETRIED)
|
event(addr_, interval_, ZMQ_EVENT_CONNECT_RETRIED);
|
||||||
monitor_event (ZMQ_EVENT_CONNECT_RETRIED, interval_, addr_);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::socket_base_t::event_listening (const std::string &addr_, int fd_)
|
void zmq::socket_base_t::event_listening (const std::string &addr_, zmq::fd_t fd_)
|
||||||
{
|
{
|
||||||
if (monitor_events & ZMQ_EVENT_LISTENING)
|
event(addr_, fd_, ZMQ_EVENT_LISTENING);
|
||||||
monitor_event (ZMQ_EVENT_LISTENING, fd_, addr_);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::socket_base_t::event_bind_failed (const std::string &addr_, int err_)
|
void zmq::socket_base_t::event_bind_failed (const std::string &addr_, int err_)
|
||||||
{
|
{
|
||||||
if (monitor_events & ZMQ_EVENT_BIND_FAILED)
|
event(addr_, err_, ZMQ_EVENT_BIND_FAILED);
|
||||||
monitor_event (ZMQ_EVENT_BIND_FAILED, err_, addr_);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::socket_base_t::event_accepted (const std::string &addr_, int fd_)
|
void zmq::socket_base_t::event_accepted (const std::string &addr_, zmq::fd_t fd_)
|
||||||
{
|
{
|
||||||
if (monitor_events & ZMQ_EVENT_ACCEPTED)
|
event(addr_, fd_, ZMQ_EVENT_ACCEPTED);
|
||||||
monitor_event (ZMQ_EVENT_ACCEPTED, fd_, addr_);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::socket_base_t::event_accept_failed (const std::string &addr_, int err_)
|
void zmq::socket_base_t::event_accept_failed (const std::string &addr_, int err_)
|
||||||
{
|
{
|
||||||
if (monitor_events & ZMQ_EVENT_ACCEPT_FAILED)
|
event(addr_, err_, ZMQ_EVENT_ACCEPT_FAILED);
|
||||||
monitor_event (ZMQ_EVENT_ACCEPT_FAILED, err_, addr_);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::socket_base_t::event_closed (const std::string &addr_, int fd_)
|
void zmq::socket_base_t::event_closed (const std::string &addr_, zmq::fd_t fd_)
|
||||||
{
|
{
|
||||||
if (monitor_events & ZMQ_EVENT_CLOSED)
|
event(addr_, fd_, ZMQ_EVENT_CLOSED);
|
||||||
monitor_event (ZMQ_EVENT_CLOSED, fd_, addr_);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::socket_base_t::event_close_failed (const std::string &addr_, int err_)
|
void zmq::socket_base_t::event_close_failed (const std::string &addr_, int err_)
|
||||||
{
|
{
|
||||||
if (monitor_events & ZMQ_EVENT_CLOSE_FAILED)
|
event(addr_, err_, ZMQ_EVENT_CLOSE_FAILED);
|
||||||
monitor_event (ZMQ_EVENT_CLOSE_FAILED, err_, addr_);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::socket_base_t::event_disconnected (const std::string &addr_, int fd_)
|
void zmq::socket_base_t::event_disconnected (const std::string &addr_, zmq::fd_t fd_)
|
||||||
{
|
{
|
||||||
if (monitor_events & ZMQ_EVENT_DISCONNECTED)
|
event(addr_, fd_, ZMQ_EVENT_DISCONNECTED);
|
||||||
monitor_event (ZMQ_EVENT_DISCONNECTED, fd_, addr_);
|
}
|
||||||
|
|
||||||
|
void zmq::socket_base_t::event(const std::string &addr_, intptr_t fd_, int type_)
|
||||||
|
{
|
||||||
|
scoped_lock_t lock(monitor_sync);
|
||||||
|
if (monitor_events & type_)
|
||||||
|
monitor_event (type_, fd_, addr_);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send a monitor event
|
// Send a monitor event
|
||||||
@ -1365,6 +1374,9 @@ void zmq::socket_base_t::monitor_event (int event_, int value_, const std::strin
|
|||||||
|
|
||||||
void zmq::socket_base_t::stop_monitor (bool send_monitor_stopped_event_)
|
void zmq::socket_base_t::stop_monitor (bool send_monitor_stopped_event_)
|
||||||
{
|
{
|
||||||
|
// this is a private method which is only called from
|
||||||
|
// contexts where the mutex has been locked before
|
||||||
|
|
||||||
if (monitor_socket) {
|
if (monitor_socket) {
|
||||||
if ((monitor_events & ZMQ_EVENT_MONITOR_STOPPED) && send_monitor_stopped_event_)
|
if ((monitor_events & ZMQ_EVENT_MONITOR_STOPPED) && send_monitor_stopped_event_)
|
||||||
monitor_event (ZMQ_EVENT_MONITOR_STOPPED, 0, "");
|
monitor_event (ZMQ_EVENT_MONITOR_STOPPED, 0, "");
|
||||||
|
@ -169,16 +169,18 @@ namespace zmq
|
|||||||
// Delay actual destruction of the socket.
|
// Delay actual destruction of the socket.
|
||||||
void process_destroy ();
|
void process_destroy ();
|
||||||
|
|
||||||
|
// Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types
|
||||||
|
std::string connect_rid;
|
||||||
|
|
||||||
|
private:
|
||||||
|
void event(const std::string &addr_, intptr_t fd_, int type_);
|
||||||
|
|
||||||
// Socket event data dispath
|
// Socket event data dispath
|
||||||
void monitor_event (int event_, int value_, const std::string& addr_);
|
void monitor_event (int event_, int value_, const std::string& addr_);
|
||||||
|
|
||||||
// Monitor socket cleanup
|
// Monitor socket cleanup
|
||||||
void stop_monitor (bool send_monitor_stopped_event_ = true);
|
void stop_monitor (bool send_monitor_stopped_event_ = true);
|
||||||
|
|
||||||
// Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types
|
|
||||||
std::string connect_rid;
|
|
||||||
|
|
||||||
private:
|
|
||||||
// Creates new endpoint ID and adds the endpoint to the map.
|
// Creates new endpoint ID and adds the endpoint to the map.
|
||||||
void add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe);
|
void add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe);
|
||||||
|
|
||||||
@ -270,6 +272,7 @@ namespace zmq
|
|||||||
socket_base_t (const socket_base_t&);
|
socket_base_t (const socket_base_t&);
|
||||||
const socket_base_t &operator = (const socket_base_t&);
|
const socket_base_t &operator = (const socket_base_t&);
|
||||||
mutex_t sync;
|
mutex_t sync;
|
||||||
|
mutex_t monitor_sync;
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user