mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-15 02:07:59 +08:00
port zpoller to libzmq as zmq_poller
This commit is contained in:
parent
500269955d
commit
6501b8089f
1
.gitignore
vendored
1
.gitignore
vendored
@ -115,6 +115,7 @@ test_thread_safe
|
|||||||
test_thread_safe_polling
|
test_thread_safe_polling
|
||||||
test_getsockopt_memset
|
test_getsockopt_memset
|
||||||
test_stream_exceeds_buffer
|
test_stream_exceeds_buffer
|
||||||
|
test_poller
|
||||||
tests/test*.log
|
tests/test*.log
|
||||||
tests/test*.trs
|
tests/test*.trs
|
||||||
src/platform.hpp*
|
src/platform.hpp*
|
||||||
|
@ -440,6 +440,7 @@ set(cxx-sources
|
|||||||
zmq.cpp
|
zmq.cpp
|
||||||
zmq_utils.cpp
|
zmq_utils.cpp
|
||||||
decoder_allocators.cpp
|
decoder_allocators.cpp
|
||||||
|
socket_poller.cpp
|
||||||
config.hpp)
|
config.hpp)
|
||||||
|
|
||||||
set(rc-sources version.rc)
|
set(rc-sources version.rc)
|
||||||
|
10
Makefile.am
10
Makefile.am
@ -209,7 +209,9 @@ src_libzmq_la_SOURCES = \
|
|||||||
src/zmq.cpp \
|
src/zmq.cpp \
|
||||||
src/zmq_utils.cpp \
|
src/zmq_utils.cpp \
|
||||||
src/decoder_allocators.hpp \
|
src/decoder_allocators.hpp \
|
||||||
src/decoder_allocators.cpp
|
src/decoder_allocators.cpp \
|
||||||
|
src/socket_poller.hpp \
|
||||||
|
src/socket_poller.cpp
|
||||||
|
|
||||||
|
|
||||||
if ON_MINGW
|
if ON_MINGW
|
||||||
@ -365,7 +367,8 @@ test_apps = \
|
|||||||
tests/test_socketopt_hwm \
|
tests/test_socketopt_hwm \
|
||||||
tests/test_heartbeats \
|
tests/test_heartbeats \
|
||||||
tests/test_stream_exceeds_buffer \
|
tests/test_stream_exceeds_buffer \
|
||||||
tests/test_thread_safe_polling
|
tests/test_thread_safe_polling \
|
||||||
|
tests/test_poller
|
||||||
|
|
||||||
tests_test_system_SOURCES = tests/test_system.cpp
|
tests_test_system_SOURCES = tests/test_system.cpp
|
||||||
tests_test_system_LDADD = src/libzmq.la
|
tests_test_system_LDADD = src/libzmq.la
|
||||||
@ -573,6 +576,9 @@ tests_test_stream_exceeds_buffer_LDADD = src/libzmq.la
|
|||||||
tests_test_thread_safe_polling_SOURCES = tests/test_thread_safe_polling.cpp
|
tests_test_thread_safe_polling_SOURCES = tests/test_thread_safe_polling.cpp
|
||||||
tests_test_thread_safe_polling_LDADD = src/libzmq.la
|
tests_test_thread_safe_polling_LDADD = src/libzmq.la
|
||||||
|
|
||||||
|
tests_test_poller_SOURCES = tests/test_poller.cpp
|
||||||
|
tests_test_poller_LDADD = src/libzmq.la
|
||||||
|
|
||||||
|
|
||||||
if !ON_MINGW
|
if !ON_MINGW
|
||||||
if !ON_CYGWIN
|
if !ON_CYGWIN
|
||||||
|
@ -427,6 +427,35 @@ ZMQ_EXPORT SOCKET zmq_pollfd_fd (void *p);
|
|||||||
ZMQ_EXPORT int zmq_pollfd_fd (void *p);
|
ZMQ_EXPORT int zmq_pollfd_fd (void *p);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
/******************************************************************************/
|
||||||
|
/* Poller polling on sockets,fd and threaf safe sockets */
|
||||||
|
/******************************************************************************/
|
||||||
|
|
||||||
|
typedef struct zmq_poller_event_t
|
||||||
|
{
|
||||||
|
void *socket;
|
||||||
|
#if defined _WIN32
|
||||||
|
SOCKET fd;
|
||||||
|
#else
|
||||||
|
int fd;
|
||||||
|
#endif
|
||||||
|
void *user_data;
|
||||||
|
} zmq_poller_event_t;
|
||||||
|
|
||||||
|
ZMQ_EXPORT void *zmq_poller_new ();
|
||||||
|
ZMQ_EXPORT int zmq_poller_close (void *poller);
|
||||||
|
ZMQ_EXPORT int zmq_poller_add_socket (void *poller, void *socket, void *user_data);
|
||||||
|
ZMQ_EXPORT int zmq_poller_remove_socket (void *poller, void *socket);
|
||||||
|
ZMQ_EXPORT int zmq_poller_wait (void *poller, zmq_poller_event_t *event, long timeout);
|
||||||
|
|
||||||
|
#if defined _WIN32
|
||||||
|
ZMQ_EXPORT int zmq_poller_add_fd (void *poller, SOCKET fd, void *user_data);
|
||||||
|
ZMQ_EXPORT int zmq_poller_remove_fd (void *poller, SOCKET fd);
|
||||||
|
#else
|
||||||
|
ZMQ_EXPORT int zmq_poller_add_fd (void *poller, int fd, void *user_data);
|
||||||
|
ZMQ_EXPORT int zmq_poller_remove_fd (void *poller, int fd);
|
||||||
|
#endif
|
||||||
|
|
||||||
/******************************************************************************/
|
/******************************************************************************/
|
||||||
/* Message proxying */
|
/* Message proxying */
|
||||||
/******************************************************************************/
|
/******************************************************************************/
|
||||||
|
224
src/socket_poller.cpp
Normal file
224
src/socket_poller.cpp
Normal file
@ -0,0 +1,224 @@
|
|||||||
|
/*
|
||||||
|
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
|
||||||
|
|
||||||
|
This file is part of libzmq, the ZeroMQ core engine in C++.
|
||||||
|
|
||||||
|
libzmq is free software; you can redistribute it and/or modify it under
|
||||||
|
the terms of the GNU Lesser General Public License (LGPL) as published
|
||||||
|
by the Free Software Foundation; either version 3 of the License, or
|
||||||
|
(at your option) any later version.
|
||||||
|
|
||||||
|
As a special exception, the Contributors give you permission to link
|
||||||
|
this library with independent modules to produce an executable,
|
||||||
|
regardless of the license terms of these independent modules, and to
|
||||||
|
copy and distribute the resulting executable under terms of your choice,
|
||||||
|
provided that you also meet, for each linked independent module, the
|
||||||
|
terms and conditions of the license of that module. An independent
|
||||||
|
module is a module which is not derived from or based on this library.
|
||||||
|
If you modify this library, you must extend this exception to your
|
||||||
|
version of the library.
|
||||||
|
|
||||||
|
libzmq is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
||||||
|
License for more details.
|
||||||
|
|
||||||
|
You should have received a copy of the GNU Lesser General Public License
|
||||||
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "socket_poller.hpp"
|
||||||
|
#include "err.hpp"
|
||||||
|
|
||||||
|
zmq::socket_poller_t::socket_poller_t () :
|
||||||
|
tag (0xCAFEBABE),
|
||||||
|
poll_set (NULL),
|
||||||
|
poll_events (NULL)
|
||||||
|
{
|
||||||
|
pollfd = zmq_pollfd_new ();
|
||||||
|
}
|
||||||
|
|
||||||
|
zmq::socket_poller_t::~socket_poller_t ()
|
||||||
|
{
|
||||||
|
// Mark the socket_poller as dead
|
||||||
|
tag = 0xdeadbeef;
|
||||||
|
|
||||||
|
zmq_pollfd_close (pollfd);
|
||||||
|
|
||||||
|
if (poll_set) {
|
||||||
|
free (poll_set);
|
||||||
|
poll_set = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (poll_events) {
|
||||||
|
free (poll_events);
|
||||||
|
poll_events = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool zmq::socket_poller_t::check_tag ()
|
||||||
|
{
|
||||||
|
return tag == 0xCAFEBABE;
|
||||||
|
}
|
||||||
|
|
||||||
|
int zmq::socket_poller_t::add_socket (void *socket_, void* user_data_)
|
||||||
|
{
|
||||||
|
for (events_t::iterator it = events.begin (); it != events.end (); ++it) {
|
||||||
|
if (it->socket == socket_) {
|
||||||
|
errno = EINVAL;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int thread_safe;
|
||||||
|
size_t thread_safe_size = sizeof(int);
|
||||||
|
|
||||||
|
if (zmq_getsockopt (socket_, ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
|
||||||
|
return -1;
|
||||||
|
|
||||||
|
if (thread_safe) {
|
||||||
|
if (zmq_add_pollfd (socket_, pollfd) == -1)
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
event_t event = {socket_, 0, user_data_};
|
||||||
|
events.push_back (event);
|
||||||
|
need_rebuild = true;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
#if defined _WIN32
|
||||||
|
int zmq::socket_poller_t::add_fd (SOCKET fd_, void *user_data_)
|
||||||
|
#else
|
||||||
|
int zmq::socket_poller_t::add_fd (int fd_, void *user_data_)
|
||||||
|
#endif
|
||||||
|
{
|
||||||
|
for (events_t::iterator it = events.begin (); it != events.end (); ++it) {
|
||||||
|
if (!it->socket && it->fd == fd_) {
|
||||||
|
errno = EINVAL;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
event_t event = {NULL, fd_, user_data_};
|
||||||
|
events.push_back (event);
|
||||||
|
need_rebuild = true;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int zmq::socket_poller_t::remove_socket (void* socket_)
|
||||||
|
{
|
||||||
|
events_t::iterator it;
|
||||||
|
|
||||||
|
for (it = events.begin (); it != events.end (); ++it) {
|
||||||
|
if (it->socket == socket_)
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (it == events.end()) {
|
||||||
|
errno = EINVAL;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int thread_safe;
|
||||||
|
size_t thread_safe_size = sizeof(int);
|
||||||
|
|
||||||
|
if (zmq_getsockopt (socket_, ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
|
||||||
|
return -1;
|
||||||
|
|
||||||
|
if (thread_safe) {
|
||||||
|
if (zmq_remove_pollfd (socket_, pollfd) == -1)
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
events.erase (it);
|
||||||
|
need_rebuild = true;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
#if defined _WIN32
|
||||||
|
int zmq::socket_poller_t::remove_fd (SOCKET fd_)
|
||||||
|
#else
|
||||||
|
int zmq::socket_poller_t::remove_fd (int fd_)
|
||||||
|
#endif
|
||||||
|
{
|
||||||
|
events_t::iterator it;
|
||||||
|
|
||||||
|
for (it = events.begin (); it != events.end (); ++it) {
|
||||||
|
if (!it->socket && it->fd == fd_)
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (it == events.end()) {
|
||||||
|
errno = EINVAL;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
events.erase (it);
|
||||||
|
need_rebuild = true;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long timeout_)
|
||||||
|
{
|
||||||
|
if (need_rebuild)
|
||||||
|
rebuild ();
|
||||||
|
|
||||||
|
int rc = zmq_pollfd_poll (pollfd, poll_set, poll_size, timeout_);
|
||||||
|
|
||||||
|
if (rc == -1) {
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (rc == 0) {
|
||||||
|
errno = EAGAIN;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < poll_size; i++) {
|
||||||
|
if (poll_set [i].revents & ZMQ_POLLIN) {
|
||||||
|
*event_ = poll_events[i];
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void zmq::socket_poller_t::rebuild ()
|
||||||
|
{
|
||||||
|
if (poll_set) {
|
||||||
|
free (poll_set);
|
||||||
|
poll_set = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (poll_events) {
|
||||||
|
free (poll_events);
|
||||||
|
poll_events = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
poll_size = events.size ();
|
||||||
|
|
||||||
|
poll_set = (zmq_pollitem_t*) malloc (poll_size * sizeof (zmq_pollitem_t));
|
||||||
|
alloc_assert (poll_set);
|
||||||
|
|
||||||
|
poll_events = (event_t*) malloc (poll_size * sizeof (event_t));
|
||||||
|
|
||||||
|
int event_nbr = 0;
|
||||||
|
for (events_t::iterator it = events.begin (); it != events.end (); ++it, event_nbr++) {
|
||||||
|
poll_set [event_nbr].socket = it->socket;
|
||||||
|
|
||||||
|
if (!it->socket)
|
||||||
|
poll_set [event_nbr].fd = it->fd;
|
||||||
|
|
||||||
|
poll_set [event_nbr].events = ZMQ_POLLIN;
|
||||||
|
poll_events [event_nbr] = *it;
|
||||||
|
}
|
||||||
|
|
||||||
|
need_rebuild = false;
|
||||||
|
}
|
104
src/socket_poller.hpp
Normal file
104
src/socket_poller.hpp
Normal file
@ -0,0 +1,104 @@
|
|||||||
|
/*
|
||||||
|
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
|
||||||
|
|
||||||
|
This file is part of libzmq, the ZeroMQ core engine in C++.
|
||||||
|
|
||||||
|
libzmq is free software; you can redistribute it and/or modify it under
|
||||||
|
the terms of the GNU Lesser General Public License (LGPL) as published
|
||||||
|
by the Free Software Foundation; either version 3 of the License, or
|
||||||
|
(at your option) any later version.
|
||||||
|
|
||||||
|
As a special exception, the Contributors give you permission to link
|
||||||
|
this library with independent modules to produce an executable,
|
||||||
|
regardless of the license terms of these independent modules, and to
|
||||||
|
copy and distribute the resulting executable under terms of your choice,
|
||||||
|
provided that you also meet, for each linked independent module, the
|
||||||
|
terms and conditions of the license of that module. An independent
|
||||||
|
module is a module which is not derived from or based on this library.
|
||||||
|
If you modify this library, you must extend this exception to your
|
||||||
|
version of the library.
|
||||||
|
|
||||||
|
libzmq is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
||||||
|
License for more details.
|
||||||
|
|
||||||
|
You should have received a copy of the GNU Lesser General Public License
|
||||||
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef __ZMQ_SOCKET_POLLER_HPP_INCLUDED__
|
||||||
|
#define __ZMQ_SOCKET_POLLER_HPP_INCLUDED__
|
||||||
|
|
||||||
|
#include <vector>
|
||||||
|
#include <algorithm>
|
||||||
|
|
||||||
|
#include "../include/zmq.h"
|
||||||
|
|
||||||
|
namespace zmq
|
||||||
|
{
|
||||||
|
|
||||||
|
class socket_poller_t
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
socket_poller_t ();
|
||||||
|
~socket_poller_t ();
|
||||||
|
|
||||||
|
typedef struct event_t
|
||||||
|
{
|
||||||
|
void *socket;
|
||||||
|
#if defined _WIN32
|
||||||
|
SOCKET fd;
|
||||||
|
#else
|
||||||
|
int fd;
|
||||||
|
#endif
|
||||||
|
void *user_data;
|
||||||
|
} event_t;
|
||||||
|
|
||||||
|
int add_socket (void *socket, void *user_data);
|
||||||
|
int remove_socket (void *socket);
|
||||||
|
#if defined _WIN32
|
||||||
|
int add_fd (SOCKET fd, void *user_data);
|
||||||
|
int remove_fd (SOCKET fd);
|
||||||
|
#else
|
||||||
|
int add_fd (int fd, void *user_data);
|
||||||
|
int remove_fd (int fd);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
int wait (event_t *event, long timeout);
|
||||||
|
|
||||||
|
// Return false if object is not a socket.
|
||||||
|
bool check_tag ();
|
||||||
|
|
||||||
|
private:
|
||||||
|
void rebuild ();
|
||||||
|
|
||||||
|
// Used to check whether the object is a socket_poller.
|
||||||
|
uint32_t tag;
|
||||||
|
|
||||||
|
// Pollfd used for thread safe sockets polling
|
||||||
|
void *pollfd;
|
||||||
|
|
||||||
|
// List of sockets
|
||||||
|
typedef std::vector <event_t> events_t;
|
||||||
|
events_t events;
|
||||||
|
|
||||||
|
// Current zmq_poll set
|
||||||
|
zmq_pollitem_t *poll_set;
|
||||||
|
|
||||||
|
// Matching set to events
|
||||||
|
event_t *poll_events;
|
||||||
|
|
||||||
|
// Size of the pollset
|
||||||
|
int poll_size;
|
||||||
|
|
||||||
|
// Does the pollset needs rebuilding?
|
||||||
|
bool need_rebuild;
|
||||||
|
|
||||||
|
socket_poller_t (const socket_poller_t&);
|
||||||
|
const socket_poller_t &operator = (const socket_poller_t&);
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif
|
88
src/zmq.cpp
88
src/zmq.cpp
@ -76,6 +76,7 @@ struct iovec {
|
|||||||
#include "fd.hpp"
|
#include "fd.hpp"
|
||||||
#include "metadata.hpp"
|
#include "metadata.hpp"
|
||||||
#include "signaler.hpp"
|
#include "signaler.hpp"
|
||||||
|
#include "socket_poller.hpp"
|
||||||
|
|
||||||
#if !defined ZMQ_HAVE_WINDOWS
|
#if !defined ZMQ_HAVE_WINDOWS
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
@ -1558,6 +1559,93 @@ int zmq_pollfd_poll (void* p_, zmq_pollitem_t *items_, int nitems_, long timeout
|
|||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The poller functionality
|
||||||
|
|
||||||
|
void* zmq_poller_new ()
|
||||||
|
{
|
||||||
|
zmq::socket_poller_t *poller = new (std::nothrow) zmq::socket_poller_t;
|
||||||
|
alloc_assert (poller);
|
||||||
|
return poller;
|
||||||
|
}
|
||||||
|
|
||||||
|
int zmq_poller_close (void *poller_)
|
||||||
|
{
|
||||||
|
if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
|
||||||
|
errno = EFAULT;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
delete ((zmq::socket_poller_t*)poller_);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int zmq_poller_add_socket (void *poller_, void *socket_, void *user_data_)
|
||||||
|
{
|
||||||
|
if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
|
||||||
|
errno = EFAULT;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return ((zmq::socket_poller_t*)poller_)->add_socket (socket_, user_data_);
|
||||||
|
}
|
||||||
|
|
||||||
|
#if defined _WIN32
|
||||||
|
int zmq_poller_add_fd (void *poller_, SOCKET fd_, void *user_data_)
|
||||||
|
#else
|
||||||
|
int zmq_poller_add_fd (void *poller_, int fd_, void *user_data_)
|
||||||
|
#endif
|
||||||
|
{
|
||||||
|
if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
|
||||||
|
errno = EFAULT;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return ((zmq::socket_poller_t*)poller_)->add_fd (fd_, user_data_);
|
||||||
|
}
|
||||||
|
|
||||||
|
int zmq_poller_remove_socket (void *poller_, void *socket)
|
||||||
|
{
|
||||||
|
if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
|
||||||
|
errno = EFAULT;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return ((zmq::socket_poller_t*)poller_)->remove_socket (socket);
|
||||||
|
}
|
||||||
|
|
||||||
|
#if defined _WIN32
|
||||||
|
int zmq_poller_remove_fd (void *poller_, SOCKET fd_)
|
||||||
|
#else
|
||||||
|
int zmq_poller_remove_fd (void *poller_, int fd_)
|
||||||
|
#endif
|
||||||
|
{
|
||||||
|
if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
|
||||||
|
errno = EFAULT;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return ((zmq::socket_poller_t*)poller_)->remove_fd (fd_);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int zmq_poller_wait (void *poller_, zmq_poller_event_t *event, long timeout_)
|
||||||
|
{
|
||||||
|
if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
|
||||||
|
errno = EFAULT;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
zmq::socket_poller_t::event_t e = {};
|
||||||
|
|
||||||
|
int rc = ((zmq::socket_poller_t*)poller_)->wait (&e, timeout_);
|
||||||
|
|
||||||
|
event->socket = e.socket;
|
||||||
|
event->fd = e.fd;
|
||||||
|
event->user_data = e.user_data;
|
||||||
|
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
// The proxy functionality
|
// The proxy functionality
|
||||||
|
|
||||||
int zmq_proxy (void *frontend_, void *backend_, void *capture_)
|
int zmq_proxy (void *frontend_, void *backend_, void *capture_)
|
||||||
|
@ -53,6 +53,7 @@ set(tests
|
|||||||
test_sockopt_hwm
|
test_sockopt_hwm
|
||||||
test_heartbeats
|
test_heartbeats
|
||||||
test_thread_safe_polling
|
test_thread_safe_polling
|
||||||
|
test_poller
|
||||||
)
|
)
|
||||||
if(NOT WIN32)
|
if(NOT WIN32)
|
||||||
list(APPEND tests
|
list(APPEND tests
|
||||||
|
138
tests/test_poller.cpp
Normal file
138
tests/test_poller.cpp
Normal file
@ -0,0 +1,138 @@
|
|||||||
|
/*
|
||||||
|
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
|
||||||
|
|
||||||
|
This file is part of libzmq, the ZeroMQ core engine in C++.
|
||||||
|
|
||||||
|
libzmq is free software; you can redistribute it and/or modify it under
|
||||||
|
the terms of the GNU Lesser General Public License (LGPL) as published
|
||||||
|
by the Free Software Foundation; either version 3 of the License, or
|
||||||
|
(at your option) any later version.
|
||||||
|
|
||||||
|
As a special exception, the Contributors give you permission to link
|
||||||
|
this library with independent modules to produce an executable,
|
||||||
|
regardless of the license terms of these independent modules, and to
|
||||||
|
copy and distribute the resulting executable under terms of your choice,
|
||||||
|
provided that you also meet, for each linked independent module, the
|
||||||
|
terms and conditions of the license of that module. An independent
|
||||||
|
module is a module which is not derived from or based on this library.
|
||||||
|
If you modify this library, you must extend this exception to your
|
||||||
|
version of the library.
|
||||||
|
|
||||||
|
libzmq is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
||||||
|
License for more details.
|
||||||
|
|
||||||
|
You should have received a copy of the GNU Lesser General Public License
|
||||||
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "testutil.hpp"
|
||||||
|
#include "../include/zmq_utils.h"
|
||||||
|
|
||||||
|
int main (void)
|
||||||
|
{
|
||||||
|
setup_test_environment ();
|
||||||
|
|
||||||
|
void *ctx = zmq_ctx_new ();
|
||||||
|
assert (ctx);
|
||||||
|
|
||||||
|
// Create few sockets
|
||||||
|
void *vent = zmq_socket (ctx, ZMQ_PUSH);
|
||||||
|
assert (vent);
|
||||||
|
int rc = zmq_bind (vent, "tcp://127.0.0.1:55556");
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
|
void *sink = zmq_socket (ctx, ZMQ_PULL);
|
||||||
|
assert (sink);
|
||||||
|
rc = zmq_connect (sink, "tcp://127.0.0.1:55556");
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
|
void *bowl = zmq_socket (ctx, ZMQ_PULL);
|
||||||
|
assert (bowl);
|
||||||
|
|
||||||
|
void *server = zmq_socket (ctx, ZMQ_SERVER);
|
||||||
|
assert (server);
|
||||||
|
rc = zmq_bind (server, "tcp://127.0.0.1:55557");
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
|
void *client = zmq_socket (ctx, ZMQ_CLIENT);
|
||||||
|
assert (client);
|
||||||
|
|
||||||
|
// Set up poller
|
||||||
|
void* poller = zmq_poller_new ();
|
||||||
|
rc = zmq_poller_add_socket (poller, sink, sink);
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
|
// Send a message
|
||||||
|
char data[1] = {'H'};
|
||||||
|
rc = zmq_send_const (vent, data, 1, 0);
|
||||||
|
assert (rc == 1);
|
||||||
|
|
||||||
|
// We expect a message only on the sink
|
||||||
|
zmq_poller_event_t event;
|
||||||
|
rc = zmq_poller_wait (poller, &event, -1);
|
||||||
|
assert (rc == 0);
|
||||||
|
assert (event.socket == sink);
|
||||||
|
assert (event.user_data == sink);
|
||||||
|
rc = zmq_recv (sink, data, 1, 0);
|
||||||
|
assert (rc == 1);
|
||||||
|
|
||||||
|
// Stop polling sink
|
||||||
|
rc = zmq_poller_remove_socket (poller, sink);
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
|
// Check we can poll an FD
|
||||||
|
rc = zmq_connect (bowl, "tcp://127.0.0.1:55556");
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
|
#if defined _WIN32
|
||||||
|
SOCKET fd;
|
||||||
|
size_t fd_size = sizeof (SOCKET);
|
||||||
|
#else
|
||||||
|
int fd;
|
||||||
|
size_t fd_size = sizeof (int);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
rc = zmq_getsockopt (bowl, ZMQ_FD, &fd, &fd_size);
|
||||||
|
assert (rc == 0);
|
||||||
|
rc = zmq_poller_add_fd (poller, fd, bowl);
|
||||||
|
assert (rc == 0);
|
||||||
|
rc = zmq_poller_wait (poller, &event, 500);
|
||||||
|
assert (rc == 0);
|
||||||
|
assert (event.socket == NULL);
|
||||||
|
assert (event.fd == fd);
|
||||||
|
assert (event.user_data == bowl);
|
||||||
|
zmq_poller_remove_fd (poller, fd);
|
||||||
|
|
||||||
|
// Polling on thread safe sockets
|
||||||
|
zmq_poller_add_socket (poller, server, NULL);
|
||||||
|
rc = zmq_connect (client, "tcp://127.0.0.1:55557");
|
||||||
|
assert (rc == 0);
|
||||||
|
rc = zmq_send_const (client, data, 1, 0);
|
||||||
|
assert (rc == 1);
|
||||||
|
rc = zmq_poller_wait (poller, &event, 500);
|
||||||
|
assert (rc == 0);
|
||||||
|
assert (event.socket == server);
|
||||||
|
assert (event.user_data == NULL);
|
||||||
|
rc = zmq_recv (server, data, 1, 0);
|
||||||
|
assert (rc == 1);
|
||||||
|
|
||||||
|
// Destory poller, sockets and ctx
|
||||||
|
rc = zmq_poller_close (poller);
|
||||||
|
assert (rc == 0);
|
||||||
|
rc = zmq_close (sink);
|
||||||
|
assert (rc == 0);
|
||||||
|
rc = zmq_close (vent);
|
||||||
|
assert (rc == 0);
|
||||||
|
rc = zmq_close (bowl);
|
||||||
|
assert (rc == 0);
|
||||||
|
rc = zmq_close (server);
|
||||||
|
assert (rc == 0);
|
||||||
|
rc = zmq_close (client);
|
||||||
|
assert (rc == 0);
|
||||||
|
rc = zmq_ctx_shutdown (ctx);
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user