2015-10-18 21:07:23 +03:00
|
|
|
/*
|
2016-01-28 15:07:31 +01:00
|
|
|
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
|
2015-10-18 21:07:23 +03:00
|
|
|
|
|
|
|
This file is part of libzmq, the ZeroMQ core engine in C++.
|
|
|
|
|
|
|
|
libzmq is free software; you can redistribute it and/or modify it under
|
|
|
|
the terms of the GNU Lesser General Public License (LGPL) as published
|
|
|
|
by the Free Software Foundation; either version 3 of the License, or
|
|
|
|
(at your option) any later version.
|
|
|
|
|
|
|
|
As a special exception, the Contributors give you permission to link
|
|
|
|
this library with independent modules to produce an executable,
|
|
|
|
regardless of the license terms of these independent modules, and to
|
|
|
|
copy and distribute the resulting executable under terms of your choice,
|
|
|
|
provided that you also meet, for each linked independent module, the
|
|
|
|
terms and conditions of the license of that module. An independent
|
|
|
|
module is a module which is not derived from or based on this library.
|
|
|
|
If you modify this library, you must extend this exception to your
|
|
|
|
version of the library.
|
|
|
|
|
|
|
|
libzmq is distributed in the hope that it will be useful, but WITHOUT
|
|
|
|
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
|
|
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
|
|
|
License for more details.
|
|
|
|
|
|
|
|
You should have received a copy of the GNU Lesser General Public License
|
|
|
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
*/
|
|
|
|
|
2016-02-18 10:56:52 -06:00
|
|
|
#include "precompiled.hpp"
|
2015-10-18 21:07:23 +03:00
|
|
|
#include "socket_poller.hpp"
|
|
|
|
#include "err.hpp"
|
|
|
|
|
2018-05-14 22:07:10 +02:00
|
|
|
#include <limits.h>
|
|
|
|
|
2018-02-02 16:40:02 +01:00
|
|
|
static bool is_thread_safe (zmq::socket_base_t &socket)
|
|
|
|
{
|
2018-03-28 10:46:19 +02:00
|
|
|
// do not use getsockopt here, since that would fail during context termination
|
|
|
|
return socket.is_thread_safe ();
|
2018-02-02 16:40:02 +01:00
|
|
|
}
|
|
|
|
|
2015-10-18 21:07:23 +03:00
|
|
|
zmq::socket_poller_t::socket_poller_t () :
|
|
|
|
tag (0xCAFEBABE),
|
2017-03-11 10:57:29 +02:00
|
|
|
signaler (NULL),
|
2015-10-22 11:12:04 +03:00
|
|
|
need_rebuild (true),
|
2016-02-21 23:50:34 +00:00
|
|
|
use_signaler (false),
|
2018-02-01 11:46:09 +01:00
|
|
|
poll_size (0)
|
2016-01-13 15:12:47 +02:00
|
|
|
#if defined ZMQ_POLL_BASED_ON_POLL
|
2015-10-22 11:12:04 +03:00
|
|
|
,
|
|
|
|
pollfds (NULL)
|
2016-02-21 23:50:34 +00:00
|
|
|
#elif defined ZMQ_POLL_BASED_ON_SELECT
|
|
|
|
,
|
2018-02-01 11:46:09 +01:00
|
|
|
maxfd (0)
|
2015-10-22 11:12:04 +03:00
|
|
|
#endif
|
2016-01-13 15:12:47 +02:00
|
|
|
{
|
2016-02-21 15:49:47 -06:00
|
|
|
#if defined ZMQ_POLL_BASED_ON_SELECT
|
2017-04-10 11:35:08 +02:00
|
|
|
#if defined ZMQ_HAVE_WINDOWS
|
|
|
|
// On Windows fd_set contains array of SOCKETs, each 4 bytes.
|
|
|
|
// For large fd_sets memset() could be expensive and it is unnecessary.
|
|
|
|
// It is enough to set fd_count to 0, exactly what FD_ZERO() macro does.
|
|
|
|
FD_ZERO (&pollset_in);
|
|
|
|
FD_ZERO (&pollset_out);
|
|
|
|
FD_ZERO (&pollset_err);
|
|
|
|
#else
|
2018-02-01 11:46:09 +01:00
|
|
|
memset (&pollset_in, 0, sizeof (pollset_in));
|
|
|
|
memset (&pollset_out, 0, sizeof (pollset_out));
|
|
|
|
memset (&pollset_err, 0, sizeof (pollset_err));
|
2017-04-10 11:35:08 +02:00
|
|
|
#endif
|
2016-02-21 15:49:47 -06:00
|
|
|
#endif
|
2015-10-18 21:07:23 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
zmq::socket_poller_t::~socket_poller_t ()
|
|
|
|
{
|
|
|
|
// Mark the socket_poller as dead
|
2016-01-13 15:12:47 +02:00
|
|
|
tag = 0xdeadbeef;
|
2015-10-18 21:07:23 +03:00
|
|
|
|
2018-02-01 11:46:09 +01:00
|
|
|
for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
|
2018-02-02 16:40:02 +01:00
|
|
|
// TODO shouldn't this zmq_assert (it->socket->check_tag ()) instead?
|
|
|
|
if (it->socket && it->socket->check_tag ()
|
|
|
|
&& is_thread_safe (*it->socket)) {
|
|
|
|
it->socket->remove_signaler (signaler);
|
2015-10-18 21:50:24 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-03-11 10:57:29 +02:00
|
|
|
if (signaler != NULL) {
|
|
|
|
delete signaler;
|
|
|
|
signaler = NULL;
|
|
|
|
}
|
|
|
|
|
2015-10-22 11:12:04 +03:00
|
|
|
#if defined ZMQ_POLL_BASED_ON_POLL
|
|
|
|
if (pollfds) {
|
|
|
|
free (pollfds);
|
|
|
|
pollfds = NULL;
|
2015-10-18 21:07:23 +03:00
|
|
|
}
|
2016-01-13 15:12:47 +02:00
|
|
|
#endif
|
2015-10-18 21:07:23 +03:00
|
|
|
}
|
|
|
|
|
2016-01-13 15:12:47 +02:00
|
|
|
bool zmq::socket_poller_t::check_tag ()
|
2015-10-18 21:07:23 +03:00
|
|
|
{
|
|
|
|
return tag == 0xCAFEBABE;
|
|
|
|
}
|
|
|
|
|
2018-02-01 11:46:09 +01:00
|
|
|
int zmq::socket_poller_t::add (socket_base_t *socket_,
|
|
|
|
void *user_data_,
|
|
|
|
short events_)
|
2015-10-18 21:07:23 +03:00
|
|
|
{
|
2015-10-22 11:12:04 +03:00
|
|
|
for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
|
2015-10-18 21:07:23 +03:00
|
|
|
if (it->socket == socket_) {
|
|
|
|
errno = EINVAL;
|
|
|
|
return -1;
|
2016-01-13 15:12:47 +02:00
|
|
|
}
|
2015-10-18 21:07:23 +03:00
|
|
|
}
|
|
|
|
|
2018-02-02 16:40:02 +01:00
|
|
|
if (is_thread_safe (*socket_)) {
|
2018-01-31 17:03:29 +01:00
|
|
|
if (signaler == NULL) {
|
|
|
|
signaler = new (std::nothrow) signaler_t ();
|
|
|
|
if (!signaler) {
|
|
|
|
errno = ENOMEM;
|
|
|
|
return -1;
|
|
|
|
}
|
|
|
|
if (!signaler->valid ()) {
|
|
|
|
delete signaler;
|
|
|
|
signaler = NULL;
|
|
|
|
errno = EMFILE;
|
|
|
|
return -1;
|
|
|
|
}
|
|
|
|
}
|
2017-03-11 10:57:29 +02:00
|
|
|
|
2018-02-02 16:29:10 +01:00
|
|
|
socket_->add_signaler (signaler);
|
2015-10-18 21:07:23 +03:00
|
|
|
}
|
2016-01-13 15:12:47 +02:00
|
|
|
|
2018-02-01 11:46:09 +01:00
|
|
|
item_t item = {
|
|
|
|
socket_,
|
|
|
|
0,
|
|
|
|
user_data_,
|
|
|
|
events_
|
Style: Fix -Wmissing-field-initializers in socket_poller.cpp
This commit addresses the following warnings reported on gcc 5.2.1. In
the future, this will help reduce the "noise" and help catch warnings
revealing a serious problem.
It was originally introduce in the refactoring associated with
zeromq/libzmq@da2bc60 (Removing zmq_pollfd as it is replaced by zmq_poller).
8<---8<---8<---8<---8<---8<---8<---8<---8<---8<---8<---8<---8<---
/path/to/libzmq/src/socket_poller.cpp: In member function ‘int zmq::socket_poller_t::add(zmq::socket_base_t*, void*, short int)’:
/path/to/libzmq/src/socket_poller.cpp:92:51: warning: missing initializer for member ‘zmq::socket_poller_t::item_t::pollfd_index’ [-Wmissing-field-initializers]
item_t item = {socket_, 0, user_data_, events_};
^
/path/to/libzmq/src/socket_poller.cpp: In member function ‘int zmq::socket_poller_t::add_fd(zmq::fd_t, void*, short int)’:
/path/to/libzmq/src/socket_poller.cpp:108:50: warning: missing initializer for member ‘zmq::socket_poller_t::item_t::pollfd_index’ [-Wmissing-field-initializers]
item_t item = {NULL, fd_, user_data_, events_};
^
8<---8<---8<---8<---8<---8<---8<---8<---8<---8<---8<---8<---8<---
2016-01-30 02:54:43 -05:00
|
|
|
#if defined ZMQ_POLL_BASED_ON_POLL
|
2018-02-01 11:46:09 +01:00
|
|
|
,
|
|
|
|
-1
|
Style: Fix -Wmissing-field-initializers in socket_poller.cpp
This commit addresses the following warnings reported on gcc 5.2.1. In
the future, this will help reduce the "noise" and help catch warnings
revealing a serious problem.
It was originally introduce in the refactoring associated with
zeromq/libzmq@da2bc60 (Removing zmq_pollfd as it is replaced by zmq_poller).
8<---8<---8<---8<---8<---8<---8<---8<---8<---8<---8<---8<---8<---
/path/to/libzmq/src/socket_poller.cpp: In member function ‘int zmq::socket_poller_t::add(zmq::socket_base_t*, void*, short int)’:
/path/to/libzmq/src/socket_poller.cpp:92:51: warning: missing initializer for member ‘zmq::socket_poller_t::item_t::pollfd_index’ [-Wmissing-field-initializers]
item_t item = {socket_, 0, user_data_, events_};
^
/path/to/libzmq/src/socket_poller.cpp: In member function ‘int zmq::socket_poller_t::add_fd(zmq::fd_t, void*, short int)’:
/path/to/libzmq/src/socket_poller.cpp:108:50: warning: missing initializer for member ‘zmq::socket_poller_t::item_t::pollfd_index’ [-Wmissing-field-initializers]
item_t item = {NULL, fd_, user_data_, events_};
^
8<---8<---8<---8<---8<---8<---8<---8<---8<---8<---8<---8<---8<---
2016-01-30 02:54:43 -05:00
|
|
|
#endif
|
|
|
|
};
|
2018-05-13 18:06:05 +02:00
|
|
|
try {
|
|
|
|
items.push_back (item);
|
|
|
|
}
|
|
|
|
catch (const std::bad_alloc &) {
|
|
|
|
errno = ENOMEM;
|
|
|
|
return -1;
|
|
|
|
}
|
2015-10-18 21:07:23 +03:00
|
|
|
need_rebuild = true;
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
2015-10-22 11:12:04 +03:00
|
|
|
int zmq::socket_poller_t::add_fd (fd_t fd_, void *user_data_, short events_)
|
2015-10-18 21:07:23 +03:00
|
|
|
{
|
2018-02-01 11:46:09 +01:00
|
|
|
for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
|
2015-10-18 21:07:23 +03:00
|
|
|
if (!it->socket && it->fd == fd_) {
|
|
|
|
errno = EINVAL;
|
|
|
|
return -1;
|
2016-01-13 15:12:47 +02:00
|
|
|
}
|
2015-10-18 21:07:23 +03:00
|
|
|
}
|
|
|
|
|
2018-02-01 11:46:09 +01:00
|
|
|
item_t item = {
|
|
|
|
NULL,
|
|
|
|
fd_,
|
|
|
|
user_data_,
|
|
|
|
events_
|
Style: Fix -Wmissing-field-initializers in socket_poller.cpp
This commit addresses the following warnings reported on gcc 5.2.1. In
the future, this will help reduce the "noise" and help catch warnings
revealing a serious problem.
It was originally introduce in the refactoring associated with
zeromq/libzmq@da2bc60 (Removing zmq_pollfd as it is replaced by zmq_poller).
8<---8<---8<---8<---8<---8<---8<---8<---8<---8<---8<---8<---8<---
/path/to/libzmq/src/socket_poller.cpp: In member function ‘int zmq::socket_poller_t::add(zmq::socket_base_t*, void*, short int)’:
/path/to/libzmq/src/socket_poller.cpp:92:51: warning: missing initializer for member ‘zmq::socket_poller_t::item_t::pollfd_index’ [-Wmissing-field-initializers]
item_t item = {socket_, 0, user_data_, events_};
^
/path/to/libzmq/src/socket_poller.cpp: In member function ‘int zmq::socket_poller_t::add_fd(zmq::fd_t, void*, short int)’:
/path/to/libzmq/src/socket_poller.cpp:108:50: warning: missing initializer for member ‘zmq::socket_poller_t::item_t::pollfd_index’ [-Wmissing-field-initializers]
item_t item = {NULL, fd_, user_data_, events_};
^
8<---8<---8<---8<---8<---8<---8<---8<---8<---8<---8<---8<---8<---
2016-01-30 02:54:43 -05:00
|
|
|
#if defined ZMQ_POLL_BASED_ON_POLL
|
2018-02-01 11:46:09 +01:00
|
|
|
,
|
|
|
|
-1
|
Style: Fix -Wmissing-field-initializers in socket_poller.cpp
This commit addresses the following warnings reported on gcc 5.2.1. In
the future, this will help reduce the "noise" and help catch warnings
revealing a serious problem.
It was originally introduce in the refactoring associated with
zeromq/libzmq@da2bc60 (Removing zmq_pollfd as it is replaced by zmq_poller).
8<---8<---8<---8<---8<---8<---8<---8<---8<---8<---8<---8<---8<---
/path/to/libzmq/src/socket_poller.cpp: In member function ‘int zmq::socket_poller_t::add(zmq::socket_base_t*, void*, short int)’:
/path/to/libzmq/src/socket_poller.cpp:92:51: warning: missing initializer for member ‘zmq::socket_poller_t::item_t::pollfd_index’ [-Wmissing-field-initializers]
item_t item = {socket_, 0, user_data_, events_};
^
/path/to/libzmq/src/socket_poller.cpp: In member function ‘int zmq::socket_poller_t::add_fd(zmq::fd_t, void*, short int)’:
/path/to/libzmq/src/socket_poller.cpp:108:50: warning: missing initializer for member ‘zmq::socket_poller_t::item_t::pollfd_index’ [-Wmissing-field-initializers]
item_t item = {NULL, fd_, user_data_, events_};
^
8<---8<---8<---8<---8<---8<---8<---8<---8<---8<---8<---8<---8<---
2016-01-30 02:54:43 -05:00
|
|
|
#endif
|
2018-02-01 11:46:09 +01:00
|
|
|
};
|
2018-05-13 18:06:05 +02:00
|
|
|
try {
|
|
|
|
items.push_back (item);
|
|
|
|
}
|
|
|
|
catch (const std::bad_alloc &) {
|
|
|
|
errno = ENOMEM;
|
|
|
|
return -1;
|
|
|
|
}
|
2015-10-18 21:07:23 +03:00
|
|
|
need_rebuild = true;
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
2018-02-01 11:46:09 +01:00
|
|
|
int zmq::socket_poller_t::modify (socket_base_t *socket_, short events_)
|
2015-10-21 10:14:36 +03:00
|
|
|
{
|
2015-10-22 11:12:04 +03:00
|
|
|
items_t::iterator it;
|
2015-10-21 10:14:36 +03:00
|
|
|
|
2015-10-22 11:12:04 +03:00
|
|
|
for (it = items.begin (); it != items.end (); ++it) {
|
2015-10-21 10:14:36 +03:00
|
|
|
if (it->socket == socket_)
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
2018-02-01 11:46:09 +01:00
|
|
|
if (it == items.end ()) {
|
2015-10-21 10:14:36 +03:00
|
|
|
errno = EINVAL;
|
|
|
|
return -1;
|
|
|
|
}
|
|
|
|
|
|
|
|
it->events = events_;
|
|
|
|
need_rebuild = true;
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2015-10-22 11:12:04 +03:00
|
|
|
int zmq::socket_poller_t::modify_fd (fd_t fd_, short events_)
|
2015-10-21 10:14:36 +03:00
|
|
|
{
|
2015-10-22 11:12:04 +03:00
|
|
|
items_t::iterator it;
|
2015-10-21 10:14:36 +03:00
|
|
|
|
2015-10-22 11:12:04 +03:00
|
|
|
for (it = items.begin (); it != items.end (); ++it) {
|
2015-10-21 10:14:36 +03:00
|
|
|
if (!it->socket && it->fd == fd_)
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
2018-02-01 11:46:09 +01:00
|
|
|
if (it == items.end ()) {
|
2015-10-21 10:14:36 +03:00
|
|
|
errno = EINVAL;
|
|
|
|
return -1;
|
|
|
|
}
|
2016-01-13 15:12:47 +02:00
|
|
|
|
2015-10-21 10:14:36 +03:00
|
|
|
it->events = events_;
|
|
|
|
need_rebuild = true;
|
|
|
|
|
|
|
|
return 0;
|
2016-01-13 15:12:47 +02:00
|
|
|
}
|
2015-10-21 10:14:36 +03:00
|
|
|
|
|
|
|
|
2015-10-22 11:12:04 +03:00
|
|
|
int zmq::socket_poller_t::remove (socket_base_t *socket_)
|
2015-10-18 21:07:23 +03:00
|
|
|
{
|
2015-10-22 11:12:04 +03:00
|
|
|
items_t::iterator it;
|
2015-10-18 21:07:23 +03:00
|
|
|
|
2015-10-22 11:12:04 +03:00
|
|
|
for (it = items.begin (); it != items.end (); ++it) {
|
2015-10-18 21:07:23 +03:00
|
|
|
if (it->socket == socket_)
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
2018-02-01 11:46:09 +01:00
|
|
|
if (it == items.end ()) {
|
2015-10-18 21:07:23 +03:00
|
|
|
errno = EINVAL;
|
|
|
|
return -1;
|
|
|
|
}
|
2016-01-13 15:12:47 +02:00
|
|
|
|
2018-02-01 11:46:09 +01:00
|
|
|
items.erase (it);
|
2016-05-05 12:53:55 +03:00
|
|
|
need_rebuild = true;
|
|
|
|
|
2018-02-02 16:40:02 +01:00
|
|
|
if (is_thread_safe (*socket_)) {
|
2017-03-11 10:57:29 +02:00
|
|
|
socket_->remove_signaler (signaler);
|
2018-02-02 16:40:02 +01:00
|
|
|
}
|
2016-01-13 15:12:47 +02:00
|
|
|
|
2015-10-18 21:07:23 +03:00
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
2015-10-22 11:12:04 +03:00
|
|
|
int zmq::socket_poller_t::remove_fd (fd_t fd_)
|
2015-10-18 21:07:23 +03:00
|
|
|
{
|
2015-10-22 11:12:04 +03:00
|
|
|
items_t::iterator it;
|
2015-10-18 21:07:23 +03:00
|
|
|
|
2015-10-22 11:12:04 +03:00
|
|
|
for (it = items.begin (); it != items.end (); ++it) {
|
2015-10-18 21:07:23 +03:00
|
|
|
if (!it->socket && it->fd == fd_)
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
2018-02-01 11:46:09 +01:00
|
|
|
if (it == items.end ()) {
|
2015-10-18 21:07:23 +03:00
|
|
|
errno = EINVAL;
|
|
|
|
return -1;
|
|
|
|
}
|
2016-01-13 15:12:47 +02:00
|
|
|
|
2015-10-22 11:12:04 +03:00
|
|
|
items.erase (it);
|
2015-10-18 21:07:23 +03:00
|
|
|
need_rebuild = true;
|
|
|
|
|
|
|
|
return 0;
|
2015-10-22 11:12:04 +03:00
|
|
|
}
|
2015-10-18 21:07:23 +03:00
|
|
|
|
2017-08-22 20:44:18 +02:00
|
|
|
void zmq::socket_poller_t::rebuild ()
|
2015-10-18 21:07:23 +03:00
|
|
|
{
|
2015-10-22 11:12:04 +03:00
|
|
|
#if defined ZMQ_POLL_BASED_ON_POLL
|
2015-10-18 21:07:23 +03:00
|
|
|
|
2015-10-22 11:12:04 +03:00
|
|
|
if (pollfds) {
|
|
|
|
free (pollfds);
|
|
|
|
pollfds = NULL;
|
|
|
|
}
|
2015-10-18 21:07:23 +03:00
|
|
|
|
2015-10-22 11:12:04 +03:00
|
|
|
use_signaler = false;
|
|
|
|
|
|
|
|
poll_size = 0;
|
|
|
|
|
|
|
|
for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
|
|
|
|
if (it->events) {
|
2018-02-02 16:40:02 +01:00
|
|
|
if (it->socket && is_thread_safe (*it->socket)) {
|
|
|
|
if (!use_signaler) {
|
|
|
|
use_signaler = true;
|
2015-10-22 11:12:04 +03:00
|
|
|
poll_size++;
|
2018-02-02 16:40:02 +01:00
|
|
|
}
|
2018-02-01 11:46:09 +01:00
|
|
|
} else
|
2016-01-13 15:12:47 +02:00
|
|
|
poll_size++;
|
2015-10-22 11:12:04 +03:00
|
|
|
}
|
2015-10-18 21:07:23 +03:00
|
|
|
}
|
2016-01-13 15:12:47 +02:00
|
|
|
|
2015-10-22 11:12:04 +03:00
|
|
|
if (poll_size == 0)
|
2017-08-22 20:44:18 +02:00
|
|
|
return;
|
2015-10-18 21:07:23 +03:00
|
|
|
|
2018-02-01 11:46:09 +01:00
|
|
|
pollfds = (pollfd *) malloc (poll_size * sizeof (pollfd));
|
2015-10-22 11:12:04 +03:00
|
|
|
alloc_assert (pollfds);
|
|
|
|
|
|
|
|
int item_nbr = 0;
|
|
|
|
|
|
|
|
if (use_signaler) {
|
|
|
|
item_nbr = 1;
|
2018-02-01 11:46:09 +01:00
|
|
|
pollfds[0].fd = signaler->get_fd ();
|
2015-10-22 11:12:04 +03:00
|
|
|
pollfds[0].events = POLLIN;
|
2015-10-18 21:07:23 +03:00
|
|
|
}
|
|
|
|
|
2016-01-13 15:12:47 +02:00
|
|
|
for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
|
2015-10-22 11:12:04 +03:00
|
|
|
if (it->events) {
|
|
|
|
if (it->socket) {
|
2018-02-02 16:40:02 +01:00
|
|
|
if (!is_thread_safe (*it->socket)) {
|
2015-10-22 11:12:04 +03:00
|
|
|
size_t fd_size = sizeof (zmq::fd_t);
|
2018-02-02 16:40:02 +01:00
|
|
|
int rc = it->socket->getsockopt (
|
|
|
|
ZMQ_FD, &pollfds[item_nbr].fd, &fd_size);
|
2017-08-22 20:44:18 +02:00
|
|
|
zmq_assert (rc == 0);
|
2016-01-13 15:12:47 +02:00
|
|
|
|
2018-02-01 11:46:09 +01:00
|
|
|
pollfds[item_nbr].events = POLLIN;
|
2015-10-22 11:12:04 +03:00
|
|
|
item_nbr++;
|
|
|
|
}
|
2018-02-01 11:46:09 +01:00
|
|
|
} else {
|
|
|
|
pollfds[item_nbr].fd = it->fd;
|
|
|
|
pollfds[item_nbr].events =
|
|
|
|
(it->events & ZMQ_POLLIN ? POLLIN : 0)
|
|
|
|
| (it->events & ZMQ_POLLOUT ? POLLOUT : 0)
|
|
|
|
| (it->events & ZMQ_POLLPRI ? POLLPRI : 0);
|
2015-10-22 11:12:04 +03:00
|
|
|
it->pollfd_index = item_nbr;
|
2016-01-13 15:12:47 +02:00
|
|
|
item_nbr++;
|
2015-10-22 11:12:04 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2015-10-18 21:07:23 +03:00
|
|
|
|
2018-02-01 11:46:09 +01:00
|
|
|
#elif defined ZMQ_POLL_BASED_ON_SELECT
|
2015-10-22 11:12:04 +03:00
|
|
|
|
|
|
|
FD_ZERO (&pollset_in);
|
|
|
|
FD_ZERO (&pollset_out);
|
|
|
|
FD_ZERO (&pollset_err);
|
|
|
|
|
|
|
|
// Ensure we do not attempt to select () on more than FD_SETSIZE
|
|
|
|
// file descriptors.
|
|
|
|
zmq_assert (items.size () <= FD_SETSIZE);
|
|
|
|
|
|
|
|
poll_size = 0;
|
|
|
|
|
|
|
|
use_signaler = false;
|
|
|
|
|
|
|
|
for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
|
2018-02-02 16:40:02 +01:00
|
|
|
if (it->socket && is_thread_safe (*it->socket) && it->events) {
|
|
|
|
use_signaler = true;
|
|
|
|
FD_SET (signaler->get_fd (), &pollset_in);
|
|
|
|
poll_size = 1;
|
|
|
|
break;
|
2015-10-18 21:07:23 +03:00
|
|
|
}
|
|
|
|
}
|
2015-10-22 11:12:04 +03:00
|
|
|
|
|
|
|
maxfd = 0;
|
|
|
|
|
|
|
|
// Build the fd_sets for passing to select ().
|
|
|
|
for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
|
|
|
|
if (it->events) {
|
|
|
|
// If the poll item is a 0MQ socket we are interested in input on the
|
|
|
|
// notification file descriptor retrieved by the ZMQ_FD socket option.
|
|
|
|
if (it->socket) {
|
2018-02-02 16:40:02 +01:00
|
|
|
if (!is_thread_safe (*it->socket)) {
|
2015-10-22 11:12:04 +03:00
|
|
|
zmq::fd_t notify_fd;
|
|
|
|
size_t fd_size = sizeof (zmq::fd_t);
|
2018-02-02 16:40:02 +01:00
|
|
|
int rc =
|
|
|
|
it->socket->getsockopt (ZMQ_FD, ¬ify_fd, &fd_size);
|
2017-08-22 20:44:18 +02:00
|
|
|
zmq_assert (rc == 0);
|
2015-10-22 11:12:04 +03:00
|
|
|
|
|
|
|
FD_SET (notify_fd, &pollset_in);
|
|
|
|
if (maxfd < notify_fd)
|
2016-01-13 15:12:47 +02:00
|
|
|
maxfd = notify_fd;
|
2015-10-22 11:12:04 +03:00
|
|
|
|
2016-01-13 15:12:47 +02:00
|
|
|
poll_size++;
|
2015-10-22 11:12:04 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
// Else, the poll item is a raw file descriptor. Convert the poll item
|
|
|
|
// events to the appropriate fd_sets.
|
|
|
|
else {
|
|
|
|
if (it->events & ZMQ_POLLIN)
|
|
|
|
FD_SET (it->fd, &pollset_in);
|
|
|
|
if (it->events & ZMQ_POLLOUT)
|
|
|
|
FD_SET (it->fd, &pollset_out);
|
|
|
|
if (it->events & ZMQ_POLLERR)
|
|
|
|
FD_SET (it->fd, &pollset_err);
|
|
|
|
if (maxfd < it->fd)
|
|
|
|
maxfd = it->fd;
|
|
|
|
|
|
|
|
poll_size++;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
need_rebuild = false;
|
2015-10-18 21:07:23 +03:00
|
|
|
}
|
|
|
|
|
2017-09-07 00:11:22 +02:00
|
|
|
void zmq::socket_poller_t::zero_trail_events (
|
2018-02-01 11:46:09 +01:00
|
|
|
zmq::socket_poller_t::event_t *events_, int n_events_, int found)
|
2017-09-07 00:11:22 +02:00
|
|
|
{
|
|
|
|
for (int i = found; i < n_events_; ++i) {
|
|
|
|
events_[i].socket = NULL;
|
|
|
|
events_[i].fd = 0;
|
|
|
|
events_[i].user_data = NULL;
|
|
|
|
events_[i].events = 0;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#if defined ZMQ_POLL_BASED_ON_POLL
|
|
|
|
int zmq::socket_poller_t::check_events (zmq::socket_poller_t::event_t *events_,
|
2018-02-01 11:46:09 +01:00
|
|
|
int n_events_)
|
2017-09-07 00:11:22 +02:00
|
|
|
#elif defined ZMQ_POLL_BASED_ON_SELECT
|
|
|
|
int zmq::socket_poller_t::check_events (zmq::socket_poller_t::event_t *events_,
|
2018-02-01 11:46:09 +01:00
|
|
|
int n_events_,
|
|
|
|
fd_set &inset,
|
|
|
|
fd_set &outset,
|
|
|
|
fd_set &errset)
|
2017-09-07 00:11:22 +02:00
|
|
|
#endif
|
|
|
|
{
|
|
|
|
int found = 0;
|
2018-02-01 11:46:09 +01:00
|
|
|
for (items_t::iterator it = items.begin ();
|
|
|
|
it != items.end () && found < n_events_; ++it) {
|
2017-09-07 00:11:22 +02:00
|
|
|
// The poll item is a 0MQ socket. Retrieve pending events
|
|
|
|
// using the ZMQ_EVENTS socket option.
|
|
|
|
if (it->socket) {
|
|
|
|
size_t events_size = sizeof (uint32_t);
|
|
|
|
uint32_t events;
|
|
|
|
if (it->socket->getsockopt (ZMQ_EVENTS, &events, &events_size)
|
2018-02-01 11:46:09 +01:00
|
|
|
== -1) {
|
2017-09-07 00:11:22 +02:00
|
|
|
return -1;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (it->events & events) {
|
|
|
|
events_[found].socket = it->socket;
|
|
|
|
events_[found].user_data = it->user_data;
|
|
|
|
events_[found].events = it->events & events;
|
|
|
|
++found;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// Else, the poll item is a raw file descriptor, simply convert
|
|
|
|
// the events to zmq_pollitem_t-style format.
|
|
|
|
else {
|
|
|
|
#if defined ZMQ_POLL_BASED_ON_POLL
|
|
|
|
|
2018-02-01 11:46:09 +01:00
|
|
|
short revents = pollfds[it->pollfd_index].revents;
|
2017-09-07 00:11:22 +02:00
|
|
|
short events = 0;
|
|
|
|
|
|
|
|
if (revents & POLLIN)
|
|
|
|
events |= ZMQ_POLLIN;
|
|
|
|
if (revents & POLLOUT)
|
|
|
|
events |= ZMQ_POLLOUT;
|
|
|
|
if (revents & POLLPRI)
|
|
|
|
events |= ZMQ_POLLPRI;
|
|
|
|
if (revents & ~(POLLIN | POLLOUT | POLLPRI))
|
|
|
|
events |= ZMQ_POLLERR;
|
|
|
|
|
|
|
|
#elif defined ZMQ_POLL_BASED_ON_SELECT
|
|
|
|
|
|
|
|
short events = 0;
|
|
|
|
|
|
|
|
if (FD_ISSET (it->fd, &inset))
|
|
|
|
events |= ZMQ_POLLIN;
|
|
|
|
if (FD_ISSET (it->fd, &outset))
|
|
|
|
events |= ZMQ_POLLOUT;
|
|
|
|
if (FD_ISSET (it->fd, &errset))
|
|
|
|
events |= ZMQ_POLLERR;
|
|
|
|
#endif //POLL_SELECT
|
|
|
|
|
|
|
|
if (events) {
|
|
|
|
events_[found].socket = NULL;
|
|
|
|
events_[found].user_data = it->user_data;
|
|
|
|
events_[found].fd = it->fd;
|
|
|
|
events_[found].events = events;
|
|
|
|
++found;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return found;
|
|
|
|
}
|
|
|
|
|
|
|
|
//Return 0 if timeout is expired otherwise 1
|
2018-02-01 11:46:09 +01:00
|
|
|
int zmq::socket_poller_t::adjust_timeout (zmq::clock_t &clock,
|
|
|
|
long timeout_,
|
|
|
|
uint64_t &now,
|
|
|
|
uint64_t &end,
|
|
|
|
bool &first_pass)
|
2017-09-07 00:11:22 +02:00
|
|
|
{
|
|
|
|
// If socket_poller_t::timeout is zero, exit immediately whether there
|
|
|
|
// are events or not.
|
|
|
|
if (timeout_ == 0)
|
|
|
|
return 0;
|
|
|
|
|
|
|
|
// At this point we are meant to wait for events but there are none.
|
|
|
|
// If timeout is infinite we can just loop until we get some events.
|
|
|
|
if (timeout_ < 0) {
|
|
|
|
if (first_pass)
|
|
|
|
first_pass = false;
|
|
|
|
return 1;
|
|
|
|
}
|
|
|
|
|
|
|
|
// The timeout is finite and there are no events. In the first pass
|
|
|
|
// we get a timestamp of when the polling have begun. (We assume that
|
|
|
|
// first pass have taken negligible time). We also compute the time
|
|
|
|
// when the polling should time out.
|
|
|
|
now = clock.now_ms ();
|
|
|
|
if (first_pass) {
|
|
|
|
end = now + timeout_;
|
|
|
|
first_pass = false;
|
|
|
|
return 1;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Find out whether timeout have expired.
|
|
|
|
if (now >= end)
|
|
|
|
return 0;
|
|
|
|
|
|
|
|
return 1;
|
|
|
|
}
|
|
|
|
|
|
|
|
int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_,
|
2018-02-01 11:46:09 +01:00
|
|
|
int n_events_,
|
|
|
|
long timeout_)
|
2015-10-18 21:07:23 +03:00
|
|
|
{
|
2017-08-22 18:36:11 +02:00
|
|
|
if (items.empty () && timeout_ < 0) {
|
|
|
|
errno = EFAULT;
|
|
|
|
return -1;
|
|
|
|
}
|
|
|
|
|
2015-10-22 11:12:04 +03:00
|
|
|
if (need_rebuild)
|
2017-08-22 20:44:18 +02:00
|
|
|
rebuild ();
|
2015-10-22 11:12:04 +03:00
|
|
|
|
|
|
|
if (unlikely (poll_size == 0)) {
|
2016-04-13 02:59:15 +02:00
|
|
|
// We'll report an error (timed out) as if the list was non-empty and
|
2017-08-22 20:44:18 +02:00
|
|
|
// no event occurred within the specified timeout. Otherwise the caller
|
2016-04-13 02:59:15 +02:00
|
|
|
// needs to check the return value AND the event to avoid using the
|
|
|
|
// nullified event data.
|
2017-08-22 20:00:29 +02:00
|
|
|
errno = EAGAIN;
|
2015-10-22 11:12:04 +03:00
|
|
|
if (timeout_ == 0)
|
2016-04-12 20:11:50 +02:00
|
|
|
return -1;
|
2015-10-22 11:12:04 +03:00
|
|
|
#if defined ZMQ_HAVE_WINDOWS
|
|
|
|
Sleep (timeout_ > 0 ? timeout_ : INFINITE);
|
2016-04-12 20:11:50 +02:00
|
|
|
return -1;
|
2015-10-22 11:12:04 +03:00
|
|
|
#elif defined ZMQ_HAVE_ANDROID
|
|
|
|
usleep (timeout_ * 1000);
|
2016-04-12 20:11:50 +02:00
|
|
|
return -1;
|
2017-11-02 09:59:47 +01:00
|
|
|
#elif defined ZMQ_HAVE_OSX
|
|
|
|
usleep (timeout_ * 1000);
|
|
|
|
errno = EAGAIN;
|
|
|
|
return -1;
|
2018-03-10 03:03:02 -08:00
|
|
|
#elif defined ZMQ_HAVE_VXWORKS
|
|
|
|
struct timespec ns_;
|
|
|
|
ns_.tv_sec = timeout_ / 1000;
|
|
|
|
ns_.tv_nsec = timeout_ % 1000 * 1000000;
|
|
|
|
nanosleep (&ns_, 0);
|
|
|
|
return -1;
|
2015-10-22 11:12:04 +03:00
|
|
|
#else
|
2016-04-12 20:11:50 +02:00
|
|
|
usleep (timeout_ * 1000);
|
|
|
|
return -1;
|
2015-10-22 11:12:04 +03:00
|
|
|
#endif
|
2015-10-18 21:07:23 +03:00
|
|
|
}
|
|
|
|
|
2017-09-07 00:11:22 +02:00
|
|
|
#if defined ZMQ_POLL_BASED_ON_POLL
|
2015-10-22 11:12:04 +03:00
|
|
|
zmq::clock_t clock;
|
|
|
|
uint64_t now = 0;
|
|
|
|
uint64_t end = 0;
|
2016-01-13 15:12:47 +02:00
|
|
|
|
2015-10-22 11:12:04 +03:00
|
|
|
bool first_pass = true;
|
|
|
|
|
|
|
|
while (true) {
|
|
|
|
// Compute the timeout for the subsequent poll.
|
|
|
|
int timeout;
|
|
|
|
if (first_pass)
|
|
|
|
timeout = 0;
|
2018-02-01 11:46:09 +01:00
|
|
|
else if (timeout_ < 0)
|
2015-10-22 11:12:04 +03:00
|
|
|
timeout = -1;
|
|
|
|
else
|
2018-05-14 22:07:10 +02:00
|
|
|
timeout =
|
|
|
|
static_cast<int> (std::min<uint64_t> (end - now, INT_MAX));
|
2015-10-22 11:12:04 +03:00
|
|
|
|
|
|
|
// Wait for events.
|
|
|
|
while (true) {
|
|
|
|
int rc = poll (pollfds, poll_size, timeout);
|
|
|
|
if (rc == -1 && errno == EINTR) {
|
|
|
|
return -1;
|
|
|
|
}
|
|
|
|
errno_assert (rc >= 0);
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Receive the signal from pollfd
|
|
|
|
if (use_signaler && pollfds[0].revents & POLLIN)
|
2017-03-11 10:57:29 +02:00
|
|
|
signaler->recv ();
|
2015-10-22 11:12:04 +03:00
|
|
|
|
|
|
|
// Check for the events.
|
2017-09-07 00:11:22 +02:00
|
|
|
int found = check_events (events_, n_events_);
|
2016-09-16 16:58:03 +02:00
|
|
|
if (found) {
|
2017-09-07 00:11:22 +02:00
|
|
|
if (found > 0)
|
|
|
|
zero_trail_events (events_, n_events_, found);
|
2016-09-29 15:08:10 +02:00
|
|
|
return found;
|
2016-09-16 16:58:03 +02:00
|
|
|
}
|
2015-10-22 11:12:04 +03:00
|
|
|
|
2017-09-07 00:11:22 +02:00
|
|
|
// Adjust timeout or break
|
|
|
|
if (adjust_timeout (clock, timeout_, now, end, first_pass) == 0)
|
2015-10-22 11:12:04 +03:00
|
|
|
break;
|
2015-10-18 21:07:23 +03:00
|
|
|
}
|
2017-08-22 20:00:29 +02:00
|
|
|
errno = EAGAIN;
|
2015-10-22 11:12:04 +03:00
|
|
|
return -1;
|
|
|
|
|
|
|
|
#elif defined ZMQ_POLL_BASED_ON_SELECT
|
|
|
|
|
|
|
|
zmq::clock_t clock;
|
|
|
|
uint64_t now = 0;
|
|
|
|
uint64_t end = 0;
|
|
|
|
|
2016-01-13 15:12:47 +02:00
|
|
|
bool first_pass = true;
|
2017-09-07 00:11:22 +02:00
|
|
|
|
2015-10-22 11:12:04 +03:00
|
|
|
fd_set inset, outset, errset;
|
|
|
|
|
|
|
|
while (true) {
|
|
|
|
// Compute the timeout for the subsequent poll.
|
|
|
|
timeval timeout;
|
|
|
|
timeval *ptimeout;
|
|
|
|
if (first_pass) {
|
|
|
|
timeout.tv_sec = 0;
|
|
|
|
timeout.tv_usec = 0;
|
|
|
|
ptimeout = &timeout;
|
2018-02-01 11:46:09 +01:00
|
|
|
} else if (timeout_ < 0)
|
2015-10-22 11:12:04 +03:00
|
|
|
ptimeout = NULL;
|
|
|
|
else {
|
2018-05-18 15:54:00 +02:00
|
|
|
timeout.tv_sec = static_cast<long> ((end - now) / 1000);
|
|
|
|
timeout.tv_usec = static_cast<long> ((end - now) % 1000 * 1000);
|
2015-10-22 11:12:04 +03:00
|
|
|
ptimeout = &timeout;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Wait for events. Ignore interrupts if there's infinite timeout.
|
|
|
|
while (true) {
|
|
|
|
#if defined ZMQ_HAVE_WINDOWS
|
2017-04-10 11:35:08 +02:00
|
|
|
// On Windows we don't need to copy the whole fd_set.
|
|
|
|
// SOCKETS are continuous from the beginning of fd_array in fd_set.
|
|
|
|
// We just need to copy fd_count elements of fd_array.
|
|
|
|
// We gain huge memcpy() improvement if number of used SOCKETs is much lower than FD_SETSIZE.
|
2018-02-01 11:46:09 +01:00
|
|
|
memcpy (&inset, &pollset_in,
|
2018-05-18 15:54:00 +02:00
|
|
|
reinterpret_cast<char *> (pollset_in.fd_array
|
|
|
|
+ pollset_in.fd_count)
|
|
|
|
- reinterpret_cast<char *> (&pollset_in));
|
2018-02-01 11:46:09 +01:00
|
|
|
memcpy (&outset, &pollset_out,
|
2018-05-18 15:54:00 +02:00
|
|
|
reinterpret_cast<char *> (pollset_out.fd_array
|
|
|
|
+ pollset_out.fd_count)
|
|
|
|
- reinterpret_cast<char *> (&pollset_out));
|
2018-02-01 11:46:09 +01:00
|
|
|
memcpy (&errset, &pollset_err,
|
2018-05-18 15:54:00 +02:00
|
|
|
reinterpret_cast<char *> (pollset_err.fd_array
|
|
|
|
+ pollset_err.fd_count)
|
|
|
|
- reinterpret_cast<char *> (&pollset_err));
|
2015-10-22 11:12:04 +03:00
|
|
|
int rc = select (0, &inset, &outset, &errset, ptimeout);
|
|
|
|
if (unlikely (rc == SOCKET_ERROR)) {
|
|
|
|
errno = zmq::wsa_error_to_errno (WSAGetLastError ());
|
|
|
|
wsa_assert (errno == ENOTSOCK);
|
|
|
|
return -1;
|
|
|
|
}
|
|
|
|
#else
|
2017-04-10 11:35:08 +02:00
|
|
|
memcpy (&inset, &pollset_in, sizeof (fd_set));
|
|
|
|
memcpy (&outset, &pollset_out, sizeof (fd_set));
|
|
|
|
memcpy (&errset, &pollset_err, sizeof (fd_set));
|
2015-10-22 11:12:04 +03:00
|
|
|
int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout);
|
|
|
|
if (unlikely (rc == -1)) {
|
|
|
|
errno_assert (errno == EINTR || errno == EBADF);
|
|
|
|
return -1;
|
|
|
|
}
|
|
|
|
#endif
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
2017-03-11 10:57:29 +02:00
|
|
|
if (use_signaler && FD_ISSET (signaler->get_fd (), &inset))
|
|
|
|
signaler->recv ();
|
2015-10-22 11:12:04 +03:00
|
|
|
|
|
|
|
// Check for the events.
|
2018-02-01 11:46:09 +01:00
|
|
|
int found = check_events (events_, n_events_, inset, outset, errset);
|
2016-09-28 13:49:49 +02:00
|
|
|
if (found) {
|
2017-09-07 00:11:22 +02:00
|
|
|
if (found > 0)
|
|
|
|
zero_trail_events (events_, n_events_, found);
|
2016-09-29 15:08:10 +02:00
|
|
|
return found;
|
2016-09-28 13:49:49 +02:00
|
|
|
}
|
2015-10-18 21:07:23 +03:00
|
|
|
|
2017-09-07 00:11:22 +02:00
|
|
|
// Adjust timeout or break
|
|
|
|
if (adjust_timeout (clock, timeout_, now, end, first_pass) == 0)
|
2015-10-22 11:12:04 +03:00
|
|
|
break;
|
2015-10-18 21:07:23 +03:00
|
|
|
}
|
|
|
|
|
2017-08-22 20:00:29 +02:00
|
|
|
errno = EAGAIN;
|
2015-10-22 11:12:04 +03:00
|
|
|
return -1;
|
|
|
|
|
|
|
|
#else
|
2017-09-07 00:11:22 +02:00
|
|
|
|
2015-10-22 11:12:04 +03:00
|
|
|
// Exotic platforms that support neither poll() nor select().
|
|
|
|
errno = ENOTSUP;
|
|
|
|
return -1;
|
2017-09-07 00:11:22 +02:00
|
|
|
|
2015-10-22 11:12:04 +03:00
|
|
|
#endif
|
2015-10-18 21:07:23 +03:00
|
|
|
}
|