0
0
mirror of https://github.com/zeromq/libzmq.git synced 2025-01-22 07:29:31 +08:00
libzmq/src/zmq.cpp
Pieter Hintjens 8620c3e032 Problem: source file headers are somewhat confusing about LGPLv3
Of course people still "can" distribute the sources under the
LGPLv3. However we provide COPYING.LESSER with additional grants.

Solution: specify these grants in the header of each source file.
2015-06-02 22:33:55 +02:00

1107 lines
30 KiB
C++

/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define ZMQ_TYPE_UNSAFE
#include "poller.hpp"
// On AIX platform, poll.h has to be included first to get consistent
// definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
// instead of 'events' and 'revents' and defines macros to map from POSIX-y
// names to AIX-specific names).
#if defined ZMQ_POLL_BASED_ON_POLL
#include <poll.h>
#endif
// zmq.h must be included *after* poll.h for AIX to build properly
#include "../include/zmq.h"
#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#else
#include <unistd.h>
#endif
// XSI vector I/O
#if defined ZMQ_HAVE_UIO
#include <sys/uio.h>
#else
// Fallback definition used only when ZMQ_HAVE_UIO is not defined, i.e. when
// <sys/uio.h> is not included to provide the type. Describes one buffer of
// a scatter/gather vector as used by zmq_sendiov () and zmq_recviov ().
struct iovec {
// Start address of the buffer.
void *iov_base;
// Length of the buffer in bytes.
size_t iov_len;
};
#endif
#include <limits.h>
#include <string.h>
#include <stdlib.h>
#include <new>
#include "proxy.hpp"
#include "socket_base.hpp"
#include "stdint.hpp"
#include "config.hpp"
#include "likely.hpp"
#include "clock.hpp"
#include "ctx.hpp"
#include "err.hpp"
#include "msg.hpp"
#include "fd.hpp"
#include "metadata.hpp"
#if !defined ZMQ_HAVE_WINDOWS
#include <unistd.h>
#endif
#if defined ZMQ_HAVE_OPENPGM
#define __PGM_WININT_H__
#include <pgm/pgm.h>
#endif
// Compile time check whether msg_t fits into zmq_msg_t.
// The array bound evaluates to -1 when the two sizes differ, which makes the
// typedef ill-formed and stops the build. The functions in this file cast
// zmq_msg_t pointers to zmq::msg_t pointers, so the sizes have to agree.
typedef char check_msg_t_size
[sizeof (zmq::msg_t) == sizeof (zmq_msg_t) ? 1 : -1];
//  Reports the library version this file was compiled against by writing
//  the ZMQ_VERSION_MAJOR / MINOR / PATCH macros through the three output
//  pointers. All three pointers are dereferenced unconditionally.
void zmq_version (int *major_, int *minor_, int *patch_)
{
    const int version [3] = {
        ZMQ_VERSION_MAJOR,
        ZMQ_VERSION_MINOR,
        ZMQ_VERSION_PATCH
    };

    *major_ = version [0];
    *minor_ = version [1];
    *patch_ = version [2];
}
//  Returns the textual description of an error number, as produced by
//  zmq::errno_to_string ().
const char *zmq_strerror (int errnum_)
{
    const char *const description = zmq::errno_to_string (errnum_);
    return description;
}
//  Returns the current value of errno as seen by the library.
int zmq_errno (void)
{
    const int current_error = errno;
    return current_error;
}
// New context API
// Creates a new 0MQ context and returns it as an opaque pointer.
//
// When built with OpenPGM, the PGM library is initialised first. If that
// fails with PGM_ERROR_FAILED in the time domain, NULL is returned with
// errno set to EINVAL; any other PGM initialisation failure asserts.
// On Windows, Winsock 2.2 is started before the context is constructed.
// Failure to allocate the context itself is caught by alloc_assert.
void *zmq_ctx_new (void)
{
#if defined ZMQ_HAVE_OPENPGM
// Init PGM transport. Ensure threading and timer are enabled. Find PGM
// protocol ID. Note that if you want to use gettimeofday and sleep for
// openPGM timing, set environment variables PGM_TIMER to "GTOD" and
// PGM_SLEEP to "USLEEP".
pgm_error_t *pgm_error = NULL;
const bool ok = pgm_init (&pgm_error);
if (ok != TRUE) {
// Invalid parameters don't set pgm_error_t
zmq_assert (pgm_error != NULL);
if (pgm_error->domain == PGM_ERROR_DOMAIN_TIME && (
pgm_error->code == PGM_ERROR_FAILED)) {
// Failed to access RTC or HPET device.
pgm_error_free (pgm_error);
errno = EINVAL;
return NULL;
}
// PGM_ERROR_DOMAIN_ENGINE: WSAStartup errors or missing WSARecvMsg.
zmq_assert (false);
}
#endif
#ifdef ZMQ_HAVE_WINDOWS
// Initialise Windows sockets. Note that WSAStartup can be called multiple
// times given that WSACleanup will be called for each WSAStartup.
// We do this before the ctx constructor since its embedded mailbox_t
// object needs Winsock to be up and running.
WORD version_requested = MAKEWORD (2, 2);
WSADATA wsa_data;
int rc = WSAStartup (version_requested, &wsa_data);
zmq_assert (rc == 0);
zmq_assert (LOBYTE (wsa_data.wVersion) == 2 &&
HIBYTE (wsa_data.wVersion) == 2);
#endif
// Create 0MQ context.
// The nothrow form returns NULL on allocation failure, which alloc_assert
// then checks.
zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t;
alloc_assert (ctx);
return ctx;
}
int zmq_ctx_term (void *ctx_)
{
if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
errno = EFAULT;
return -1;
}
int rc = ((zmq::ctx_t*) ctx_)->terminate ();
int en = errno;
// Shut down only if termination was not interrupted by a signal.
if (!rc || en != EINTR) {
#ifdef ZMQ_HAVE_WINDOWS
// On Windows, uninitialise socket layer.
rc = WSACleanup ();
wsa_assert (rc != SOCKET_ERROR);
#endif
#if defined ZMQ_HAVE_OPENPGM
// Shut down the OpenPGM library.
if (pgm_shutdown () != TRUE)
zmq_assert (false);
#endif
}
errno = en;
return rc;
}
int zmq_ctx_shutdown (void *ctx_)
{
if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
errno = EFAULT;
return -1;
}
return ((zmq::ctx_t*) ctx_)->shutdown ();
}
int zmq_ctx_set (void *ctx_, int option_, int optval_)
{
if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
errno = EFAULT;
return -1;
}
return ((zmq::ctx_t*) ctx_)->set (option_, optval_);
}
int zmq_ctx_get (void *ctx_, int option_)
{
if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
errno = EFAULT;
return -1;
}
return ((zmq::ctx_t*) ctx_)->get (option_);
}
// Stable/legacy context API
//  Legacy context constructor: creates a context with zmq_ctx_new () and
//  sets its ZMQ_IO_THREADS option to io_threads_.
//
//  Returns the new context, or NULL on failure. A negative io_threads_
//  yields NULL with errno set to EINVAL. If zmq_ctx_new () itself fails,
//  NULL is returned with the errno it set.
void *zmq_init (int io_threads_)
{
    if (io_threads_ < 0) {
        errno = EINVAL;
        return NULL;
    }

    void *ctx = zmq_ctx_new ();

    //  zmq_ctx_new () reports failure by returning NULL with errno set.
    //  Calling zmq_ctx_set () on a NULL context would replace that errno
    //  with EFAULT, so the option is only applied to a valid context.
    if (ctx)
        zmq_ctx_set (ctx, ZMQ_IO_THREADS, io_threads_);

    return ctx;
}
//  Legacy name for zmq_ctx_term (); forwards the call unchanged.
int zmq_term (void *ctx_)
{
    const int result = zmq_ctx_term (ctx_);
    return result;
}
//  Alternative name for zmq_ctx_term (); forwards the call unchanged.
int zmq_ctx_destroy (void *ctx_)
{
    const int result = zmq_ctx_term (ctx_);
    return result;
}
// Sockets
void *zmq_socket (void *ctx_, int type_)
{
if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
errno = EFAULT;
return NULL;
}
zmq::ctx_t *ctx = (zmq::ctx_t*) ctx_;
zmq::socket_base_t *s = ctx->create_socket (type_);
return (void *) s;
}
int zmq_close (void *s_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
((zmq::socket_base_t*) s_)->close ();
return 0;
}
int zmq_setsockopt (void *s_, int option_, const void *optval_,
size_t optvallen_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
int result = s->setsockopt (option_, optval_, optvallen_);
return result;
}
int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
int result = s->getsockopt (option_, optval_, optvallen_);
return result;
}
int zmq_socket_monitor (void *s_, const char *addr_, int events_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
int result = s->monitor (addr_, events_);
return result;
}
int zmq_bind (void *s_, const char *addr_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
int result = s->bind (addr_);
return result;
}
int zmq_connect (void *s_, const char *addr_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
int result = s->connect (addr_);
return result;
}
int zmq_unbind (void *s_, const char *addr_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
return s->term_endpoint (addr_);
}
int zmq_disconnect (void *s_, const char *addr_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
return s->term_endpoint (addr_);
}
// Sending functions.
//  Sends one message on the socket.
//
//  Returns the size of the message in bytes on success, clamped to INT_MAX,
//  or -1 if socket_base_t::send () failed (errno is left as send () set it).
static int
s_sendmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
{
    //  The size is read before sending; presumably the message no longer
    //  holds its payload once send () has succeeded.
    const size_t sz = zmq_msg_size (msg_);
    const int rc = s_->send ((zmq::msg_t*) msg_, flags_);
    if (unlikely (rc < 0))
        return -1;

    //  Clamp to INT_MAX: a plain cast of a larger size could produce a
    //  negative value, which callers would take for an error.
    const size_t max_reported = (size_t) INT_MAX;
    return (int) (sz < max_reported ? sz : max_reported);
}
/*  To be deprecated once zmq_msg_send() is stable.
    Same operation as zmq_msg_send (), with the socket as first argument. */
