diff --git a/src/socket_poller.cpp b/src/socket_poller.cpp index 48ee06af..ad0df66b 100644 --- a/src/socket_poller.cpp +++ b/src/socket_poller.cpp @@ -34,25 +34,25 @@ zmq::socket_poller_t::socket_poller_t () : tag (0xCAFEBABE), need_rebuild (true), use_signaler (false) -#if defined ZMQ_POLL_BASED_ON_POLL +#if defined ZMQ_POLL_BASED_ON_POLL , pollfds (NULL) #endif -{ +{ } zmq::socket_poller_t::~socket_poller_t () { // Mark the socket_poller as dead - tag = 0xdeadbeef; + tag = 0xdeadbeef; for (items_t::iterator it = items.begin(); it != items.end(); ++it) { - if (it->socket) { + if (it->socket && it->socket->check_tag()) { int thread_safe; size_t thread_safe_size = sizeof(int); if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == 0 && thread_safe) - it->socket->remove_signaler (&signaler); + it->socket->remove_signaler (&signaler); } } @@ -61,21 +61,21 @@ zmq::socket_poller_t::~socket_poller_t () free (pollfds); pollfds = NULL; } -#endif +#endif } -bool zmq::socket_poller_t::check_tag () +bool zmq::socket_poller_t::check_tag () { return tag == 0xCAFEBABE; } -int zmq::socket_poller_t::add (socket_base_t *socket_, void* user_data_, short events_) +int zmq::socket_poller_t::add (socket_base_t *socket_, void* user_data_, short events_) { for (items_t::iterator it = items.begin (); it != items.end (); ++it) { if (it->socket == socket_) { errno = EINVAL; return -1; - } + } } int thread_safe; @@ -88,7 +88,7 @@ int zmq::socket_poller_t::add (socket_base_t *socket_, void* user_data_, short e if (socket_->add_signaler (&signaler) == -1) return -1; } - + item_t item = {socket_, 0, user_data_, events_}; items.push_back (item); need_rebuild = true; @@ -102,7 +102,7 @@ int zmq::socket_poller_t::add_fd (fd_t fd_, void *user_data_, short events_) if (!it->socket && it->fd == fd_) { errno = EINVAL; return -1; - } + } } item_t item = {NULL, fd_, user_data_, events_}; @@ -146,12 +146,12 @@ int zmq::socket_poller_t::modify_fd (fd_t fd_, short events_) errno = EINVAL; return -1; } - + it->events = events_; need_rebuild = true; return 0; -} +} int zmq::socket_poller_t::remove (socket_base_t *socket_) @@ -167,7 +167,7 @@ int zmq::socket_poller_t::remove (socket_base_t *socket_) errno = EINVAL; return -1; } - + int thread_safe; size_t thread_safe_size = sizeof(int); @@ -178,10 +178,10 @@ int zmq::socket_poller_t::remove (socket_base_t *socket_) if (socket_->remove_signaler (&signaler) == -1) return -1; } - + items.erase (it); - need_rebuild = true; - + need_rebuild = true; + return 0; } @@ -198,14 +198,14 @@ int zmq::socket_poller_t::remove_fd (fd_t fd_) errno = EINVAL; return -1; } - + items.erase (it); need_rebuild = true; return 0; } -int zmq::socket_poller_t::rebuild () +int zmq::socket_poller_t::rebuild () { #if defined ZMQ_POLL_BASED_ON_POLL @@ -231,16 +231,16 @@ int zmq::socket_poller_t::rebuild () if (!use_signaler) { use_signaler = true; poll_size++; - } + } } else poll_size++; } - else - poll_size++; + else + poll_size++; } } - + if (poll_size == 0) return 0; @@ -255,7 +255,7 @@ int zmq::socket_poller_t::rebuild () pollfds[0].events = POLLIN; } - for (items_t::iterator it = items.begin (); it != items.end (); ++it) { + for (items_t::iterator it = items.begin (); it != items.end (); ++it) { if (it->events) { if (it->socket) { int thread_safe; @@ -264,12 +264,12 @@ int zmq::socket_poller_t::rebuild () if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1) return -1; - if (!thread_safe) { + if (!thread_safe) { size_t fd_size = sizeof (zmq::fd_t); if (it->socket->getsockopt (ZMQ_FD, &pollfds [item_nbr].fd, &fd_size) == -1) { return -1; } - + pollfds [item_nbr].events = POLLIN; item_nbr++; } @@ -281,7 +281,7 @@ int zmq::socket_poller_t::rebuild () (it->events & ZMQ_POLLOUT ? POLLOUT : 0) | (it->events & ZMQ_POLLPRI ? POLLPRI : 0); it->pollfd_index = item_nbr; - item_nbr++; + item_nbr++; } } } @@ -339,9 +339,9 @@ int zmq::socket_poller_t::rebuild () FD_SET (notify_fd, &pollset_in); if (maxfd < notify_fd) - maxfd = notify_fd; + maxfd = notify_fd; - poll_size++; + poll_size++; } } // Else, the poll item is a raw file descriptor. Convert the poll item @@ -391,7 +391,7 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time zmq::clock_t clock; uint64_t now = 0; uint64_t end = 0; - + bool first_pass = true; while (true) { @@ -438,14 +438,14 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time // If there is event to return, we can exit immediately. return 0; - } + } } // Else, the poll item is a raw file descriptor, simply convert // the events to zmq_pollitem_t-style format. else { short revents = pollfds [it->pollfd_index].revents; - short events = 0; - + short events = 0; + if (revents & POLLIN) events |= ZMQ_POLLIN; if (revents & POLLOUT) @@ -517,7 +517,7 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time uint64_t now = 0; uint64_t end = 0; - bool first_pass = true; + bool first_pass = true; fd_set inset, outset, errset; while (true) { @@ -582,7 +582,7 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time // If there is event to return, we can exit immediately. return 0; - } + } } // Else, the poll item is a raw file descriptor, simply convert // the events to zmq_pollitem_t-style format. @@ -595,7 +595,7 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time events |= ZMQ_POLLOUT; if (FD_ISSET (it->fd, &errset)) events |= ZMQ_POLLERR; - + if (events) { event_->socket = NULL; event_->user_data = it->user_data; @@ -648,5 +648,3 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time return -1; #endif } - -