From 8d7bf6684cbb9625ec7c963b8867e2411b49eb57 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sun, 26 Sep 2010 19:22:33 +0200 Subject: [PATCH] common base for all pollers created; the only thing it handles at the moment is 'load' --- src/Makefile.am | 2 ++ src/devpoll.cpp | 13 ++--------- src/devpoll.hpp | 9 ++------ src/epoll.cpp | 12 ++-------- src/epoll.hpp | 9 ++------ src/kqueue.cpp | 15 ++++-------- src/kqueue.hpp | 9 ++------ src/poll.cpp | 12 ++-------- src/poll.hpp | 9 ++------ src/poller_base.cpp | 44 +++++++++++++++++++++++++++++++++++ src/poller_base.hpp | 56 +++++++++++++++++++++++++++++++++++++++++++++ src/select.cpp | 12 ++-------- src/select.hpp | 9 ++------ 13 files changed, 125 insertions(+), 86 deletions(-) create mode 100644 src/poller_base.cpp create mode 100644 src/poller_base.hpp diff --git a/src/Makefile.am b/src/Makefile.am index d2cc1746..89fc44ac 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -89,6 +89,7 @@ libzmq_la_SOURCES = \ platform.hpp \ poll.hpp \ poller.hpp \ + poller_base.hpp \ pair.hpp \ pub.hpp \ pull.hpp \ @@ -148,6 +149,7 @@ libzmq_la_SOURCES = \ pgm_socket.cpp \ pipe.cpp \ poll.cpp \ + poller_base.cpp \ pull.cpp \ push.cpp \ pub.cpp \ diff --git a/src/devpoll.cpp b/src/devpoll.cpp index 7054c2bc..32aca505 100644 --- a/src/devpoll.cpp +++ b/src/devpoll.cpp @@ -56,10 +56,6 @@ zmq::devpoll_t::devpoll_t () : zmq::devpoll_t::~devpoll_t () { worker.stop (); - - // Make sure there are no fds registered on shutdown. - zmq_assert (load.get () == 0); - close (devpoll_fd); } @@ -84,7 +80,7 @@ zmq::devpoll_t::handle_t zmq::devpoll_t::add_fd (fd_t fd_, pending_list.push_back (fd_); // Increase the load metric of the thread. - load.add (1); + adjust_load (1); return fd_; } @@ -97,7 +93,7 @@ void zmq::devpoll_t::rm_fd (handle_t handle_) fd_table [handle_].valid = false; // Decrease the load metric of the thread. - load.sub (1); + adjust_load (-1); } void zmq::devpoll_t::set_pollin (handle_t handle_) @@ -140,11 +136,6 @@ void zmq::devpoll_t::cancel_timer (i_poll_events *events_, int id_) timers.erase (it); } -int zmq::devpoll_t::get_load () -{ - return load.get (); -} - void zmq::devpoll_t::start () { worker.start (worker_routine, this); diff --git a/src/devpoll.hpp b/src/devpoll.hpp index 00be3851..ae165831 100644 --- a/src/devpoll.hpp +++ b/src/devpoll.hpp @@ -28,14 +28,14 @@ #include "fd.hpp" #include "thread.hpp" -#include "atomic_counter.hpp" +#include "poller_base.hpp" namespace zmq { // Implements socket polling mechanism using the "/dev/poll" interface. - class devpoll_t + class devpoll_t : public poller_base_t { public: @@ -53,7 +53,6 @@ namespace zmq void reset_pollout (handle_t handle_); void add_timer (int timeout_, struct i_poll_events *events_, int id_); void cancel_timer (struct i_poll_events *events_, int id_); - int get_load (); void start (); void stop (); @@ -94,10 +93,6 @@ namespace zmq // Handle of the physical thread doing the I/O work. thread_t worker; - // Load of the poller. Currently number of file descriptors - // registered with the poller. - atomic_counter_t load; - devpoll_t (const devpoll_t&); void operator = (const devpoll_t&); }; diff --git a/src/epoll.cpp b/src/epoll.cpp index bbad6399..584a13f2 100644 --- a/src/epoll.cpp +++ b/src/epoll.cpp @@ -45,9 +45,6 @@ zmq::epoll_t::~epoll_t () // Wait till the worker thread exits. worker.stop (); - // Make sure there are no fds registered on shutdown. - zmq_assert (load.get () == 0); - close (epoll_fd); for (retired_t::iterator it = retired.begin (); it != retired.end (); it ++) delete *it; @@ -71,7 +68,7 @@ zmq::epoll_t::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_) errno_assert (rc != -1); // Increase the load metric of the thread. - load.add (1); + adjust_load (1); return pe; } @@ -85,7 +82,7 @@ void zmq::epoll_t::rm_fd (handle_t handle_) retired.push_back (pe); // Decrease the load metric of the thread. - load.sub (1); + adjust_load (-1); } void zmq::epoll_t::set_pollin (handle_t handle_) @@ -133,11 +130,6 @@ void zmq::epoll_t::cancel_timer (i_poll_events *events_, int id_) timers.erase (it); } -int zmq::epoll_t::get_load () -{ - return load.get (); -} - void zmq::epoll_t::start () { worker.start (worker_routine, this); diff --git a/src/epoll.hpp b/src/epoll.hpp index a68f0552..a19fc0d5 100644 --- a/src/epoll.hpp +++ b/src/epoll.hpp @@ -29,7 +29,7 @@ #include "fd.hpp" #include "thread.hpp" -#include "atomic_counter.hpp" +#include "poller_base.hpp" namespace zmq { @@ -37,7 +37,7 @@ namespace zmq // This class implements socket polling mechanism using the Linux-specific // epoll mechanism. - class epoll_t + class epoll_t : public poller_base_t { public: @@ -55,7 +55,6 @@ namespace zmq void reset_pollout (handle_t handle_); void add_timer (int timeout_, struct i_poll_events *events_, int id_); void cancel_timer (struct i_poll_events *events_, int id_); - int get_load (); void start (); void stop (); @@ -91,10 +90,6 @@ namespace zmq // Handle of the physical thread doing the I/O work. thread_t worker; - // Load of the poller. Currently number of file descriptors - // registered with the poller. - atomic_counter_t load; - epoll_t (const epoll_t&); void operator = (const epoll_t&); }; diff --git a/src/kqueue.cpp b/src/kqueue.cpp index f76a08f1..47178d37 100644 --- a/src/kqueue.cpp +++ b/src/kqueue.cpp @@ -54,10 +54,6 @@ zmq::kqueue_t::kqueue_t () : zmq::kqueue_t::~kqueue_t () { worker.stop (); - - // Make sure there are no fds registered on shutdown. - zmq_assert (load.get () == 0); - close (kqueue_fd); } @@ -74,7 +70,7 @@ void zmq::kqueue_t::kevent_delete (fd_t fd_, short filter_) { struct kevent ev; - EV_SET (&ev, fd_, filter_, EV_DELETE, 0, 0, (kevent_udata_t)NULL); + EV_SET (&ev, fd_, filter_, EV_DELETE, 0, 0, (kevent_udata_t) NULL); int rc = kevent (kqueue_fd, &ev, 1, NULL, 0, NULL); errno_assert (rc != -1); } @@ -90,6 +86,8 @@ zmq::kqueue_t::handle_t zmq::kqueue_t::add_fd (fd_t fd_, pe->flag_pollout = 0; pe->reactor = reactor_; + adjust_load (1); + return pe; } @@ -102,6 +100,8 @@ void zmq::kqueue_t::rm_fd (handle_t handle_) kevent_delete (pe->fd, EVFILT_WRITE); pe->fd = retired_fd; retired.push_back (pe); + + adjust_load (-1); } void zmq::kqueue_t::set_pollin (handle_t handle_) @@ -144,11 +144,6 @@ void zmq::kqueue_t::cancel_timer (i_poll_events *events_, int id_) timers.erase (it); } -int zmq::kqueue_t::get_load () -{ - return load.get (); -} - void zmq::kqueue_t::start () { worker.start (worker_routine, this); diff --git a/src/kqueue.hpp b/src/kqueue.hpp index 43c2a39e..6a27260b 100644 --- a/src/kqueue.hpp +++ b/src/kqueue.hpp @@ -29,7 +29,7 @@ #include "fd.hpp" #include "thread.hpp" -#include "atomic_counter.hpp" +#include "poller_base.hpp" namespace zmq { @@ -37,7 +37,7 @@ namespace zmq // Implements socket polling mechanism using the BSD-specific // kqueue interface. - class kqueue_t + class kqueue_t : public poller_base_t { public: @@ -55,7 +55,6 @@ namespace zmq void reset_pollout (handle_t handle_); void add_timer (int timeout_, struct i_poll_events *events_, int id_); void cancel_timer (struct i_poll_events *events_, int id_); - int get_load (); void start (); void stop (); @@ -98,10 +97,6 @@ namespace zmq // Handle of the physical thread doing the I/O work. thread_t worker; - // Load of the poller. Currently number of file descriptors - // registered with the poller. - atomic_counter_t load; - kqueue_t (const kqueue_t&); void operator = (const kqueue_t&); }; diff --git a/src/poll.cpp b/src/poll.cpp index 513c4050..9afa6b5b 100644 --- a/src/poll.cpp +++ b/src/poll.cpp @@ -54,9 +54,6 @@ zmq::poll_t::poll_t () : zmq::poll_t::~poll_t () { worker.stop (); - - // Make sure there are no fds registered on shutdown. - zmq_assert (load.get () == 0); } zmq::poll_t::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_) @@ -69,7 +66,7 @@ zmq::poll_t::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_) fd_table [fd_].events = events_; // Increase the load metric of the thread. - load.add (1); + adjust_load (1); return fd_; } @@ -85,7 +82,7 @@ void zmq::poll_t::rm_fd (handle_t handle_) retired = true; // Decrease the load metric of the thread. - load.sub (1); + adjust_load (-1); } void zmq::poll_t::set_pollin (handle_t handle_) @@ -124,11 +121,6 @@ void zmq::poll_t::cancel_timer (i_poll_events *events_, int id_) timers.erase (it); } -int zmq::poll_t::get_load () -{ - return load.get (); -} - void zmq::poll_t::start () { worker.start (worker_routine, this); diff --git a/src/poll.hpp b/src/poll.hpp index 96d18ddd..e88c39df 100644 --- a/src/poll.hpp +++ b/src/poll.hpp @@ -34,7 +34,7 @@ #include "fd.hpp" #include "thread.hpp" -#include "atomic_counter.hpp" +#include "poller_base.hpp" namespace zmq { @@ -42,7 +42,7 @@ namespace zmq // Implements socket polling mechanism using the POSIX.1-2001 // poll() system call. - class poll_t + class poll_t : public poller_base_t { public: @@ -60,7 +60,6 @@ namespace zmq void reset_pollout (handle_t handle_); void add_timer (int timeout_, struct i_poll_events *events_, int id_); void cancel_timer (struct i_poll_events *events_, int id_); - int get_load (); void start (); void stop (); @@ -98,10 +97,6 @@ namespace zmq // Handle of the physical thread doing the I/O work. thread_t worker; - // Load of the poller. Currently number of file descriptors - // registered with the poller. - atomic_counter_t load; - poll_t (const poll_t&); void operator = (const poll_t&); }; diff --git a/src/poller_base.cpp b/src/poller_base.cpp new file mode 100644 index 00000000..f1de7e9f --- /dev/null +++ b/src/poller_base.cpp @@ -0,0 +1,44 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "poller_base.hpp" +#include "err.hpp" + +zmq::poller_base_t::poller_base_t () +{ +} + +zmq::poller_base_t::~poller_base_t () +{ + // Make sure there are no fds registered on shutdown. + zmq_assert (get_load () == 0); +} + +int zmq::poller_base_t::get_load () +{ + return load.get (); +} + +void zmq::poller_base_t::adjust_load (int amount_) +{ + if (amount_ > 0) + load.add (amount_); + else if (amount_ < 0) + load.sub (-amount_); +} diff --git a/src/poller_base.hpp b/src/poller_base.hpp new file mode 100644 index 00000000..0b0d53d1 --- /dev/null +++ b/src/poller_base.hpp @@ -0,0 +1,56 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_POLLER_BASE_HPP_INCLUDED__ +#define __ZMQ_POLLER_BASE_HPP_INCLUDED__ + +#include "atomic_counter.hpp" + +namespace zmq +{ + + class poller_base_t + { + public: + + poller_base_t (); + ~poller_base_t (); + + // Returns load of the poller. Note that this function can be + // invoked from a different thread! + int get_load (); + + protected: + + // Called by individual poller implementations to manage the load. + void adjust_load (int amount_); + + private: + + // Load of the poller. Currently the number of file descriptors + // registered. + atomic_counter_t load; + + poller_base_t (const poller_base_t&); + void operator = (const poller_base_t&); + }; + +} + +#endif diff --git a/src/select.cpp b/src/select.cpp index f1692391..a04ee662 100644 --- a/src/select.cpp +++ b/src/select.cpp @@ -54,9 +54,6 @@ zmq::select_t::select_t () : zmq::select_t::~select_t () { worker.stop (); - - // Make sure there are no fds registered on shutdown. - zmq_assert (load.get () == 0); } zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_) @@ -77,7 +74,7 @@ zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_) maxfd = fd_; // Increase the load metric of the thread. - load.add (1); + adjust_load (1); return fd_; } @@ -113,7 +110,7 @@ void zmq::select_t::rm_fd (handle_t handle_) } // Decrease the load metric of the thread. - load.sub (1); + adjust_load (-1); } void zmq::select_t::set_pollin (handle_t handle_) @@ -148,11 +145,6 @@ void zmq::select_t::cancel_timer (i_poll_events *events_, int id_) timers.erase (it); } -int zmq::select_t::get_load () -{ - return load.get (); -} - void zmq::select_t::start () { worker.start (worker_routine, this); diff --git a/src/select.hpp b/src/select.hpp index ae19fe14..c6c7f518 100644 --- a/src/select.hpp +++ b/src/select.hpp @@ -36,7 +36,7 @@ #include "fd.hpp" #include "thread.hpp" -#include "atomic_counter.hpp" +#include "poller_base.hpp" namespace zmq { @@ -44,7 +44,7 @@ namespace zmq // Implements socket polling mechanism using POSIX.1-2001 select() // function. - class select_t + class select_t : public poller_base_t { public: @@ -62,7 +62,6 @@ namespace zmq void reset_pollout (handle_t handle_); void add_timer (int timeout_, struct i_poll_events *events_, int id_); void cancel_timer (struct i_poll_events *events_, int id_); - int get_load (); void start (); void stop (); @@ -109,10 +108,6 @@ namespace zmq // Handle of the physical thread doing the I/O work. thread_t worker; - // Load of the poller. Currently number of file descriptors - // registered with the poller. - atomic_counter_t load; - select_t (const select_t&); void operator = (const select_t&); };