int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_)
{
    const int result = zmq_msg_send (msg_, s_, flags_);
    return result;
}
//  Sends len_ bytes from buf_ as a single message; the data is copied into
//  a freshly initialised message first.
//
//  Returns the number of bytes in the message on success, -1 on failure.
//  errno is ENOTSOCK when s_ is NULL or fails its tag check; otherwise it is
//  whatever message initialisation or the send operation set.
//  A zero-length send with a NULL buf_ is permitted.
int zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    zmq_msg_t msg;
    int rc = zmq_msg_init_size (&msg, len_);
    if (rc != 0)
        return -1;

    //  Passing a NULL pointer to memcpy is undefined even for a length of
    //  zero, so the copy is skipped entirely for empty messages.
    if (len_ > 0)
        memcpy (zmq_msg_data (&msg), buf_, len_);

    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    rc = s_sendmsg (s, &msg, flags_);
    if (unlikely (rc < 0)) {
        //  Closing the message must not disturb the errno set by the send.
        int err = errno;
        int rc2 = zmq_msg_close (&msg);
        errno_assert (rc2 == 0);
        errno = err;
        return -1;
    }

    //  Note the optimisation here. We don't close the msg object as it is
    //  empty anyway. This may change when implementation of zmq_msg_t changes.
    return rc;
}
int zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq_msg_t msg;
int rc = zmq_msg_init_data (&msg, (void*)buf_, len_, NULL, NULL);
if (rc != 0)
return -1;
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
rc = s_sendmsg (s, &msg, flags_);
if (unlikely (rc < 0)) {
int err = errno;
int rc2 = zmq_msg_close (&msg);
errno_assert (rc2 == 0);
errno = err;
return -1;
}
// Note the optimisation here. We don't close the msg object as it is
// empty anyway. This may change when implementation of zmq_msg_t changes.
return rc;
}
// Send multiple messages.
// TODO: this function has no man page
//
// If flag bit ZMQ_SNDMORE is set the vector is treated as
// a single multi-part message, i.e. the last message has
// ZMQ_SNDMORE bit switched off.
//
int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
int rc = 0;
zmq_msg_t msg;
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
for (size_t i = 0; i < count_; ++i) {
rc = zmq_msg_init_size (&msg, a_[i].iov_len);
if (rc != 0) {
rc = -1;
break;
}
memcpy (zmq_msg_data (&msg), a_[i].iov_base, a_[i].iov_len);
if (i == count_ - 1)
flags_ = flags_ & ~ZMQ_SNDMORE;
rc = s_sendmsg (s, &msg, flags_);
if (unlikely (rc < 0)) {
int err = errno;
int rc2 = zmq_msg_close (&msg);
errno_assert (rc2 == 0);
errno = err;
rc = -1;
break;
}
}
return rc;
}
// Receiving functions.
//  Receives one message from the socket into msg_.
//
//  Returns the size of the received message in bytes, clamped to INT_MAX,
//  or -1 if socket_base_t::recv () failed (errno is left as recv () set it).
static int
s_recvmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
{
    const int rc = s_->recv ((zmq::msg_t*) msg_, flags_);
    if (unlikely (rc < 0))
        return -1;

    //  Clamp to INT_MAX: a plain cast of a larger size could produce a
    //  negative value, which callers would take for an error.
    const size_t sz = zmq_msg_size (msg_);
    const size_t max_reported = (size_t) INT_MAX;
    return (int) (sz < max_reported ? sz : max_reported);
}
/*  To be deprecated once zmq_msg_recv() is stable.
    Same operation as zmq_msg_recv (), with the socket as first argument. */
