mirror of
https://github.com/zeromq/libzmq.git
synced 2024-12-26 23:01:04 +08:00
fix case when zmq_poller access a dead socket
This commit is contained in:
parent
714988e6c5
commit
6bbca7cf4a
@ -34,25 +34,25 @@ zmq::socket_poller_t::socket_poller_t () :
|
||||
tag (0xCAFEBABE),
|
||||
need_rebuild (true),
|
||||
use_signaler (false)
|
||||
#if defined ZMQ_POLL_BASED_ON_POLL
|
||||
#if defined ZMQ_POLL_BASED_ON_POLL
|
||||
,
|
||||
pollfds (NULL)
|
||||
#endif
|
||||
{
|
||||
{
|
||||
}
|
||||
|
||||
zmq::socket_poller_t::~socket_poller_t ()
|
||||
{
|
||||
// Mark the socket_poller as dead
|
||||
tag = 0xdeadbeef;
|
||||
tag = 0xdeadbeef;
|
||||
|
||||
for (items_t::iterator it = items.begin(); it != items.end(); ++it) {
|
||||
if (it->socket) {
|
||||
if (it->socket && it->socket->check_tag()) {
|
||||
int thread_safe;
|
||||
size_t thread_safe_size = sizeof(int);
|
||||
|
||||
if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == 0 && thread_safe)
|
||||
it->socket->remove_signaler (&signaler);
|
||||
it->socket->remove_signaler (&signaler);
|
||||
}
|
||||
}
|
||||
|
||||
@ -61,21 +61,21 @@ zmq::socket_poller_t::~socket_poller_t ()
|
||||
free (pollfds);
|
||||
pollfds = NULL;
|
||||
}
|
||||
#endif
|
||||
#endif
|
||||
}
|
||||
|
||||
bool zmq::socket_poller_t::check_tag ()
|
||||
bool zmq::socket_poller_t::check_tag ()
|
||||
{
|
||||
return tag == 0xCAFEBABE;
|
||||
}
|
||||
|
||||
int zmq::socket_poller_t::add (socket_base_t *socket_, void* user_data_, short events_)
|
||||
int zmq::socket_poller_t::add (socket_base_t *socket_, void* user_data_, short events_)
|
||||
{
|
||||
for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
|
||||
if (it->socket == socket_) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int thread_safe;
|
||||
@ -88,7 +88,7 @@ int zmq::socket_poller_t::add (socket_base_t *socket_, void* user_data_, short e
|
||||
if (socket_->add_signaler (&signaler) == -1)
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
item_t item = {socket_, 0, user_data_, events_};
|
||||
items.push_back (item);
|
||||
need_rebuild = true;
|
||||
@ -102,7 +102,7 @@ int zmq::socket_poller_t::add_fd (fd_t fd_, void *user_data_, short events_)
|
||||
if (!it->socket && it->fd == fd_) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
item_t item = {NULL, fd_, user_data_, events_};
|
||||
@ -146,12 +146,12 @@ int zmq::socket_poller_t::modify_fd (fd_t fd_, short events_)
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
it->events = events_;
|
||||
need_rebuild = true;
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
int zmq::socket_poller_t::remove (socket_base_t *socket_)
|
||||
@ -167,7 +167,7 @@ int zmq::socket_poller_t::remove (socket_base_t *socket_)
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
int thread_safe;
|
||||
size_t thread_safe_size = sizeof(int);
|
||||
|
||||
@ -178,10 +178,10 @@ int zmq::socket_poller_t::remove (socket_base_t *socket_)
|
||||
if (socket_->remove_signaler (&signaler) == -1)
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
items.erase (it);
|
||||
need_rebuild = true;
|
||||
|
||||
need_rebuild = true;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -198,14 +198,14 @@ int zmq::socket_poller_t::remove_fd (fd_t fd_)
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
items.erase (it);
|
||||
need_rebuild = true;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int zmq::socket_poller_t::rebuild ()
|
||||
int zmq::socket_poller_t::rebuild ()
|
||||
{
|
||||
#if defined ZMQ_POLL_BASED_ON_POLL
|
||||
|
||||
@ -231,16 +231,16 @@ int zmq::socket_poller_t::rebuild ()
|
||||
if (!use_signaler) {
|
||||
use_signaler = true;
|
||||
poll_size++;
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
poll_size++;
|
||||
}
|
||||
else
|
||||
poll_size++;
|
||||
else
|
||||
poll_size++;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (poll_size == 0)
|
||||
return 0;
|
||||
|
||||
@ -255,7 +255,7 @@ int zmq::socket_poller_t::rebuild ()
|
||||
pollfds[0].events = POLLIN;
|
||||
}
|
||||
|
||||
for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
|
||||
for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
|
||||
if (it->events) {
|
||||
if (it->socket) {
|
||||
int thread_safe;
|
||||
@ -264,12 +264,12 @@ int zmq::socket_poller_t::rebuild ()
|
||||
if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
|
||||
return -1;
|
||||
|
||||
if (!thread_safe) {
|
||||
if (!thread_safe) {
|
||||
size_t fd_size = sizeof (zmq::fd_t);
|
||||
if (it->socket->getsockopt (ZMQ_FD, &pollfds [item_nbr].fd, &fd_size) == -1) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
pollfds [item_nbr].events = POLLIN;
|
||||
item_nbr++;
|
||||
}
|
||||
@ -281,7 +281,7 @@ int zmq::socket_poller_t::rebuild ()
|
||||
(it->events & ZMQ_POLLOUT ? POLLOUT : 0) |
|
||||
(it->events & ZMQ_POLLPRI ? POLLPRI : 0);
|
||||
it->pollfd_index = item_nbr;
|
||||
item_nbr++;
|
||||
item_nbr++;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -339,9 +339,9 @@ int zmq::socket_poller_t::rebuild ()
|
||||
|
||||
FD_SET (notify_fd, &pollset_in);
|
||||
if (maxfd < notify_fd)
|
||||
maxfd = notify_fd;
|
||||
maxfd = notify_fd;
|
||||
|
||||
poll_size++;
|
||||
poll_size++;
|
||||
}
|
||||
}
|
||||
// Else, the poll item is a raw file descriptor. Convert the poll item
|
||||
@ -391,7 +391,7 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time
|
||||
zmq::clock_t clock;
|
||||
uint64_t now = 0;
|
||||
uint64_t end = 0;
|
||||
|
||||
|
||||
bool first_pass = true;
|
||||
|
||||
while (true) {
|
||||
@ -438,14 +438,14 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time
|
||||
|
||||
// If there is event to return, we can exit immediately.
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Else, the poll item is a raw file descriptor, simply convert
|
||||
// the events to zmq_pollitem_t-style format.
|
||||
else {
|
||||
short revents = pollfds [it->pollfd_index].revents;
|
||||
short events = 0;
|
||||
|
||||
short events = 0;
|
||||
|
||||
if (revents & POLLIN)
|
||||
events |= ZMQ_POLLIN;
|
||||
if (revents & POLLOUT)
|
||||
@ -517,7 +517,7 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time
|
||||
uint64_t now = 0;
|
||||
uint64_t end = 0;
|
||||
|
||||
bool first_pass = true;
|
||||
bool first_pass = true;
|
||||
fd_set inset, outset, errset;
|
||||
|
||||
while (true) {
|
||||
@ -582,7 +582,7 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time
|
||||
|
||||
// If there is event to return, we can exit immediately.
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Else, the poll item is a raw file descriptor, simply convert
|
||||
// the events to zmq_pollitem_t-style format.
|
||||
@ -595,7 +595,7 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time
|
||||
events |= ZMQ_POLLOUT;
|
||||
if (FD_ISSET (it->fd, &errset))
|
||||
events |= ZMQ_POLLERR;
|
||||
|
||||
|
||||
if (events) {
|
||||
event_->socket = NULL;
|
||||
event_->user_data = it->user_data;
|
||||
@ -648,5 +648,3 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time
|
||||
return -1;
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user