diff --git a/.gitignore b/.gitignore index 18ca3f10..b9895c4a 100644 --- a/.gitignore +++ b/.gitignore @@ -115,6 +115,7 @@ test_thread_safe test_thread_safe_polling test_getsockopt_memset test_stream_exceeds_buffer +test_poller tests/test*.log tests/test*.trs src/platform.hpp* diff --git a/CMakeLists.txt b/CMakeLists.txt index cd6dd809..e5cfa968 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -440,6 +440,7 @@ set(cxx-sources zmq.cpp zmq_utils.cpp decoder_allocators.cpp + socket_poller.cpp config.hpp) set(rc-sources version.rc) diff --git a/Makefile.am b/Makefile.am index e61903d1..51a91602 100644 --- a/Makefile.am +++ b/Makefile.am @@ -209,7 +209,9 @@ src_libzmq_la_SOURCES = \ src/zmq.cpp \ src/zmq_utils.cpp \ src/decoder_allocators.hpp \ - src/decoder_allocators.cpp + src/decoder_allocators.cpp \ + src/socket_poller.hpp \ + src/socket_poller.cpp if ON_MINGW @@ -365,7 +367,8 @@ test_apps = \ tests/test_socketopt_hwm \ tests/test_heartbeats \ tests/test_stream_exceeds_buffer \ - tests/test_thread_safe_polling + tests/test_thread_safe_polling \ + tests/test_poller tests_test_system_SOURCES = tests/test_system.cpp tests_test_system_LDADD = src/libzmq.la @@ -573,6 +576,9 @@ tests_test_stream_exceeds_buffer_LDADD = src/libzmq.la tests_test_thread_safe_polling_SOURCES = tests/test_thread_safe_polling.cpp tests_test_thread_safe_polling_LDADD = src/libzmq.la +tests_test_poller_SOURCES = tests/test_poller.cpp +tests_test_poller_LDADD = src/libzmq.la + if !ON_MINGW if !ON_CYGWIN diff --git a/include/zmq.h b/include/zmq.h index 62479163..89e5c587 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -427,6 +427,35 @@ ZMQ_EXPORT SOCKET zmq_pollfd_fd (void *p); ZMQ_EXPORT int zmq_pollfd_fd (void *p); #endif +/******************************************************************************/ +/* Poller polling on sockets,fd and threaf safe sockets */ +/******************************************************************************/ + +typedef struct zmq_poller_event_t +{ + void *socket; +#if defined _WIN32 + SOCKET fd; +#else + int fd; +#endif + void *user_data; +} zmq_poller_event_t; + +ZMQ_EXPORT void *zmq_poller_new (); +ZMQ_EXPORT int zmq_poller_close (void *poller); +ZMQ_EXPORT int zmq_poller_add_socket (void *poller, void *socket, void *user_data); +ZMQ_EXPORT int zmq_poller_remove_socket (void *poller, void *socket); +ZMQ_EXPORT int zmq_poller_wait (void *poller, zmq_poller_event_t *event, long timeout); + +#if defined _WIN32 +ZMQ_EXPORT int zmq_poller_add_fd (void *poller, SOCKET fd, void *user_data); +ZMQ_EXPORT int zmq_poller_remove_fd (void *poller, SOCKET fd); +#else +ZMQ_EXPORT int zmq_poller_add_fd (void *poller, int fd, void *user_data); +ZMQ_EXPORT int zmq_poller_remove_fd (void *poller, int fd); +#endif + /******************************************************************************/ /* Message proxying */ /******************************************************************************/ diff --git a/src/socket_poller.cpp b/src/socket_poller.cpp new file mode 100644 index 00000000..189430ff --- /dev/null +++ b/src/socket_poller.cpp @@ -0,0 +1,234 @@ +/* + Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "socket_poller.hpp" +#include "err.hpp" + +zmq::socket_poller_t::socket_poller_t () : + tag (0xCAFEBABE), + poll_set (NULL), + poll_events (NULL) +{ + pollfd = zmq_pollfd_new (); +} + +zmq::socket_poller_t::~socket_poller_t () +{ + // Mark the socket_poller as dead + tag = 0xdeadbeef; + + for (events_t::iterator it = events.begin(); it != events.end(); ++it) { + if (it->socket) { + int thread_safe; + size_t thread_safe_size = sizeof(int); + + if (zmq_getsockopt(it->socket, ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == 0 && thread_safe) + zmq_remove_pollfd(it->socket, pollfd); + } + } + + zmq_pollfd_close (pollfd); + + if (poll_set) { + free (poll_set); + poll_set = NULL; + } + + if (poll_events) { + free (poll_events); + poll_events = NULL; + } +} + +bool zmq::socket_poller_t::check_tag () +{ + return tag == 0xCAFEBABE; +} + +int zmq::socket_poller_t::add_socket (void *socket_, void* user_data_) +{ + for (events_t::iterator it = events.begin (); it != events.end (); ++it) { + if (it->socket == socket_) { + errno = EINVAL; + return -1; + } + } + + int thread_safe; + size_t thread_safe_size = sizeof(int); + + if (zmq_getsockopt (socket_, ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1) + return -1; + + if (thread_safe) { + if (zmq_add_pollfd (socket_, pollfd) == -1) + return -1; + } + + event_t event = {socket_, 0, user_data_}; + events.push_back (event); + need_rebuild = true; + + return 0; +} + +#if defined _WIN32 +int zmq::socket_poller_t::add_fd (SOCKET fd_, void *user_data_) +#else +int zmq::socket_poller_t::add_fd (int fd_, void *user_data_) +#endif +{ + for (events_t::iterator it = events.begin (); it != events.end (); ++it) { + if (!it->socket && it->fd == fd_) { + errno = EINVAL; + return -1; + } + } + + event_t event = {NULL, fd_, user_data_}; + events.push_back (event); + need_rebuild = true; + + return 0; +} + +int zmq::socket_poller_t::remove_socket (void* socket_) +{ + events_t::iterator it; + + for (it = events.begin (); it != events.end (); ++it) { + if (it->socket == socket_) + break; + } + + if (it == events.end()) { + errno = EINVAL; + return -1; + } + + int thread_safe; + size_t thread_safe_size = sizeof(int); + + if (zmq_getsockopt (socket_, ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1) + return -1; + + if (thread_safe) { + if (zmq_remove_pollfd (socket_, pollfd) == -1) + return -1; + } + + events.erase (it); + need_rebuild = true; + + return 0; +} + +#if defined _WIN32 +int zmq::socket_poller_t::remove_fd (SOCKET fd_) +#else +int zmq::socket_poller_t::remove_fd (int fd_) +#endif +{ + events_t::iterator it; + + for (it = events.begin (); it != events.end (); ++it) { + if (!it->socket && it->fd == fd_) + break; + } + + if (it == events.end()) { + errno = EINVAL; + return -1; + } + + events.erase (it); + need_rebuild = true; + + return 0; +} + +int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long timeout_) +{ + if (need_rebuild) + rebuild (); + + int rc = zmq_pollfd_poll (pollfd, poll_set, poll_size, timeout_); + + if (rc == -1) { + return rc; + } + + if (rc == 0) { + errno = EAGAIN; + return -1; + } + + for (int i = 0; i < poll_size; i++) { + if (poll_set [i].revents & ZMQ_POLLIN) { + *event_ = poll_events[i]; + + break; + } + } + + return 0; +} + +void zmq::socket_poller_t::rebuild () +{ + if (poll_set) { + free (poll_set); + poll_set = NULL; + } + + if (poll_events) { + free (poll_events); + poll_events = NULL; + } + + poll_size = events.size (); + + poll_set = (zmq_pollitem_t*) malloc (poll_size * sizeof (zmq_pollitem_t)); + alloc_assert (poll_set); + + poll_events = (event_t*) malloc (poll_size * sizeof (event_t)); + + int event_nbr = 0; + for (events_t::iterator it = events.begin (); it != events.end (); ++it, event_nbr++) { + poll_set [event_nbr].socket = it->socket; + + if (!it->socket) + poll_set [event_nbr].fd = it->fd; + + poll_set [event_nbr].events = ZMQ_POLLIN; + poll_events [event_nbr] = *it; + } + + need_rebuild = false; +} diff --git a/src/socket_poller.hpp b/src/socket_poller.hpp new file mode 100644 index 00000000..5bdc4353 --- /dev/null +++ b/src/socket_poller.hpp @@ -0,0 +1,104 @@ +/* + Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_SOCKET_POLLER_HPP_INCLUDED__ +#define __ZMQ_SOCKET_POLLER_HPP_INCLUDED__ + +#include +#include + +#include "../include/zmq.h" + +namespace zmq +{ + + class socket_poller_t + { + public: + socket_poller_t (); + ~socket_poller_t (); + + typedef struct event_t + { + void *socket; +#if defined _WIN32 + SOCKET fd; +#else + int fd; +#endif + void *user_data; + } event_t; + + int add_socket (void *socket, void *user_data); + int remove_socket (void *socket); +#if defined _WIN32 + int add_fd (SOCKET fd, void *user_data); + int remove_fd (SOCKET fd); +#else + int add_fd (int fd, void *user_data); + int remove_fd (int fd); +#endif + + int wait (event_t *event, long timeout); + + // Return false if object is not a socket. + bool check_tag (); + + private: + void rebuild (); + + // Used to check whether the object is a socket_poller. + uint32_t tag; + + // Pollfd used for thread safe sockets polling + void *pollfd; + + // List of sockets + typedef std::vector events_t; + events_t events; + + // Current zmq_poll set + zmq_pollitem_t *poll_set; + + // Matching set to events + event_t *poll_events; + + // Size of the pollset + int poll_size; + + // Does the pollset needs rebuilding? + bool need_rebuild; + + socket_poller_t (const socket_poller_t&); + const socket_poller_t &operator = (const socket_poller_t&); + }; + +} + +#endif diff --git a/src/zmq.cpp b/src/zmq.cpp index ea06eee2..6664c77a 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -76,6 +76,7 @@ struct iovec { #include "fd.hpp" #include "metadata.hpp" #include "signaler.hpp" +#include "socket_poller.hpp" #if !defined ZMQ_HAVE_WINDOWS #include @@ -1558,6 +1559,93 @@ int zmq_pollfd_poll (void* p_, zmq_pollitem_t *items_, int nitems_, long timeout #endif } +// The poller functionality + +void* zmq_poller_new () +{ + zmq::socket_poller_t *poller = new (std::nothrow) zmq::socket_poller_t; + alloc_assert (poller); + return poller; +} + +int zmq_poller_close (void *poller_) +{ + if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) { + errno = EFAULT; + return -1; + } + + delete ((zmq::socket_poller_t*)poller_); + return 0; +} + +int zmq_poller_add_socket (void *poller_, void *socket_, void *user_data_) +{ + if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) { + errno = EFAULT; + return -1; + } + + return ((zmq::socket_poller_t*)poller_)->add_socket (socket_, user_data_); +} + +#if defined _WIN32 +int zmq_poller_add_fd (void *poller_, SOCKET fd_, void *user_data_) +#else +int zmq_poller_add_fd (void *poller_, int fd_, void *user_data_) +#endif +{ + if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) { + errno = EFAULT; + return -1; + } + + return ((zmq::socket_poller_t*)poller_)->add_fd (fd_, user_data_); +} + +int zmq_poller_remove_socket (void *poller_, void *socket) +{ + if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) { + errno = EFAULT; + return -1; + } + + return ((zmq::socket_poller_t*)poller_)->remove_socket (socket); +} + +#if defined _WIN32 +int zmq_poller_remove_fd (void *poller_, SOCKET fd_) +#else +int zmq_poller_remove_fd (void *poller_, int fd_) +#endif +{ + if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) { + errno = EFAULT; + return -1; + } + + return ((zmq::socket_poller_t*)poller_)->remove_fd (fd_); +} + + +int zmq_poller_wait (void *poller_, zmq_poller_event_t *event, long timeout_) +{ + if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) { + errno = EFAULT; + return -1; + } + + zmq::socket_poller_t::event_t e = {}; + + int rc = ((zmq::socket_poller_t*)poller_)->wait (&e, timeout_); + + event->socket = e.socket; + event->fd = e.fd; + event->user_data = e.user_data; + + return rc; +} + // The proxy functionality int zmq_proxy (void *frontend_, void *backend_, void *capture_) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 0197c084..ddc6d78b 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -53,6 +53,7 @@ set(tests test_sockopt_hwm test_heartbeats test_thread_safe_polling + test_poller ) if(NOT WIN32) list(APPEND tests diff --git a/tests/test_poller.cpp b/tests/test_poller.cpp new file mode 100644 index 00000000..9a9f9415 --- /dev/null +++ b/tests/test_poller.cpp @@ -0,0 +1,138 @@ +/* + Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "testutil.hpp" +#include "../include/zmq_utils.h" + +int main (void) +{ + setup_test_environment (); + + void *ctx = zmq_ctx_new (); + assert (ctx); + + // Create few sockets + void *vent = zmq_socket (ctx, ZMQ_PUSH); + assert (vent); + int rc = zmq_bind (vent, "tcp://127.0.0.1:55556"); + assert (rc == 0); + + void *sink = zmq_socket (ctx, ZMQ_PULL); + assert (sink); + rc = zmq_connect (sink, "tcp://127.0.0.1:55556"); + assert (rc == 0); + + void *bowl = zmq_socket (ctx, ZMQ_PULL); + assert (bowl); + + void *server = zmq_socket (ctx, ZMQ_SERVER); + assert (server); + rc = zmq_bind (server, "tcp://127.0.0.1:55557"); + assert (rc == 0); + + void *client = zmq_socket (ctx, ZMQ_CLIENT); + assert (client); + + // Set up poller + void* poller = zmq_poller_new (); + rc = zmq_poller_add_socket (poller, sink, sink); + assert (rc == 0); + + // Send a message + char data[1] = {'H'}; + rc = zmq_send_const (vent, data, 1, 0); + assert (rc == 1); + + // We expect a message only on the sink + zmq_poller_event_t event; + rc = zmq_poller_wait (poller, &event, -1); + assert (rc == 0); + assert (event.socket == sink); + assert (event.user_data == sink); + rc = zmq_recv (sink, data, 1, 0); + assert (rc == 1); + + // Stop polling sink + rc = zmq_poller_remove_socket (poller, sink); + assert (rc == 0); + + // Check we can poll an FD + rc = zmq_connect (bowl, "tcp://127.0.0.1:55556"); + assert (rc == 0); + +#if defined _WIN32 + SOCKET fd; + size_t fd_size = sizeof (SOCKET); +#else + int fd; + size_t fd_size = sizeof (int); +#endif + + rc = zmq_getsockopt (bowl, ZMQ_FD, &fd, &fd_size); + assert (rc == 0); + rc = zmq_poller_add_fd (poller, fd, bowl); + assert (rc == 0); + rc = zmq_poller_wait (poller, &event, 500); + assert (rc == 0); + assert (event.socket == NULL); + assert (event.fd == fd); + assert (event.user_data == bowl); + zmq_poller_remove_fd (poller, fd); + + // Polling on thread safe sockets + zmq_poller_add_socket (poller, server, NULL); + rc = zmq_connect (client, "tcp://127.0.0.1:55557"); + assert (rc == 0); + rc = zmq_send_const (client, data, 1, 0); + assert (rc == 1); + rc = zmq_poller_wait (poller, &event, 500); + assert (rc == 0); + assert (event.socket == server); + assert (event.user_data == NULL); + rc = zmq_recv (server, data, 1, 0); + assert (rc == 1); + + // Destory poller, sockets and ctx + rc = zmq_poller_close (poller); + assert (rc == 0); + rc = zmq_close (sink); + assert (rc == 0); + rc = zmq_close (vent); + assert (rc == 0); + rc = zmq_close (bowl); + assert (rc == 0); + rc = zmq_close (server); + assert (rc == 0); + rc = zmq_close (client); + assert (rc == 0); + rc = zmq_ctx_shutdown (ctx); + assert (rc == 0); + + return 0; +}