int zmq_recvmsg (void *s_, zmq_msg_t *msg_, int flags_)
{
    const int result = zmq_msg_recv (msg_, s_, flags_);
    return result;
}
//  Receives one message and copies at most len_ bytes of it into buf_.
//
//  Returns the size of the received message, which may exceed len_ when the
//  message was truncated, or -1 on failure. errno is ENOTSOCK when s_ is
//  NULL or fails its tag check; otherwise it is whatever the receive
//  operation set. buf_ may be NULL when nothing is to be copied (len_ is 0
//  or the message is empty).
int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    zmq_msg_t msg;
    int rc = zmq_msg_init (&msg);
    errno_assert (rc == 0);

    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    int nbytes = s_recvmsg (s, &msg, flags_);
    if (unlikely (nbytes < 0)) {
        //  Keep the errno from the failed receive across the close call.
        int err = errno;
        rc = zmq_msg_close (&msg);
        errno_assert (rc == 0);
        errno = err;
        return -1;
    }

    //  At the moment an oversized message is silently truncated.
    //  TODO: Build in a notification mechanism to report the overflows.
    size_t to_copy = size_t (nbytes) < len_ ? size_t (nbytes) : len_;

    //  Passing a NULL pointer to memcpy is undefined even for a length of
    //  zero, so the copy is skipped when there is nothing to copy.
    if (to_copy > 0)
        memcpy (buf_, zmq_msg_data (&msg), to_copy);

    rc = zmq_msg_close (&msg);
    errno_assert (rc == 0);

    return nbytes;
}
//  Receive a multi-part message
//
//  Receives up to *count_ parts of a multi-part message.
//  Sets *count_ to the actual number of parts read.
//  ZMQ_RCVMORE is set to indicate if a complete multi-part message was read.
//  Returns number of message parts read, or -1 on error.
//
//  Note: even if -1 is returned, some parts of the message
//  may have been read. Therefore the client must consult
//  *count_ to retrieve message parts successfully read,
//  even if -1 is returned.
//
//  The iov_base* buffers of each iovec *a_ filled in by this
//  function may be freed using free(). A buffer is allocated even for
//  an empty part, so iov_base is non-NULL for every part counted in *count_.
//  errno is ENOTSOCK when s_ is NULL or fails its tag check, and ENOMEM when
//  a buffer for a part cannot be allocated.
//  TODO: this function has no man page
//
int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_)
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;

    size_t count = *count_;
    int nread = 0;
    bool recvmore = true;

    *count_ = 0;

    for (size_t i = 0; recvmore && i < count; ++i) {

        zmq_msg_t msg;
        int rc = zmq_msg_init (&msg);
        errno_assert (rc == 0);

        int nbytes = s_recvmsg (s, &msg, flags_);
        if (unlikely (nbytes < 0)) {
            //  Keep the errno from the failed receive across the close call.
            int err = errno;
            rc = zmq_msg_close (&msg);
            errno_assert (rc == 0);
            errno = err;
            nread = -1;
            break;
        }

        a_[i].iov_len = zmq_msg_size (&msg);

        //  malloc (0) is allowed to return NULL, which would be mistaken
        //  for an allocation failure on empty parts; always request at
        //  least one byte.
        a_[i].iov_base = static_cast<char *> (
            malloc (a_[i].iov_len > 0 ? a_[i].iov_len : 1));
        if (unlikely (!a_[i].iov_base)) {
            //  Release the received message before bailing out so that it
            //  is not leaked.
            rc = zmq_msg_close (&msg);
            errno_assert (rc == 0);
            errno = ENOMEM;
            return -1;
        }

        memcpy (a_[i].iov_base, static_cast<char *> (zmq_msg_data (&msg)),
                a_[i].iov_len);

        //  Assume zmq_socket ZMQ_RCVMORE is properly set.
        recvmore = ((zmq::msg_t*) (void *) &msg)->flags () & zmq::msg_t::more;
        rc = zmq_msg_close (&msg);
        errno_assert (rc == 0);
        ++*count_;
        ++nread;
    }
    return nread;
}
// Message manipulators.
//  Initialises the message by forwarding to msg_t::init ().
int zmq_msg_init (zmq_msg_t *msg_)
{
    zmq::msg_t *const msg = reinterpret_cast<zmq::msg_t *> (msg_);
    return msg->init ();
}
//  Initialises the message with the given size by forwarding to
//  msg_t::init_size ().
int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
{
    zmq::msg_t *const msg = reinterpret_cast<zmq::msg_t *> (msg_);
    return msg->init_size (size_);
}
//  Initialises the message over a caller-supplied buffer by forwarding the
//  buffer, its size, the free function and the hint to msg_t::init_data ().
int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
    zmq_free_fn *ffn_, void *hint_)
{
    zmq::msg_t *const msg = reinterpret_cast<zmq::msg_t *> (msg_);
    return msg->init_data (data_, size_, ffn_, hint_);
}
//  Sends the message on the socket via s_sendmsg () and returns its result.
//  Returns -1 with errno set to ENOTSOCK when s_ is NULL or fails its
//  tag check.
int zmq_msg_send (zmq_msg_t *msg_, void *s_, int flags_)
{
    zmq::socket_base_t *const sock = static_cast<zmq::socket_base_t *> (s_);
    if (sock == NULL || !sock->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    return s_sendmsg (sock, msg_, flags_);
}
//  Receives a message from the socket via s_recvmsg () and returns its
//  result. Returns -1 with errno set to ENOTSOCK when s_ is NULL or fails
//  its tag check.
int zmq_msg_recv (zmq_msg_t *msg_, void *s_, int flags_)
{
    zmq::socket_base_t *const sock = static_cast<zmq::socket_base_t *> (s_);
    if (sock == NULL || !sock->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    return s_recvmsg (sock, msg_, flags_);
}
//  Closes the message by forwarding to msg_t::close ().
int zmq_msg_close (zmq_msg_t *msg_)
{
    zmq::msg_t *const msg = reinterpret_cast<zmq::msg_t *> (msg_);
    return msg->close ();
}
//  Moves src_ into dest_ by forwarding to msg_t::move ().
int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)
{
    zmq::msg_t *const destination = reinterpret_cast<zmq::msg_t *> (dest_);
    zmq::msg_t *const source = reinterpret_cast<zmq::msg_t *> (src_);
    return destination->move (*source);
}
//  Copies src_ into dest_ by forwarding to msg_t::copy ().
int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
{
    zmq::msg_t *const destination = reinterpret_cast<zmq::msg_t *> (dest_);
    zmq::msg_t *const source = reinterpret_cast<zmq::msg_t *> (src_);
    return destination->copy (*source);
}
//  Returns the pointer reported by msg_t::data () for this message.
void *zmq_msg_data (zmq_msg_t *msg_)
{
    zmq::msg_t *const msg = reinterpret_cast<zmq::msg_t *> (msg_);
    return msg->data ();
}
//  Returns the size reported by msg_t::size () for this message.
size_t zmq_msg_size (zmq_msg_t *msg_)
{
    zmq::msg_t *const msg = reinterpret_cast<zmq::msg_t *> (msg_);
    return msg->size ();
}
//  Shorthand for zmq_msg_get (msg_, ZMQ_MORE).
int zmq_msg_more (zmq_msg_t *msg_)
{
    const int more = zmq_msg_get (msg_, ZMQ_MORE);
    return more;
}
//  Reads an integer property of the message.
//
//    ZMQ_MORE   - 1 if the 'more' flag is set on the message, else 0.
//    ZMQ_SRCFD  - the value of msg_t::fd (), converted to int.
//    ZMQ_SHARED - 1 if msg_t::is_cmsg () holds or the 'shared' flag is set,
//                 else 0.
//
//  Any other property yields -1 with errno set to EINVAL.
int zmq_msg_get (zmq_msg_t *msg_, int property_)
{
    zmq::msg_t *const msg = reinterpret_cast<zmq::msg_t *> (msg_);

    if (property_ == ZMQ_MORE)
        return (msg->flags () & zmq::msg_t::more) ? 1 : 0;

    if (property_ == ZMQ_SRCFD)
        return (int) msg->fd ();

    if (property_ == ZMQ_SHARED) {
        if (msg->is_cmsg ())
            return 1;
        return (msg->flags () & zmq::msg_t::shared) ? 1 : 0;
    }

    errno = EINVAL;
    return -1;
}
//  Sets an integer property of the message. No properties are supported at
//  present, so every call fails with -1 and errno set to EINVAL.
int zmq_msg_set (zmq_msg_t * /* msg_ */, int /* property_ */,
    int /* optval_ */)
{
    const int failure = -1;
    errno = EINVAL;
    return failure;
}
//  Stores a routing id on the message by forwarding to
//  msg_t::set_routing_id () and returns its result.
int zmq_msg_set_routing_id (zmq_msg_t *msg_, uint32_t routing_id_)
{
    zmq::msg_t *const msg = reinterpret_cast<zmq::msg_t *> (msg_);
    return msg->set_routing_id (routing_id_);
}
//  Returns the routing id reported by msg_t::get_routing_id ().
uint32_t zmq_msg_get_routing_id (zmq_msg_t *msg_)
{
    zmq::msg_t *const msg = reinterpret_cast<zmq::msg_t *> (msg_);
    return msg->get_routing_id ();
}
//  Get message metadata string
//
//  Looks up property_ in the metadata attached to the message and returns
//  the associated value. Returns NULL with errno set to EINVAL when
//  property_ is NULL, when the message carries no metadata, or when the
//  lookup yields no value.
const char *zmq_msg_gets (zmq_msg_t *msg_, const char *property_)
{
    //  std::string must not be constructed from a NULL pointer, so reject
    //  a missing property name up front.
    if (!property_) {
        errno = EINVAL;
        return NULL;
    }

    zmq::metadata_t *metadata = ((zmq::msg_t*) msg_)->metadata ();
    const char *value = NULL;
    if (metadata)
        value = metadata->get (std::string (property_));

    if (!value) {
        errno = EINVAL;
        return NULL;
    }
    return value;
}
// Polling.
//  Waits for events on a set of 0MQ sockets and/or raw file descriptors.
//
//  items_   - array of nitems_ poll items; the revents field of each item
//             is overwritten with the events found.
//  nitems_  - number of entries in items_.
//  timeout_ - time to wait in milliseconds; 0 returns immediately and a
//             negative value waits until an event occurs.
//
//  Returns the number of items with a non-zero revents field, or -1 on
//  failure. errno is EINVAL when nitems_ is negative and EFAULT when items_
//  is NULL although nitems_ is positive. -1 is also returned when the
//  underlying poll () / select () call or a socket option query fails.
//  With nitems_ equal to 0 the function merely sleeps for the timeout.
//  On platforms providing neither poll () nor select () the call fails with
//  ENOTSUP.
int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
{
#if defined ZMQ_POLL_BASED_ON_POLL
    if (unlikely (nitems_ < 0)) {
        errno = EINVAL;
        return -1;
    }
    if (unlikely (nitems_ == 0)) {
        if (timeout_ == 0)
            return 0;
#if defined ZMQ_HAVE_WINDOWS
        Sleep (timeout_ > 0 ? timeout_ : INFINITE);
        return 0;
#elif defined ZMQ_HAVE_ANDROID
        usleep (timeout_ * 1000);
        return 0;
#else
        return usleep (timeout_ * 1000);
#endif
    }

    if (!items_) {
        errno = EFAULT;
        return -1;
    }

    zmq::clock_t clock;
    uint64_t now = 0;
    uint64_t end = 0;

    //  Use the on-stack array for small poll sets and fall back to the heap
    //  only when more than ZMQ_POLLITEMS_DFLT items are polled.
    pollfd spollfds[ZMQ_POLLITEMS_DFLT];
    pollfd *pollfds = spollfds;

    if (nitems_ > ZMQ_POLLITEMS_DFLT) {
        pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));
        alloc_assert (pollfds);
    }

    //  Build pollset for poll () system call.
    for (int i = 0; i != nitems_; i++) {

        //  If the poll item is a 0MQ socket, we poll on the file descriptor
        //  retrieved by the ZMQ_FD socket option.
        if (items_ [i].socket) {
            size_t zmq_fd_size = sizeof (zmq::fd_t);
            if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [i].fd,
                &zmq_fd_size) == -1) {
                if (pollfds != spollfds)
                    free (pollfds);
                return -1;
            }
            pollfds [i].events = items_ [i].events ? POLLIN : 0;
        }
        //  Else, the poll item is a raw file descriptor. Just convert the
        //  events to normal POLLIN/POLLOUT for poll ().
        else {
            pollfds [i].fd = items_ [i].fd;
            pollfds [i].events =
                (items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) |
                (items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0) |
                (items_ [i].events & ZMQ_POLLPRI ? POLLPRI : 0);
        }
    }

    bool first_pass = true;
    int nevents = 0;

    while (true) {
        //  Compute the timeout for the subsequent poll. The first pass
        //  never blocks: it only checks for events that are already pending.
        int timeout;
        if (first_pass)
            timeout = 0;
        else
        if (timeout_ < 0)
            timeout = -1;
        else
            timeout = (int) (end - now);

        //  Wait for events.
        while (true) {
            int rc = poll (pollfds, nitems_, timeout);
            if (rc == -1 && errno == EINTR) {
                if (pollfds != spollfds)
                    free (pollfds);
                return -1;
            }
            errno_assert (rc >= 0);
            break;
        }

        //  Check for the events.
        for (int i = 0; i != nitems_; i++) {

            items_ [i].revents = 0;

            //  The poll item is a 0MQ socket. Retrieve pending events
            //  using the ZMQ_EVENTS socket option.
            if (items_ [i].socket) {
                size_t zmq_events_size = sizeof (uint32_t);
                uint32_t zmq_events;
                if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
                    &zmq_events_size) == -1) {
                    if (pollfds != spollfds)
                        free (pollfds);
                    return -1;
                }
                if ((items_ [i].events & ZMQ_POLLOUT) &&
                      (zmq_events & ZMQ_POLLOUT))
                    items_ [i].revents |= ZMQ_POLLOUT;
                if ((items_ [i].events & ZMQ_POLLIN) &&
                      (zmq_events & ZMQ_POLLIN))
                    items_ [i].revents |= ZMQ_POLLIN;
            }
            //  Else, the poll item is a raw file descriptor, simply convert
            //  the events to zmq_pollitem_t-style format.
            else {
                if (pollfds [i].revents & POLLIN)
                    items_ [i].revents |= ZMQ_POLLIN;
                if (pollfds [i].revents & POLLOUT)
                    items_ [i].revents |= ZMQ_POLLOUT;
                if (pollfds [i].revents & POLLPRI)
                    items_ [i].revents |= ZMQ_POLLPRI;
                if (pollfds [i].revents & ~(POLLIN | POLLOUT | POLLPRI))
                    items_ [i].revents |= ZMQ_POLLERR;
            }

            if (items_ [i].revents)
                nevents++;
        }

        //  If timeout is zero, exit immediately whether there are events or not.
        if (timeout_ == 0)
            break;

        //  If there are events to return, we can exit immediately.
        if (nevents)
            break;

        //  At this point we are meant to wait for events but there are none.
        //  If timeout is infinite we can just loop until we get some events.
        if (timeout_ < 0) {
            if (first_pass)
                first_pass = false;
            continue;
        }

        //  The timeout is finite and there are no events. In the first pass
        //  we get a timestamp of when the polling has begun. (We assume that
        //  the first pass has taken negligible time). We also compute the
        //  time when the polling should time out.
        if (first_pass) {
            now = clock.now_ms ();
            end = now + timeout_;
            if (now == end)
                break;
            first_pass = false;
            continue;
        }

        //  Find out whether timeout has expired.
        now = clock.now_ms ();
        if (now >= end)
            break;
    }

    if (pollfds != spollfds)
        free (pollfds);
    return nevents;

