From 7cf4465168fde8cdb5eb7a4a3ac7bc1093a46d56 Mon Sep 17 00:00:00 2001 From: Dawid Drozd Date: Sat, 8 Sep 2018 19:31:05 +0200 Subject: [PATCH] Add AsyncEventBus Now it is possible to schedule events from different threads. --- CMakeLists.txt | 2 + lib/CMakeLists.txt | 15 +- lib/include/eventbus/AsyncEventBus.h | 202 ++++++++++++++++++ lib/include/eventbus/EventBus.h | 120 ++--------- lib/include/eventbus/EventCollector.h | 59 +---- lib/include/eventbus/TokenHolder.h | 141 ++++++++++++ .../eventbus/internal/AsyncCallbackVector.h | 41 ++++ .../eventbus/internal/CallbackVector.h | 16 ++ .../internal/TransactionCallbackVector.h | 88 ++++++++ lib/include/eventbus/internal/common.h | 28 +++ lib/src/EventCollector.cpp | 94 -------- lib/src/eventbus/AsyncEventBus.cpp | 44 ++++ performance/CMakeLists.txt | 4 +- test/CMakeLists.txt | 12 +- test/src/AsyncEventBusTest.cpp | 149 +++++++++++++ 15 files changed, 750 insertions(+), 265 deletions(-) create mode 100644 lib/include/eventbus/AsyncEventBus.h create mode 100644 lib/include/eventbus/TokenHolder.h create mode 100644 lib/include/eventbus/internal/AsyncCallbackVector.h create mode 100644 lib/include/eventbus/internal/CallbackVector.h create mode 100644 lib/include/eventbus/internal/TransactionCallbackVector.h create mode 100644 lib/include/eventbus/internal/common.h delete mode 100644 lib/src/EventCollector.cpp create mode 100644 lib/src/eventbus/AsyncEventBus.cpp create mode 100644 test/src/AsyncEventBusTest.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index f93f0a2..146e13c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -5,6 +5,8 @@ cmake_minimum_required(VERSION 3.8 FATAL_ERROR) project(EventBusDev) +set(CMAKE_CXX_STANDARD 14) + add_subdirectory(lib/) enable_testing() diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index f5707e0..92ec81a 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -2,10 +2,12 @@ cmake_minimum_required(VERSION 3.8 FATAL_ERROR) # BUILD_SHARED_LIBS can controll build type! project(EventBus - VERSION 2.3.0 + VERSION 2.4.0 LANGUAGES CXX ) +set(CMAKE_CXX_STANDARD 14) + # Dependencies # No dependencies for EventBus yay! @@ -23,9 +25,14 @@ set(config_install_dir "${CMAKE_INSTALL_LIBDIR}/cmake/${PROJECT_NAME}") # Library definition add_library(EventBus - src/EventCollector.cpp include/eventbus/EventCollector.h - include/eventbus/EventBus.h - ) + include/eventbus/EventBus.h + include/eventbus/EventCollector.h + include/eventbus/internal/AsyncCallbackVector.h + include/eventbus/internal/CallbackVector.h + include/eventbus/internal/TransactionCallbackVector.h + include/eventbus/TokenHolder.h + src/eventbus/AsyncEventBus.cpp include/eventbus/AsyncEventBus.h + ) add_library(Dexode::EventBus ALIAS EventBus) target_compile_features(EventBus PUBLIC cxx_std_14) diff --git a/lib/include/eventbus/AsyncEventBus.h b/lib/include/eventbus/AsyncEventBus.h new file mode 100644 index 0000000..2515f10 --- /dev/null +++ b/lib/include/eventbus/AsyncEventBus.h @@ -0,0 +1,202 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace Dexode +{ + +/** + * Async version of EventBus. Events are scheduled to queue and processed when called consume method. + * Methods like listen/unlisten also now will be processed when consume being called. + * + */ +class AsyncEventBus +{ +public: + AsyncEventBus() = default; + + ~AsyncEventBus() + { + std::lock_guard guard{_callbacksMutex}; + _callbacks.clear(); + } + + AsyncEventBus(const AsyncEventBus&) = delete; + AsyncEventBus(AsyncEventBus&&) = delete; + + AsyncEventBus& operator=(AsyncEventBus&&) = delete; + AsyncEventBus& operator=(const AsyncEventBus&) = delete; + + /** + * Register listener for event. Returns token used for unlisten. + * This request will be scheduled into queue. Need at least call 1 consume. + * It isn't ASAP event + * + * @tparam Event - type you want to listen for + * @param callback - your callback to handle event + * @return token used for unlisten + */ + template + int listen(std::function callback) + { + static_assert(Internal::validateEvent(), "Invalid event"); + + const int token = newToken(); + listen(token, std::move(callback)); + return token; + } + + /** + * Register listener for event. + * This request will be scheduled into queue. Need at least call 1 consume. + * It isn't ASAP event + * + * @tparam Event - type you want to listen for + * @param token - unique token for identification receiver. Simply pass token from @see EventBus::listen + * @param callback - your callback to handle event + */ + template + void listen(const int token, std::function callback) + { + static_assert(Internal::validateEvent(), "Invalid event"); + + std::lock_guard guard{_eventMutex}; + _commandsQueue.push_back([this, token, callback = std::move(callback)]() + { + std::lock_guard guard{_callbacksMutex}; + + using Vector = Internal::AsyncCallbackVector; + + assert(callback && "callback should be valid"); //Check for valid object + + std::unique_ptr& vector = + _callbacks[Internal::type_id]; + if(vector == nullptr) + { + vector.reset(new Vector{}); + } + assert(dynamic_cast(vector.get())); + Vector* callbacks = static_cast(vector.get()); + callbacks->add(token, callback); + }); + } + + /** + * This request will be scheduled into queue. Need at least call 1 consume. + * It isn't ASAP event + * + * @param token - token from EventBus::listen + */ + void unlistenAll(const int token) + { + std::lock_guard guard{_eventMutex}; + _commandsQueue.push_back([this, token]() + { + std::lock_guard guard{_callbacksMutex}; + for(auto& element : _callbacks) + { + element.second->remove(token); + } + }); + } + + /** + * This request will be scheduled into queue. Need at least call 1 consume. + * It isn't ASAP event + * + * @tparam Event - type you want to unlisten. @see Notiier::listen + * @param token - token from EventBus::listen + */ + template + void unlisten(const int token) + { + static_assert(Internal::validateEvent(), "Invalid event"); + + std::lock_guard guard{_eventMutex}; + _commandsQueue.push_back([this, token]() + { + std::lock_guard guard{_callbacksMutex}; + + auto found = _callbacks.find(Internal::type_id); + if(found != _callbacks.end()) + { + found->second->remove(token); + } + }); + } + + /** + * Schedule event to queue + * + * @param event your event struct + */ + template + void schedule(Event event) + { + static_assert(Internal::validateEvent(), "Invalid event"); + + std::lock_guard guard{_eventMutex}; + _eventQueue.push_back([this, event = std::move(event)]() + { + std::lock_guard guard{_callbacksMutex}; + + using Vector = Internal::AsyncCallbackVector; + auto found = _callbacks.find(Internal::type_id); + if(found == _callbacks.end()) + { + return; // no such notifications + } + + std::unique_ptr& vector = found->second; + assert(dynamic_cast(vector.get())); + Vector* callbacks = static_cast(vector.get()); + + for(const auto& element : callbacks->container) + { + element.second(event); + } + }); + } + + /** + * Process queued events. This should be called always on same thread. + * @param max maximum count of events to consume, if 0 then all available events will be consumed. + * If max is higher than available events then only available events will be consumed. + * @return number of consumed events + */ + int consume(int max = 0); + + std::size_t getQueueEventCount() const + { + std::lock_guard guard{_eventMutex}; + return _eventQueue.size(); + } + +private: + int newToken() + { + std::lock_guard guard{_eventMutex}; + int token = ++_tokener; + return token; + } + + std::size_t processCommandsAndGetQueuedEventsCount(); + + int _tokener = 0; + std::map> _callbacks; + mutable std::mutex _callbacksMutex; + mutable std::mutex _eventMutex; + std::deque> _eventQueue; + std::deque> _commandsQueue; +}; + +} /* namespace Dexode */ diff --git a/lib/include/eventbus/EventBus.h b/lib/include/eventbus/EventBus.h index d4ffc7e..a738c9b 100644 --- a/lib/include/eventbus/EventBus.h +++ b/lib/include/eventbus/EventBus.h @@ -5,28 +5,15 @@ #include #include #include -#include + +#include +#include namespace Dexode { -template -void type_id() // Helper for getting "type id" -{} - -using type_id_t = void (*)(); // Function pointer - class EventBus { - template - constexpr void validateEvent() - { - static_assert(std::is_const::value == false, "Struct must be without const"); - static_assert(std::is_volatile::value == false, "Struct must be without volatile"); - static_assert(std::is_reference::value == false, "Struct must be without reference"); - static_assert(std::is_pointer::value == false, "Struct must be without pointer"); - } - public: EventBus() = default; @@ -51,7 +38,7 @@ public: template int listen(const std::function& callback) { - validateEvent(); + static_assert(Internal::validateEvent(), "Invalid event"); const int token = ++_tokener; listen(token, callback); @@ -66,13 +53,13 @@ public: template void listen(const int token, const std::function& callback) { - validateEvent(); + static_assert(Internal::validateEvent(), "Invalid event"); - using Vector = VectorImpl; + using Vector = Internal::TransactionCallbackVector; assert(callback && "callback should be valid"); //Check for valid object - std::unique_ptr& vector = _callbacks[type_id]; + std::unique_ptr& vector = _callbacks[Internal::type_id]; if(vector == nullptr) { vector.reset(new Vector{}); @@ -100,9 +87,9 @@ public: template void unlisten(const int token) { - validateEvent(); + static_assert(Internal::validateEvent(), "Invalid event"); - auto found = _callbacks.find(type_id); + auto found = _callbacks.find(Internal::type_id); if(found != _callbacks.end()) { found->second->remove(token); @@ -118,16 +105,16 @@ public: void notify(const Event& event) { using CleanEventType = typename std::remove_const::type; - validateEvent(); + static_assert(Internal::validateEvent(), "Invalid event"); - using Vector = VectorImpl; - auto found = _callbacks.find(type_id); + using Vector = Internal::TransactionCallbackVector; + auto found = _callbacks.find(Internal::type_id); if(found == _callbacks.end()) { return; // no such notifications } - std::unique_ptr& vector = found->second; + std::unique_ptr& vector = found->second; assert(dynamic_cast(vector.get())); Vector* vectorImpl = static_cast(vector.get()); @@ -140,87 +127,8 @@ public: } private: - struct VectorInterface - { - virtual ~VectorInterface() = default; - - virtual void remove(const int token) = 0; - }; - - template - struct VectorImpl : public VectorInterface - { - using CallbackType = std::function; - using ContainerElement = std::pair; - using ContainerType = std::vector; - ContainerType container; - ContainerType toAdd; - std::vector toRemove; - int inTransaction = 0; - - virtual void remove(const int token) override - { - if(inTransaction > 0) - { - toRemove.push_back(token); - return; - } - - //Invalidation rules: https://stackoverflow.com/questions/6438086/iterator-invalidation-rules - auto removeFrom = std::remove_if( - container.begin(), container.end(), [token](const ContainerElement& element) { - return element.first == token; - }); - if(removeFrom != container.end()) - { - container.erase(removeFrom, container.end()); - } - } - - void add(const int token, const CallbackType& callback) - { - if(inTransaction > 0) - { - toAdd.emplace_back(token, callback); - } - else - { - container.emplace_back(token, callback); - } - } - - void beginTransaction() - { - ++inTransaction; - } - - void commitTransaction() - { - --inTransaction; - if(inTransaction > 0) - { - return; - } - inTransaction = 0; - - if(toAdd.empty() == false) - { - container.insert(container.end(), toAdd.begin(), toAdd.end()); - toAdd.clear(); - } - if(toRemove.empty() == false) - { - for(auto token : toRemove) - { - remove(token); - } - toRemove.clear(); - } - } - }; - int _tokener = 0; - std::map> _callbacks; + std::map> _callbacks; }; } /* namespace Dexode */ diff --git a/lib/include/eventbus/EventCollector.h b/lib/include/eventbus/EventCollector.h index 333099e..e9442c6 100644 --- a/lib/include/eventbus/EventCollector.h +++ b/lib/include/eventbus/EventCollector.h @@ -7,65 +7,12 @@ #include #include "EventBus.h" +#include "TokenHolder.h" namespace Dexode { -class EventCollector -{ -public: - EventCollector(const std::shared_ptr& bus); - EventCollector(EventBus* bus); - EventCollector(EventCollector const& other); - EventCollector(EventCollector&& other); - - ~EventCollector(); - - EventCollector& operator=(EventCollector const& other); - EventCollector& operator=(EventCollector&& other); - - /** - * Register listener for event. - * - * @tparam Event - type you want to listen for - * @param callback - your callback to handle event - */ - template - void listen(const std::function& callback) - { - if (!callback || !_bus) - { - return;//Skip such things - } - if (_token == 0) - { - _token = _bus->listen(callback); - } - else - { - _bus->listen(_token, callback); - } - } - - void unlistenAll(); - - /** - * @tparam Event - type you want to unlisten. @see Notiier::listen - */ - template - void unlisten() - { - if (_bus) - { - _bus->unlisten(_token); - } - } - - bool isUsing(const std::shared_ptr& bus) const; - -private: - int _token = 0; - std::shared_ptr _bus; -}; +// [[deprecated("Deprecating EventCollector. Try move to: TokenHolder<>")]] +using EventCollector = TokenHolder; } diff --git a/lib/include/eventbus/TokenHolder.h b/lib/include/eventbus/TokenHolder.h new file mode 100644 index 0000000..27be66b --- /dev/null +++ b/lib/include/eventbus/TokenHolder.h @@ -0,0 +1,141 @@ +#pragma once + +#include +#include +#include + +namespace +{ + +template +void null_deleter(Bus*) +{ +} + +} // namespace + +namespace Dexode +{ +// +//template +//class EventBusWrapper : Bus + +template +class TokenHolder +{ +public: + TokenHolder(const std::shared_ptr& bus) + : _bus{bus} + { + assert(_bus); + } + + TokenHolder(Bus* bus) + : _bus(bus, &null_deleter) + { + } + + TokenHolder(const TokenHolder& other) + : _bus(other._bus) + { + } + + TokenHolder(TokenHolder&& other) + : _token(other._token) + , _bus(std::move(other._bus)) + { + other._token = 0; + } + + ~TokenHolder() + { + unlistenAll(); + } + + TokenHolder& operator=(const TokenHolder& other) + { + if(this == &other) + { + return *this; + } + if(other._bus.get() != _bus.get()) + { + unlistenAll(); + _bus = other._bus; + } + + return *this; + } + + TokenHolder& operator=(TokenHolder&& other) + { + if(this == &other) + { + return *this; + } + + unlistenAll(); + + _token = other._token; + other._token = 0; + _bus = std::move(other._bus); + + return *this; + } + + /** + * Register listener for event. + * + * @tparam Event - type you want to listen for + * @param callback - your callback to handle event + */ + template + void listen(std::function callback) + { + if(!callback || !_bus) + { + assert(callback); + assert(_bus); + return; //Skip such things + } + if(_token == 0) + { + _token = _bus->template listen(std::move(callback)); + } + else + { + _bus->template listen(_token, std::move(callback)); + } + } + + void unlistenAll() + { + if(_token != 0 && _bus) + { + _bus->unlistenAll(_token); + } + } + + /** + * @tparam Event - type you want to unlisten. @see Notiier::listen + */ + template + void unlisten() + { + if(_bus) + { + _bus->template unlisten(_token); + } + } + + bool isUsing(const std::shared_ptr& bus) const + { + return _bus == bus; + } + +private: + int _token = 0; + std::shared_ptr _bus; +}; + +} // namespace Dexode diff --git a/lib/include/eventbus/internal/AsyncCallbackVector.h b/lib/include/eventbus/internal/AsyncCallbackVector.h new file mode 100644 index 0000000..edebf65 --- /dev/null +++ b/lib/include/eventbus/internal/AsyncCallbackVector.h @@ -0,0 +1,41 @@ +#pragma once + +#include +#include +#include + +#include "CallbackVector.h" + +namespace Dexode +{ +namespace Internal +{ + +template +struct AsyncCallbackVector : public CallbackVector +{ + using CallbackType = std::function; + using ContainerElement = std::pair; + std::vector container; + + virtual void remove(const int token) override + { + auto removeFrom = std::remove_if( + container.begin(), container.end(), [token](const ContainerElement& element) + { + return element.first == token; + }); + if(removeFrom != container.end()) + { + container.erase(removeFrom, container.end()); + } + } + + void add(const int token, CallbackType callback) + { + container.emplace_back(token, std::move(callback)); + } +}; + +} // namespace Internal +} // namespace Dexode diff --git a/lib/include/eventbus/internal/CallbackVector.h b/lib/include/eventbus/internal/CallbackVector.h new file mode 100644 index 0000000..37500b9 --- /dev/null +++ b/lib/include/eventbus/internal/CallbackVector.h @@ -0,0 +1,16 @@ +#pragma once + +namespace Dexode +{ +namespace Internal +{ + +struct CallbackVector +{ + virtual ~CallbackVector() = default; + + virtual void remove(const int token) = 0; +}; + +} // namespace Internal +} // namespace Dexode diff --git a/lib/include/eventbus/internal/TransactionCallbackVector.h b/lib/include/eventbus/internal/TransactionCallbackVector.h new file mode 100644 index 0000000..f82ab0c --- /dev/null +++ b/lib/include/eventbus/internal/TransactionCallbackVector.h @@ -0,0 +1,88 @@ +#pragma once + +#include +#include +#include + +#include "CallbackVector.h" + +namespace Dexode +{ +namespace Internal +{ + +template +struct TransactionCallbackVector : public CallbackVector +{ + using CallbackType = std::function; + using ContainerElement = std::pair; + using ContainerType = std::vector; + ContainerType container; + ContainerType toAdd; + std::vector toRemove; + int inTransaction = 0; + + virtual void remove(const int token) override + { + if(inTransaction > 0) + { + toRemove.push_back(token); + return; + } + + //Invalidation rules: https://stackoverflow.com/questions/6438086/iterator-invalidation-rules + auto removeFrom = std::remove_if( + container.begin(), container.end(), [token](const ContainerElement& element) + { + return element.first == token; + }); + if(removeFrom != container.end()) + { + container.erase(removeFrom, container.end()); + } + } + + void add(const int token, const CallbackType& callback) + { + if(inTransaction > 0) + { + toAdd.emplace_back(token, callback); + } + else + { + container.emplace_back(token, callback); + } + } + + void beginTransaction() + { + ++inTransaction; + } + + void commitTransaction() + { + --inTransaction; + if(inTransaction > 0) + { + return; + } + inTransaction = 0; + + if(toAdd.empty() == false) + { + container.insert(container.end(), toAdd.begin(), toAdd.end()); + toAdd.clear(); + } + if(toRemove.empty() == false) + { + for(auto token : toRemove) + { + remove(token); + } + toRemove.clear(); + } + } +}; + +} // namespace Internal +} // namespace Dexode diff --git a/lib/include/eventbus/internal/common.h b/lib/include/eventbus/internal/common.h new file mode 100644 index 0000000..1a37916 --- /dev/null +++ b/lib/include/eventbus/internal/common.h @@ -0,0 +1,28 @@ +#pragma once + +#include + +namespace Dexode +{ +namespace Internal +{ + +template +void type_id() // Helper for getting "type id" +{ +} + +using type_id_t = void (*)(); // Function pointer + +template +constexpr bool validateEvent() +{ + static_assert(std::is_const::value == false, "Struct must be without const"); + static_assert(std::is_volatile::value == false, "Struct must be without volatile"); + static_assert(std::is_reference::value == false, "Struct must be without reference"); + static_assert(std::is_pointer::value == false, "Struct must be without pointer"); + return true; +} + +} // namespace Internal +} // namespace Dexode diff --git a/lib/src/EventCollector.cpp b/lib/src/EventCollector.cpp deleted file mode 100644 index 93de5e7..0000000 --- a/lib/src/EventCollector.cpp +++ /dev/null @@ -1,94 +0,0 @@ -// -// Created by Dawid Drozd aka Gelldur on 18/10/16. -// - -#include - -#include - -namespace -{ - -void null_deleter(Dexode::EventBus*) -{ -} - -} - -namespace Dexode -{ - -EventCollector::EventCollector(const std::shared_ptr& bus) - : _bus(bus) -{ - assert(_bus); -} - -//Maybe ugly but hey ;) Less code and simply i can :D -EventCollector::EventCollector(EventBus* bus) - : _bus(bus, &null_deleter) -{ -} - -EventCollector::EventCollector(const EventCollector& other) - : _bus(other._bus) -{ -} - -EventCollector::EventCollector(EventCollector&& other) - : _token(other._token) - , _bus(std::move(other._bus)) -{ - other._token = 0; -} - -EventCollector::~EventCollector() -{ - unlistenAll(); -} - -EventCollector& EventCollector::operator=(const EventCollector& other) -{ - if (this == &other) - { - return *this; - } - if (other._bus.get() != _bus.get()) - { - unlistenAll(); - _bus = other._bus; - } - - return *this; -} - -EventCollector& EventCollector::operator=(EventCollector&& other) -{ - if (this == &other) - { - return *this; - } - - unlistenAll(); - - _token = other._token; - other._token = 0; - _bus = std::move(other._bus); - - return *this; -} - -void EventCollector::unlistenAll() -{ - if (_token != 0 && _bus) - { - _bus->unlistenAll(_token); - } -} - -bool EventCollector::isUsing(const std::shared_ptr& bus) const -{ - return _bus == bus; -} - -} diff --git a/lib/src/eventbus/AsyncEventBus.cpp b/lib/src/eventbus/AsyncEventBus.cpp new file mode 100644 index 0000000..c07d30b --- /dev/null +++ b/lib/src/eventbus/AsyncEventBus.cpp @@ -0,0 +1,44 @@ +#include + +#include + +namespace Dexode +{ + +std::size_t AsyncEventBus::processCommandsAndGetQueuedEventsCount() +{ + std::lock_guard guard{_eventMutex}; + while(_commandsQueue.empty() == false) + { + _commandsQueue.front()();//This can't add any extra commands, because in this queue we story only listen/unlisten stuff + _commandsQueue.pop_front(); + } + return _eventQueue.size(); //Yeah we want to return events count. So don't have to call getQueueEventCount +} + +int AsyncEventBus::consume(int max) +{ + if(max == 0) + { + max = std::numeric_limits::max(); + } + + int consumed = 0; + + std::function eventCommand; + while(processCommandsAndGetQueuedEventsCount() > 0 && consumed < max) //order is important + { + { + std::lock_guard guard{_eventMutex}; + eventCommand = std::move(_eventQueue.front()); + _eventQueue.pop_front(); + } + + eventCommand(); + ++consumed; + } + + return consumed; +} + +} // namespace Dexode diff --git a/performance/CMakeLists.txt b/performance/CMakeLists.txt index be1d169..8f0c995 100644 --- a/performance/CMakeLists.txt +++ b/performance/CMakeLists.txt @@ -1,5 +1,7 @@ cmake_minimum_required(VERSION 3.8 FATAL_ERROR) +set(CMAKE_CXX_STANDARD 14) + # http://www.levelofindirection.com/journal/2010/12/28/unit-testing-in-c-and-objective-c-just-got-easier.html # Thanks for CATCH! @@ -46,4 +48,4 @@ target_link_libraries(EventBusPerformance PUBLIC benchmark $<$:Poco::Foundation> $<$:Poco::Util> - ) \ No newline at end of file + ) diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 2811258..bf21ce9 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -5,6 +5,8 @@ cmake_minimum_required(VERSION 3.8 FATAL_ERROR) project(EventBusTest) +set(CMAKE_CXX_STANDARD 14) + # Dependencies enable_testing() if (NOT TARGET Dexode::EventBus) @@ -13,12 +15,14 @@ endif () # From 2.3.X they broke back compatibility find_package(Catch2 2.3 REQUIRED) +find_package(Threads REQUIRED) # Target definition add_executable(EventBusTest - src/EventCollectorTest.cpp - src/NotifierTest.cpp - ) + src/AsyncEventBusTest.cpp + src/EventCollectorTest.cpp + src/NotifierTest.cpp + ) target_compile_options(EventBusTest PUBLIC -Wall -pedantic @@ -47,7 +51,7 @@ set(EVENTBUS_DEBUG_FLAGS target_compile_options(EventBusTest PUBLIC "$<$:${EVENTBUS_DEBUG_FLAGS}>") -target_link_libraries(EventBusTest PUBLIC Dexode::EventBus Catch2::Catch2) +target_link_libraries(EventBusTest PUBLIC Catch2::Catch2 Dexode::EventBus Threads::Threads) add_test(NAME EventBus.UnitTests COMMAND EventBusTest) diff --git a/test/src/AsyncEventBusTest.cpp b/test/src/AsyncEventBusTest.cpp new file mode 100644 index 0000000..e032f1a --- /dev/null +++ b/test/src/AsyncEventBusTest.cpp @@ -0,0 +1,149 @@ +#include +#include +#include +#include + +#include + +#include +#include + +using namespace std::chrono; + +const auto ns2 = std::chrono::nanoseconds{2};// GCC 7 has some issues with 2ms :/ +const auto ns3 = std::chrono::nanoseconds{3}; + +TEST_CASE("Should consume events in synchronous way When using AsyncEventBus", "[AsyncEventBus]") +{ + struct SimpleEvent + { + std::thread::id id; + }; + + Dexode::AsyncEventBus bus; + + int counter = 0; + + bus.listen([&counter](const SimpleEvent& event) + { + std::cout << "Event from: " << event.id << std::endl; + ++counter; + }); + + std::thread worker1{[&bus]() + { + for(int i = 0; i < 10; ++i) + { + bus.schedule(SimpleEvent{std::this_thread::get_id()}); + std::this_thread::sleep_for(ns3); + } + }}; + std::thread worker2{[&bus]() + { + for(int i = 0; i < 10; ++i) + { + bus.schedule(SimpleEvent{std::this_thread::get_id()}); + std::this_thread::sleep_for(ns2); + } + }}; + + REQUIRE(counter == 0); + while(counter < 20) + { + static int sumOfConsumed = 0; + REQUIRE(counter == sumOfConsumed); + sumOfConsumed += bus.consume(); + REQUIRE(counter == sumOfConsumed); + } + + worker1.join(); + worker2.join(); +} + +TEST_CASE("Should unlisten for event When call unlisten inside Listener", "[AsyncEventBus]") +{ + struct SimpleEvent + { + std::thread::id id; + }; + + Dexode::AsyncEventBus bus; + + int counter = 0; + + const int myToken = 0x23167; + + bus.listen(myToken, [&counter, &bus](const SimpleEvent& event) + { + std::cout << "Event from: " << event.id << std::endl; + ++counter; + bus.unlistenAll(myToken);//This doesn't mean that unlisten will be ASAP! + }); + + REQUIRE(counter == 0); + bus.schedule(SimpleEvent{std::this_thread::get_id()}); + //This should consume (listen request), SimpleEvent, ()unlisten request + REQUIRE(bus.consume(1) == 1); + + for(int i = 0; i < 10; ++i) + { + bus.consume(); + bus.schedule(SimpleEvent{std::this_thread::get_id()}); + bus.consume(); + } + + REQUIRE(counter == 1);//Should be called only once +} + +TEST_CASE("Should listen for only 1 event When call unlisten inside Listener", "[AsyncEventBus]") +{ + struct SimpleEvent + { + std::thread::id id; + }; + + Dexode::AsyncEventBus bus; + + int counter = 0; + + const int myToken = 0x23167; + + bus.listen(myToken, [&counter, &bus](const SimpleEvent& event) + { + std::cout << "Event from: " << event.id << std::endl; + ++counter; + bus.unlistenAll(myToken);//This doesn't mean that unlisten will be ASAP! + }); + + //Workers in this test are only extra stuff + std::thread worker1{[&bus]() + { + for(int i = 0; i < 10; ++i) + { + bus.schedule(SimpleEvent{std::this_thread::get_id()}); + std::this_thread::sleep_for(ns3); + } + }}; + //Workers in this test are only extra stuff + std::thread worker2{[&bus]() + { + for(int i = 0; i < 10; ++i) + { + bus.schedule(SimpleEvent{std::this_thread::get_id()}); + std::this_thread::sleep_for(ns2); + } + }}; + + REQUIRE(counter == 0); + + for(int i = 0; i < 10; ++i) + { + bus.schedule(SimpleEvent{std::this_thread::get_id()}); + bus.schedule(SimpleEvent{std::this_thread::get_id()}); + bus.consume(); + } + + REQUIRE(counter == 1);//Should be called only once + worker1.join(); + worker2.join(); +}