#elif defined ZMQ_POLL_BASED_ON_SELECT

    if (unlikely (nitems_ < 0)) {
        errno = EINVAL;
        return -1;
    }
    if (unlikely (nitems_ == 0)) {
        if (timeout_ == 0)
            return 0;
#if defined ZMQ_HAVE_WINDOWS
        Sleep (timeout_ > 0 ? timeout_ : INFINITE);
        return 0;
#else
        return usleep (timeout_ * 1000);
#endif
    }

    //  Same guard as in the poll ()-based implementation: a non-empty poll
    //  set needs a valid array, otherwise the loops below would dereference
    //  a NULL pointer.
    if (!items_) {
        errno = EFAULT;
        return -1;
    }

    zmq::clock_t clock;
    uint64_t now = 0;
    uint64_t end = 0;

    //  Ensure we do not attempt to select () on more than FD_SETSIZE
    //  file descriptors.
    zmq_assert (nitems_ <= FD_SETSIZE);

    fd_set pollset_in;
    FD_ZERO (&pollset_in);
    fd_set pollset_out;
    FD_ZERO (&pollset_out);
    fd_set pollset_err;
    FD_ZERO (&pollset_err);

    zmq::fd_t maxfd = 0;

    //  Build the fd_sets for passing to select ().
    for (int i = 0; i != nitems_; i++) {

        //  If the poll item is a 0MQ socket we are interested in input on the
        //  notification file descriptor retrieved by the ZMQ_FD socket option.
        if (items_ [i].socket) {
            size_t zmq_fd_size = sizeof (zmq::fd_t);
            zmq::fd_t notify_fd;
            if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &notify_fd,
                &zmq_fd_size) == -1)
                return -1;
            if (items_ [i].events) {
                FD_SET (notify_fd, &pollset_in);
                if (maxfd < notify_fd)
                    maxfd = notify_fd;
            }
        }
        //  Else, the poll item is a raw file descriptor. Convert the poll item
        //  events to the appropriate fd_sets.
        else {
            if (items_ [i].events & ZMQ_POLLIN)
                FD_SET (items_ [i].fd, &pollset_in);
            if (items_ [i].events & ZMQ_POLLOUT)
                FD_SET (items_ [i].fd, &pollset_out);
            if (items_ [i].events & ZMQ_POLLERR)
                FD_SET (items_ [i].fd, &pollset_err);
            if (maxfd < items_ [i].fd)
                maxfd = items_ [i].fd;
        }
    }

    bool first_pass = true;
    int nevents = 0;
    fd_set inset, outset, errset;

    while (true) {

        //  Compute the timeout for the subsequent poll. The first pass
        //  never blocks: it only checks for events that are already pending.
        timeval timeout;
        timeval *ptimeout;
        if (first_pass) {
            timeout.tv_sec = 0;
            timeout.tv_usec = 0;
            ptimeout = &timeout;
        }
        else
        if (timeout_ < 0)
            ptimeout = NULL;
        else {
            timeout.tv_sec = (long) ((end - now) / 1000);
            timeout.tv_usec = (long) ((end - now) % 1000 * 1000);
            ptimeout = &timeout;
        }

        //  Wait for events. Ignore interrupts if there's infinite timeout.
        while (true) {
            //  select () modifies the sets it is given, so work on copies
            //  and keep the originals for the next iteration.
            memcpy (&inset, &pollset_in, sizeof (fd_set));
            memcpy (&outset, &pollset_out, sizeof (fd_set));
            memcpy (&errset, &pollset_err, sizeof (fd_set));
#if defined ZMQ_HAVE_WINDOWS
            int rc = select (0, &inset, &outset, &errset, ptimeout);
            if (unlikely (rc == SOCKET_ERROR)) {
                errno = zmq::wsa_error_to_errno (WSAGetLastError ());
                wsa_assert (errno == ENOTSOCK);
                return -1;
            }
#else
            int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout);
            if (unlikely (rc == -1)) {
                errno_assert (errno == EINTR || errno == EBADF);
                return -1;
            }
#endif
            break;
        }

        //  Check for the events.
        for (int i = 0; i != nitems_; i++) {

            items_ [i].revents = 0;

            //  The poll item is a 0MQ socket. Retrieve pending events
            //  using the ZMQ_EVENTS socket option.
            if (items_ [i].socket) {
                size_t zmq_events_size = sizeof (uint32_t);
                uint32_t zmq_events;
                if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
                      &zmq_events_size) == -1)
                    return -1;
                if ((items_ [i].events & ZMQ_POLLOUT) &&
                      (zmq_events & ZMQ_POLLOUT))
                    items_ [i].revents |= ZMQ_POLLOUT;
                if ((items_ [i].events & ZMQ_POLLIN) &&
                      (zmq_events & ZMQ_POLLIN))
                    items_ [i].revents |= ZMQ_POLLIN;
            }
            //  Else, the poll item is a raw file descriptor, simply convert
            //  the events to zmq_pollitem_t-style format.
            else {
                if (FD_ISSET (items_ [i].fd, &inset))
                    items_ [i].revents |= ZMQ_POLLIN;
                if (FD_ISSET (items_ [i].fd, &outset))
                    items_ [i].revents |= ZMQ_POLLOUT;
                if (FD_ISSET (items_ [i].fd, &errset))
                    items_ [i].revents |= ZMQ_POLLERR;
            }

            if (items_ [i].revents)
                nevents++;
        }

        //  If timeout is zero, exit immediately whether there are events or not.
        if (timeout_ == 0)
            break;

        //  If there are events to return, we can exit immediately.
        if (nevents)
            break;

        //  At this point we are meant to wait for events but there are none.
        //  If timeout is infinite we can just loop until we get some events.
        if (timeout_ < 0) {
            if (first_pass)
                first_pass = false;
            continue;
        }

        //  The timeout is finite and there are no events. In the first pass
        //  we get a timestamp of when the polling has begun. (We assume that
        //  the first pass has taken negligible time). We also compute the
        //  time when the polling should time out.
        if (first_pass) {
            now = clock.now_ms ();
            end = now + timeout_;
            if (now == end)
                break;
            first_pass = false;
            continue;
        }

        //  Find out whether timeout has expired.
        now = clock.now_ms ();
        if (now >= end)
            break;
    }

    return nevents;

#else
    //  Exotic platforms that support neither poll() nor select().
    errno = ENOTSUP;
    return -1;
#endif
}
// The proxy functionality
int zmq_proxy (void *frontend_, void *backend_, void *capture_)
{
if (!frontend_ || !backend_) {
errno = EFAULT;
return -1;
}
return zmq::proxy (
(zmq::socket_base_t*) frontend_,
(zmq::socket_base_t*) backend_,
(zmq::socket_base_t*) capture_);
}
int zmq_proxy_steerable (void *frontend_, void *backend_, void *capture_, void *control_)
{
if (!frontend_ || !backend_) {
errno = EFAULT;
return -1;
}
return zmq::proxy (
(zmq::socket_base_t*) frontend_,
(zmq::socket_base_t*) backend_,
(zmq::socket_base_t*) capture_,
(zmq::socket_base_t*) control_);
}
// The deprecated device functionality
int zmq_device (int /* type */, void *frontend_, void *backend_)
{
return zmq::proxy (
(zmq::socket_base_t*) frontend_,
(zmq::socket_base_t*) backend_, NULL);
}
//  Probe library capabilities; for now, reports on transport and security
//
//  Returns 1 when the named capability was compiled into the library and 0
//  otherwise. The names recognised are "ipc", "pgm", "tipc", "norm", "curve"
//  and "gssapi", each subject to the corresponding build-time macro.
//  A NULL capability is reported as not available.
int zmq_has (const char *capability)
{
    //  strcmp () must not be given a NULL pointer.
    if (!capability)
        return false;

#if !defined (ZMQ_HAVE_WINDOWS) && !defined (ZMQ_HAVE_OPENVMS)
    if (strcmp (capability, "ipc") == 0)
        return true;
#endif
#if defined (ZMQ_HAVE_OPENPGM)
    if (strcmp (capability, "pgm") == 0)
        return true;
#endif
#if defined (ZMQ_HAVE_TIPC)
    if (strcmp (capability, "tipc") == 0)
        return true;
#endif
#if defined (ZMQ_HAVE_NORM)
    if (strcmp (capability, "norm") == 0)
        return true;
#endif
#if defined (HAVE_LIBSODIUM)
    if (strcmp (capability, "curve") == 0)
        return true;
#endif
#if defined (HAVE_LIBGSSAPI_KRB5)
    if (strcmp (capability, "gssapi") == 0)
        return true;
#endif
    //  Whatever the application asked for, we don't have
    return false;
}