From 2bc2858a8a72e9928d5a61d84211f37520cb8c21 Mon Sep 17 00:00:00 2001 From: Dawid Drozd Date: Fri, 27 Dec 2019 11:21:43 +0100 Subject: [PATCH] New EventBus 3.0 --- .clang-format | 12 +- CMakeLists.txt | 3 +- lib/CMakeLists.txt | 44 +- lib/src/dexode/EventBus.cpp | 130 ++++++ lib/src/dexode/EventBus.hpp | 104 ++--- lib/src/dexode/eventbus/Bus.hpp | 126 ++++++ lib/src/dexode/eventbus/Listener.hpp | 46 +- lib/src/dexode/eventbus/TagEventBus.hpp | 121 ----- .../eventbus/internal/AsyncCallbackVector.h | 37 -- .../dexode/eventbus/internal/CallbackVector.h | 13 - .../eventbus/internal/ListenerAttorney.hpp | 16 +- .../internal/TransactionCallbackVector.h | 85 ---- .../internal/{common.h => event_id.hpp} | 13 +- .../eventbus/internal/listener_traits.hpp | 27 ++ lib/src/dexode/eventbus/perk/PassPerk.cpp | 17 + lib/src/dexode/eventbus/perk/PassPerk.hpp | 32 ++ lib/src/dexode/eventbus/perk/Perk.cpp | 9 + lib/src/dexode/eventbus/perk/Perk.hpp | 22 + lib/src/dexode/eventbus/perk/PerkEventBus.cpp | 39 ++ lib/src/dexode/eventbus/perk/PerkEventBus.hpp | 74 +++ lib/src/dexode/eventbus/perk/TagPerk.cpp | 20 + lib/src/dexode/eventbus/perk/TagPerk.hpp | 48 ++ lib/src/dexode/eventbus/perk/WaitPerk.cpp | 35 ++ lib/src/dexode/eventbus/perk/WaitPerk.hpp | 33 ++ .../eventbus/permission/PostponeBus.hpp | 35 ++ .../dexode/eventbus/strategy/Protected.cpp | 71 --- .../dexode/eventbus/strategy/Protected.hpp | 115 ----- .../dexode/eventbus/strategy/Transaction.hpp | 115 ----- .../dexode/eventbus/stream/EventStream.hpp | 42 ++ .../eventbus/stream/ProtectedEventStream.hpp | 163 +++++++ lib/src/eventbus/AsyncEventBus.cpp | 55 --- performance/src/AsyncEventBusPerformance.cpp | 54 +++ sample/CMakeLists.txt | 13 - test/CMakeLists.txt | 12 +- test/src/AsyncEventBusTest.cpp | 146 ------ test/src/EventCollectorTest.cpp | 77 ---- test/src/EventIdTest.cpp | 52 --- test/src/NotifierTest.cpp | 393 ---------------- test/src/TestTagEventBus.cpp | 67 --- .../eventbus/test/SuiteConcurrentEventBus.cpp | 180 ++++++++ .../dexode/eventbus/test/SuiteEventBus.cpp | 424 ++++++++++++++++++ .../src/dexode/eventbus/test/SuiteEventID.cpp | 57 +++ .../dexode/eventbus/test/SuiteListener.cpp | 234 ++++++++++ test/src/dexode/eventbus/test/event.hpp | 32 ++ test/src/main.cpp | 2 + use_case/CMakeLists.txt | 3 + use_case/README.md | 5 + use_case/basic/CMakeLists.txt | 6 + use_case/basic/README.md | 3 + {sample => use_case/basic}/src/main.cpp | 60 +-- use_case/tagged_events/CMakeLists.txt | 11 + use_case/tagged_events/README.md | 14 + use_case/tagged_events/src/Character.cpp | 25 ++ use_case/tagged_events/src/Character.hpp | 23 + use_case/tagged_events/src/EventBus.hpp | 9 + use_case/tagged_events/src/Gui.cpp | 33 ++ use_case/tagged_events/src/Gui.hpp | 21 + use_case/tagged_events/src/Team.cpp | 43 ++ use_case/tagged_events/src/Team.hpp | 24 + use_case/tagged_events/src/event.hpp | 27 ++ use_case/tagged_events/src/main.cpp | 42 ++ 61 files changed, 2285 insertions(+), 1509 deletions(-) create mode 100644 lib/src/dexode/EventBus.cpp create mode 100644 lib/src/dexode/eventbus/Bus.hpp delete mode 100644 lib/src/dexode/eventbus/TagEventBus.hpp delete mode 100644 lib/src/dexode/eventbus/internal/AsyncCallbackVector.h delete mode 100644 lib/src/dexode/eventbus/internal/CallbackVector.h delete mode 100644 lib/src/dexode/eventbus/internal/TransactionCallbackVector.h rename lib/src/dexode/eventbus/internal/{common.h => event_id.hpp} (76%) create mode 100644 lib/src/dexode/eventbus/internal/listener_traits.hpp create mode 100644 lib/src/dexode/eventbus/perk/PassPerk.cpp create mode 100644 lib/src/dexode/eventbus/perk/PassPerk.hpp create mode 100644 lib/src/dexode/eventbus/perk/Perk.cpp create mode 100644 lib/src/dexode/eventbus/perk/Perk.hpp create mode 100644 lib/src/dexode/eventbus/perk/PerkEventBus.cpp create mode 100644 lib/src/dexode/eventbus/perk/PerkEventBus.hpp create mode 100644 lib/src/dexode/eventbus/perk/TagPerk.cpp create mode 100644 lib/src/dexode/eventbus/perk/TagPerk.hpp create mode 100644 lib/src/dexode/eventbus/perk/WaitPerk.cpp create mode 100644 lib/src/dexode/eventbus/perk/WaitPerk.hpp create mode 100644 lib/src/dexode/eventbus/permission/PostponeBus.hpp delete mode 100644 lib/src/dexode/eventbus/strategy/Protected.cpp delete mode 100644 lib/src/dexode/eventbus/strategy/Protected.hpp delete mode 100644 lib/src/dexode/eventbus/strategy/Transaction.hpp create mode 100644 lib/src/dexode/eventbus/stream/EventStream.hpp create mode 100644 lib/src/dexode/eventbus/stream/ProtectedEventStream.hpp delete mode 100644 lib/src/eventbus/AsyncEventBus.cpp create mode 100644 performance/src/AsyncEventBusPerformance.cpp delete mode 100644 sample/CMakeLists.txt delete mode 100644 test/src/AsyncEventBusTest.cpp delete mode 100644 test/src/EventCollectorTest.cpp delete mode 100644 test/src/EventIdTest.cpp delete mode 100644 test/src/NotifierTest.cpp delete mode 100644 test/src/TestTagEventBus.cpp create mode 100644 test/src/dexode/eventbus/test/SuiteConcurrentEventBus.cpp create mode 100644 test/src/dexode/eventbus/test/SuiteEventBus.cpp create mode 100644 test/src/dexode/eventbus/test/SuiteEventID.cpp create mode 100644 test/src/dexode/eventbus/test/SuiteListener.cpp create mode 100644 test/src/dexode/eventbus/test/event.hpp create mode 100644 test/src/main.cpp create mode 100644 use_case/CMakeLists.txt create mode 100644 use_case/README.md create mode 100644 use_case/basic/CMakeLists.txt create mode 100644 use_case/basic/README.md rename {sample => use_case/basic}/src/main.cpp (57%) create mode 100644 use_case/tagged_events/CMakeLists.txt create mode 100644 use_case/tagged_events/README.md create mode 100644 use_case/tagged_events/src/Character.cpp create mode 100644 use_case/tagged_events/src/Character.hpp create mode 100644 use_case/tagged_events/src/EventBus.hpp create mode 100644 use_case/tagged_events/src/Gui.cpp create mode 100644 use_case/tagged_events/src/Gui.hpp create mode 100644 use_case/tagged_events/src/Team.cpp create mode 100644 use_case/tagged_events/src/Team.hpp create mode 100644 use_case/tagged_events/src/event.hpp create mode 100644 use_case/tagged_events/src/main.cpp diff --git a/.clang-format b/.clang-format index d5e0dc0..8961363 100644 --- a/.clang-format +++ b/.clang-format @@ -4,7 +4,7 @@ # https://github.com/01org/parameter-framework/blob/master/.clang-format # Tested on: clang-format version 8.0.0 -# Version 1.1 +# Version 1.2 # Common settings @@ -27,13 +27,14 @@ Language: Cpp # void formatted_code_again; DisableFormat: false -Standard: Cpp11 +Standard: Auto AccessModifierOffset: -4 AlignAfterOpenBracket: true AlignConsecutiveAssignments: false AlignConsecutiveDeclarations: false AlignEscapedNewlinesLeft: false +AlignEscapedNewlines: Right AlignOperands: true AlignTrailingComments: false AllowAllParametersOfDeclarationOnNextLine: true @@ -105,13 +106,16 @@ NamespaceIndentation: None ObjCBlockIndentWidth: 4 ObjCSpaceAfterProperty: true ObjCSpaceBeforeProtocolList: true + +PenaltyExcessCharacter: 1000000 +PenaltyReturnTypeOnItsOwnLine: 1000000 +PenaltyBreakAssignment: 2 PenaltyBreakBeforeFirstCallParameter: 19 PenaltyBreakComment: 300 PenaltyBreakFirstLessLess: 120 PenaltyBreakString: 1000 +PenaltyBreakTemplateDeclaration: 10 -PenaltyExcessCharacter: 1000000 -PenaltyReturnTypeOnItsOwnLine: 1000000 PointerAlignment: Left SpaceAfterCStyleCast: false SpaceBeforeAssignmentOperators: true diff --git a/CMakeLists.txt b/CMakeLists.txt index edb5546..5d86095 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -11,14 +11,13 @@ option(ENABLE_PERFORMANCE "Enable performance subproject" OFF) set(CMAKE_CXX_STANDARD 17) add_subdirectory(lib/) +add_subdirectory(use_case/) if(ENABLE_TEST) enable_testing() add_subdirectory(test/) endif() -add_subdirectory(sample/) - if(ENABLE_PERFORMANCE) add_subdirectory(performance/) endif() diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index 36048bb..df048ef 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -33,18 +33,32 @@ include(cmake/InstallHelp.cmake) # * /include/ set(config_install_dir "${CMAKE_INSTALL_LIBDIR}/cmake/${PROJECT_NAME}") +set(EventBus_PUBLIC_HEADERS + src/dexode/EventBus.hpp + src/dexode/eventbus/Bus.hpp + src/dexode/eventbus/internal/event_id.hpp + src/dexode/eventbus/internal/listener_traits.hpp + src/dexode/eventbus/internal/ListenerAttorney.hpp + src/dexode/eventbus/Listener.hpp + src/dexode/eventbus/perk/PassPerk.hpp + src/dexode/eventbus/perk/Perk.hpp + src/dexode/eventbus/perk/PerkEventBus.hpp + src/dexode/eventbus/perk/TagPerk.hpp + src/dexode/eventbus/perk/WaitPerk.hpp + src/dexode/eventbus/permission/PostponeBus.hpp + src/dexode/eventbus/stream/EventStream.hpp + src/dexode/eventbus/stream/ProtectedEventStream.hpp + ) + # Library definition add_library(EventBus - src/dexode/EventBus.hpp - src/dexode/eventbus/internal/AsyncCallbackVector.h - src/dexode/eventbus/internal/CallbackVector.h - src/dexode/eventbus/internal/common.h - src/dexode/eventbus/internal/ListenerAttorney.hpp - src/dexode/eventbus/internal/TransactionCallbackVector.h - src/dexode/eventbus/Listener.hpp - src/dexode/eventbus/strategy/Protected.cpp src/dexode/eventbus/strategy/Protected.hpp - src/dexode/eventbus/strategy/Transaction.hpp - src/dexode/eventbus/TagEventBus.hpp + ${EventBus_PUBLIC_HEADERS} + src/dexode/EventBus.cpp + src/dexode/eventbus/perk/PassPerk.cpp + src/dexode/eventbus/perk/Perk.cpp + src/dexode/eventbus/perk/PerkEventBus.cpp + src/dexode/eventbus/perk/TagPerk.cpp + src/dexode/eventbus/perk/WaitPerk.cpp ) add_library(Dexode::EventBus ALIAS EventBus) @@ -55,16 +69,6 @@ target_include_directories(EventBus PUBLIC $ ) -set(EventBus_PUBLIC_HEADERS - src/dexode/EventBus.hpp - src/dexode/eventbus/internal/AsyncCallbackVector.h - src/dexode/eventbus/internal/CallbackVector.h - src/dexode/eventbus/internal/common.h - src/dexode/eventbus/internal/TransactionCallbackVector.h - src/dexode/eventbus/Listener.hpp - src/dexode/eventbus/strategy/Protected.hpp - src/dexode/eventbus/strategy/Transaction.hpp - ) # Add definitions for targets # Values: # * Debug: -DEVENTBUS_DEBUG=1 diff --git a/lib/src/dexode/EventBus.cpp b/lib/src/dexode/EventBus.cpp new file mode 100644 index 0000000..87270a9 --- /dev/null +++ b/lib/src/dexode/EventBus.cpp @@ -0,0 +1,130 @@ +// +// Created by gelldur on 26.11.2019. +// +#include "EventBus.hpp" + +namespace dexode +{ + +std::size_t EventBus::processLimit(const std::size_t limit) +{ + std::size_t processCount{0}; + std::lock_guard writeGuardProcess{_mutexProcess}; // Only one process at the time + + std::vector> eventStreams; + { + std::lock_guard writeGuard{_mutexStreams}; + std::swap(eventStreams, _eventStreams); // move data FROM member + } + + // Now if any setStream would be called it doesn't conflict without our process call + for(auto& eventStream : eventStreams) + { + const auto runProcessCount = eventStream->process(limit); + processCount += runProcessCount; + if(processCount >= limit) + { + break; + } + } + + { + std::lock_guard writeGuard{_mutexStreams}; + if(not _eventStreams.empty()) + { + // If anything was added then we need to add those elements + std::move(_eventStreams.begin(), _eventStreams.end(), std::back_inserter(eventStreams)); + } + std::swap(eventStreams, _eventStreams); // move data TO member + + // Check do we need remove something + if(_eventStreams.size() != _eventToStream.size()) + { + auto removeFrom = std::remove_if( + _eventStreams.begin(), _eventStreams.end(), [this](const auto& eventStream) { + for(const auto& element : _eventToStream) + { + // Don't remove if we point to the same place (is it UB ?) + if(element.second == eventStream.get()) + { + return false; + } + } + return true; + }); + assert(removeFrom != _eventStreams.end()); + _eventStreams.erase(removeFrom, _eventStreams.end()); + } + } + + return processCount; +} + +eventbus::stream::EventStream* EventBus::findStream( + const eventbus::internal::event_id_t eventID) const +{ + std::shared_lock readGuard{_mutexStreams}; + return findStreamUnsafe(eventID); +} + +void EventBus::unlistenAll(const std::uint32_t listenerID) +{ + std::shared_lock readGuard{_mutexStreams}; + for(auto& eventStream : _eventToStream) + { + eventStream.second->removeListener(listenerID); + } +} + +eventbus::stream::EventStream* EventBus::findStreamUnsafe( + const eventbus::internal::event_id_t eventID) const +{ + auto lookup = _eventToStream.find(eventID); + return lookup != _eventToStream.end() ? lookup->second : nullptr; +} + +eventbus::stream::EventStream* EventBus::obtainStream( + const eventbus::internal::event_id_t eventID, + eventbus::CreateStreamCallback createStreamCallback) +{ + std::lock_guard writeGuard{_mutexStreams}; + auto* found = findStreamUnsafe(eventID); + if(found != nullptr) + { + return found; + } + else + { + auto stream = createStreamCallback(); + _eventStreams.push_back(std::move(stream)); + _eventToStream[eventID] = _eventStreams.back().get(); + return _eventStreams.back().get(); + } +} + +bool EventBus::postponeEvent(eventbus::PostponeHelper& postponeCall) +{ + auto* eventStream = obtainStream(postponeCall.eventID, postponeCall.createStreamCallback); + eventStream->postpone(std::move(postponeCall.event)); + return true; +} + +eventbus::stream::EventStream* EventBus::listen(const std::uint32_t, + const eventbus::internal::event_id_t eventID, + eventbus::CreateStreamCallback createStreamCallback) +{ + auto* eventStream = obtainStream(eventID, createStreamCallback); + return eventStream; +} + +void EventBus::unlisten(const std::uint32_t listenerID, + const eventbus::internal::event_id_t eventID) +{ + auto* eventStream = findStream(eventID); + if(eventStream != nullptr) + { + eventStream->removeListener(listenerID); + } +} + +} // namespace dexode diff --git a/lib/src/dexode/EventBus.hpp b/lib/src/dexode/EventBus.hpp index 4c0af00..2d94ef3 100644 --- a/lib/src/dexode/EventBus.hpp +++ b/lib/src/dexode/EventBus.hpp @@ -1,100 +1,54 @@ +// +// Created by gelldur on 26.11.2019. +// #pragma once -#include -#include -#include +#include #include +#include +#include +#include -#include "dexode/eventbus/Listener.hpp" -#include "dexode/eventbus/internal/common.h" +#include "dexode/eventbus/Bus.hpp" namespace dexode { -template -class EventBus +class EventBus : public dexode::eventbus::Bus { template friend class dexode::eventbus::internal::ListenerAttorney; public: - using Listener = eventbus::Listener>; - - constexpr EventBus() = default; - ~EventBus() = default; - - EventBus(const EventBus&) = delete; - EventBus(EventBus&&) = delete; - - EventBus& operator=(EventBus&&) = delete; - EventBus& operator=(const EventBus&) = delete; - - template - constexpr void post(const Event& event) - { - static_assert(eventbus::internal::validateEvent(), "Invalid event"); - _base.template post(event); - } - - template - constexpr void postpone(Event event) - { - static_assert(eventbus::internal::validateEvent(), "Invalid event"); - _base.template postpone(std::move(event)); - } - - constexpr std::size_t processAll() + std::size_t process() override { return processLimit(std::numeric_limits::max()); } - constexpr std::size_t processLimit(const std::size_t maxCountOfEvents) - { - return _base.processLimit(maxCountOfEvents); - } + std::size_t processLimit(std::size_t limit); - [[nodiscard]] constexpr std::size_t getPostponeEventCount() const - { - return _base.getQueueEventCount(); - } +protected: + eventbus::stream::EventStream* obtainStream( + eventbus::internal::event_id_t eventID, + eventbus::CreateStreamCallback createStreamCallback); - Strategy& getStrategy() - { - return _base; - } + bool postponeEvent(eventbus::PostponeHelper& postponeCall) override; + eventbus::stream::EventStream* findStream(eventbus::internal::event_id_t eventID) const; + + void unlistenAll(std::uint32_t listenerID) override; + eventbus::stream::EventStream* listen( + std::uint32_t listenerID, + eventbus::internal::event_id_t eventID, + eventbus::CreateStreamCallback createStreamCallback) override; + void unlisten(std::uint32_t listenerID, eventbus::internal::event_id_t eventID) override; private: - std::atomic _lastID{0}; - Strategy _base; + mutable std::shared_mutex _mutexStreams; + std::shared_mutex _mutexProcess; + std::vector> _eventStreams; + std::map _eventToStream; - std::uint32_t newListenerID() - { - return ++_lastID; - } - - template - constexpr void listen(const std::uint32_t listenerID, - std::function&& callback) - { - static_assert(eventbus::internal::validateEvent(), "Invalid event"); - assert(callback && "callback should be valid"); // Check for valid object - - _base.template listen(listenerID, - std::forward>(callback)); - } - - constexpr void unlistenAll(const std::uint32_t listenerID) - { - _base.unlistenAll(listenerID); - } - - template - constexpr void unlisten(const std::uint32_t listenerID) - { - static_assert(eventbus::internal::validateEvent(), "Invalid event"); - const auto eventID = eventbus::internal::event_id; - _base.template unlisten(listenerID, eventID); - } + eventbus::stream::EventStream* findStreamUnsafe(eventbus::internal::event_id_t eventID) const; }; } // namespace dexode diff --git a/lib/src/dexode/eventbus/Bus.hpp b/lib/src/dexode/eventbus/Bus.hpp new file mode 100644 index 0000000..d72d3e1 --- /dev/null +++ b/lib/src/dexode/eventbus/Bus.hpp @@ -0,0 +1,126 @@ +// +// Created by gelldur on 26.11.2019. +// +#pragma once + +#include +#include +#include + +#include "dexode/eventbus/Listener.hpp" +#include "dexode/eventbus/internal/ListenerAttorney.hpp" +#include "dexode/eventbus/internal/event_id.hpp" +#include "dexode/eventbus/stream/ProtectedEventStream.hpp" + +namespace dexode::eventbus +{ + +class Bus; + +template +using DefaultEventStream = eventbus::stream::ProtectedEventStream; +using CreateStreamCallback = std::unique_ptr (*const)(); +using PostponeCallback = bool (*const)(Bus& bus, std::any event); + +template +bool postpone(Bus& bus, std::any event); + +template +std::unique_ptr createDefaultEventStream() +{ + return std::make_unique>(); +} + +class PostponeHelper +{ +public: + internal::event_id_t eventID = nullptr; + std::any event; + + PostponeCallback postponeCallback = nullptr; // function pointer + CreateStreamCallback createStreamCallback = nullptr; // function pointer + + PostponeHelper(const internal::event_id_t eventId, + std::any&& event, + PostponeCallback postponeCallback, + CreateStreamCallback createStreamCallback) + : eventID(eventId) + , event(std::forward(event)) + , postponeCallback(postponeCallback) + , createStreamCallback(createStreamCallback) + {} + + template + static PostponeHelper create(std::any&& event) + { + return PostponeHelper{internal::event_id(), + std::forward(event), + postpone, + createDefaultEventStream}; + } + + ~PostponeHelper() = default; +}; + +class Bus +{ + template + friend class dexode::eventbus::internal::ListenerAttorney; + +public: + using Listener = eventbus::Listener; + + Bus() = default; + virtual ~Bus() = default; + + virtual std::size_t process() = 0; + + template + bool postpone(Event event) + { + static_assert(internal::validateEvent(), "Invalid event"); + auto postponeCall = PostponeHelper::create(std::move(event)); + return postponeEvent(postponeCall); + } + +protected: + virtual bool postponeEvent(PostponeHelper& postponeCall) = 0; + virtual eventbus::stream::EventStream* listen(std::uint32_t listenerID, + internal::event_id_t eventID, + CreateStreamCallback createStreamCallback) = 0; + + virtual void unlistenAll(std::uint32_t listenerID) = 0; + virtual void unlisten(std::uint32_t listenerID, internal::event_id_t eventID) = 0; + +private: + std::atomic _lastID{0}; + + std::uint32_t newListenerID() + { + return ++_lastID; // used for generate unique listeners ID's + } + + template + void listen(const std::uint32_t listenerID, std::function&& callback) + { + static_assert(internal::validateEvent(), "Invalid event"); + assert(callback && "callback should be valid"); // Check for valid object + + constexpr auto eventID = internal::event_id(); + + auto* eventStream = listen(listenerID, eventID, createDefaultEventStream); + if(eventStream != nullptr) // maybe someone don't want add listener + { + eventStream->addListener(listenerID, + std::forward>(callback)); + } + } +}; + +template +bool postpone(Bus& bus, std::any event) +{ + return bus.postpone(std::move(std::any_cast(event))); +} + +} // namespace dexode::eventbus diff --git a/lib/src/dexode/eventbus/Listener.hpp b/lib/src/dexode/eventbus/Listener.hpp index a6d4097..ac646cf 100644 --- a/lib/src/dexode/eventbus/Listener.hpp +++ b/lib/src/dexode/eventbus/Listener.hpp @@ -5,6 +5,8 @@ #include #include "dexode/eventbus/internal/ListenerAttorney.hpp" +#include "dexode/eventbus/internal/event_id.hpp" +#include "dexode/eventbus/internal/listener_traits.hpp" namespace dexode::eventbus { @@ -28,8 +30,9 @@ public: Listener(const Listener& other) = delete; - Listener(Listener&& other) - : _id(other._id) + Listener(Listener&& other) noexcept + : _id(other._id) // we don't have to reset listener ID as _bus is moved and we won't call + // unlistenAll , _bus(std::move(other._bus)) {} @@ -54,15 +57,35 @@ public: { unlistenAll(); // remove previous } + // we don't have reset listener ID as bus is moved and we won't call unlistenAll _id = other._id; _bus = std::move(other._bus); return *this; } - template - void listen(std::function&& callback) + template + constexpr void listen(std::function&& callback) { + static_assert(internal::validateEvent(), "Invalid event"); + listenToCallback(std::forward>(callback)); + } + + template > + constexpr void listen(EventCallback&& callback) + { + static_assert(std::is_const_v>, "Event should be const"); + static_assert(std::is_reference_v, "Event should be const & (reference)"); + using PureEvent = std::remove_const_t>; + static_assert(internal::validateEvent(), "Invalid event"); + + listenToCallback(std::forward(callback)); + } + + template + void listenToCallback(std::function&& callback) + { + static_assert(internal::validateEvent(), "Invalid event"); if(_bus == nullptr) { throw std::runtime_error{"bus is null"}; @@ -72,6 +95,18 @@ public: *_bus, _id, std::forward>(callback)); } + template + void listenToCallback(const std::function& callback) + { + static_assert(internal::validateEvent(), "Invalid event"); + if(_bus == nullptr) + { + throw std::runtime_error{"bus is null"}; + } + internal::ListenerAttorney::template listen( + *_bus, _id, std::function{callback}); + } + void unlistenAll() { if(_bus == nullptr) @@ -84,11 +119,12 @@ public: template void unlisten() { + static_assert(internal::validateEvent(), "Invalid event"); if(_bus == nullptr) { throw std::runtime_error{"bus is null"}; } - internal::ListenerAttorney::template unlisten(*_bus, _id); + internal::ListenerAttorney::unlisten(*_bus, _id, internal::event_id()); } private: diff --git a/lib/src/dexode/eventbus/TagEventBus.hpp b/lib/src/dexode/eventbus/TagEventBus.hpp deleted file mode 100644 index 044221f..0000000 --- a/lib/src/dexode/eventbus/TagEventBus.hpp +++ /dev/null @@ -1,121 +0,0 @@ -// -// Created by gelldur on 30.10.2019. -// -#pragma once - -#include -#include -#include - -#include "dexode/EventBus.hpp" - -namespace dexode::eventbus -{ - -template -class TagEventBus -{ - template - friend class dexode::eventbus::internal::ListenerAttorney; - - using EventBus_t = EventBus; - -public: - using ListenerAll = eventbus::Listener>; - using Listener = typename EventBus::Listener; // alias - - TagEventBus(const std::vector& tags) - { - for(const auto& tag : tags) - { - _tagToBus.emplace(tag, std::make_shared()); - } - } - - ~TagEventBus() = default; - - TagEventBus(const TagEventBus&) = delete; - TagEventBus(TagEventBus&&) = delete; - - TagEventBus& operator=(TagEventBus&&) = delete; - TagEventBus& operator=(const TagEventBus&) = delete; - - template - void post(const Event& event) - { - _allBus.post(event); - for(auto& element : _tagToBus) - { - element.second->post(event); - } - } - - template - void postpone(Event event) - { - _allBus.postpone(event); - for(auto& element : _tagToBus) - { - element.second->postpone(event); - } - } - - template - void post(const std::string& tag, const Event& event) - { - _allBus.post(event); - _tagToBus.at(tag)->post(event); - } - - template - void postpone(const std::string& tag, Event event) - { - _allBus.postpone(event); - _tagToBus.at(tag)->postpone(event); - } - - constexpr std::size_t processAll() - { - return processLimit(std::numeric_limits::max()); - } - - constexpr std::size_t processLimit(const std::size_t maxCountOfEvents) - { - return _allBus.processLimit(maxCountOfEvents); - } - - const std::shared_ptr>& get(const std::string& tag) - { - return _tagToBus.at(tag); - } - -private: - EventBus_t _allBus; - std::map> _tagToBus; - - constexpr std::uint32_t newListenerID() - { - return internal::ListenerAttorney::newListenerID(_allBus); - } - - template - constexpr void listen(const std::uint32_t listenerID, - std::function callback) - { - internal::ListenerAttorney::template listen( - _allBus, listenerID, std::move(callback)); - } - - constexpr void unlistenAll(const std::uint32_t listenerID) - { - internal::ListenerAttorney::unlistenAll(_allBus, listenerID); - } - - template - constexpr void unlisten(const std::uint32_t listenerID) - { - internal::ListenerAttorney::template unlisten(_allBus, listenerID); - } -}; - -} // namespace dexode::eventbus diff --git a/lib/src/dexode/eventbus/internal/AsyncCallbackVector.h b/lib/src/dexode/eventbus/internal/AsyncCallbackVector.h deleted file mode 100644 index 5a36dad..0000000 --- a/lib/src/dexode/eventbus/internal/AsyncCallbackVector.h +++ /dev/null @@ -1,37 +0,0 @@ -#pragma once - -#include -#include -#include - -#include "CallbackVector.h" - -namespace dexode::eventbus::internal -{ - -template -struct AsyncCallbackVector : public CallbackVector -{ - using CallbackType = std::function; - using ContainerElement = std::pair; - std::vector container; - - virtual void remove(const int token) override - { - auto removeFrom = std::remove_if( - container.begin(), container.end(), [token](const ContainerElement& element) { - return element.first == token; - }); - if(removeFrom != container.end()) - { - container.erase(removeFrom, container.end()); - } - } - - void add(const int token, CallbackType callback) - { - container.emplace_back(token, std::move(callback)); - } -}; - -} // namespace dexode::eventbus::internal diff --git a/lib/src/dexode/eventbus/internal/CallbackVector.h b/lib/src/dexode/eventbus/internal/CallbackVector.h deleted file mode 100644 index 24ebcd3..0000000 --- a/lib/src/dexode/eventbus/internal/CallbackVector.h +++ /dev/null @@ -1,13 +0,0 @@ -#pragma once - -namespace dexode::eventbus::internal -{ - -struct CallbackVector -{ - virtual ~CallbackVector() = default; - - virtual void remove(const int token) = 0; -}; - -} // namespace dexode::eventbus::internal diff --git a/lib/src/dexode/eventbus/internal/ListenerAttorney.hpp b/lib/src/dexode/eventbus/internal/ListenerAttorney.hpp index 7a27618..1fc2d74 100644 --- a/lib/src/dexode/eventbus/internal/ListenerAttorney.hpp +++ b/lib/src/dexode/eventbus/internal/ListenerAttorney.hpp @@ -3,15 +3,15 @@ // #pragma once -//#include "dexode/EventBus.hpp" +#include + +#include "dexode/eventbus/internal/event_id.hpp" namespace dexode::eventbus { template class Listener; -template -class TagEventBus; } // namespace dexode::eventbus namespace dexode::eventbus::internal @@ -23,9 +23,6 @@ class ListenerAttorney template friend class dexode::eventbus::Listener; - template - friend class eventbus::TagEventBus; - private: static constexpr std::uint32_t newListenerID(EventBus_t& bus) { @@ -46,10 +43,11 @@ private: bus.unlistenAll(listenerID); } - template - static constexpr void unlisten(EventBus_t& bus, const std::uint32_t listenerID) + static constexpr void unlisten(EventBus_t& bus, + const std::uint32_t listenerID, + const event_id_t eventID) { - bus.template unlisten(listenerID); + bus.unlisten(listenerID, eventID); } }; diff --git a/lib/src/dexode/eventbus/internal/TransactionCallbackVector.h b/lib/src/dexode/eventbus/internal/TransactionCallbackVector.h deleted file mode 100644 index adb9a67..0000000 --- a/lib/src/dexode/eventbus/internal/TransactionCallbackVector.h +++ /dev/null @@ -1,85 +0,0 @@ -#pragma once - -#include -#include -#include - -#include "CallbackVector.h" - -namespace dexode::eventbus::internal -{ - -template -struct TransactionCallbackVector : public CallbackVector -{ - using CallbackType = std::function; - using ContainerElement = std::pair; - using ContainerType = std::vector; - ContainerType container; - ContainerType toAdd; - std::vector toRemove; - int inTransaction = 0; - - virtual void remove(const int token) override - { - if(inTransaction > 0) - { - toRemove.push_back(token); - return; - } - - // Invalidation rules: - // https://stackoverflow.com/questions/6438086/iterator-invalidation-rules - auto removeFrom = std::remove_if( - container.begin(), container.end(), [token](const ContainerElement& element) { - return element.first == token; - }); - if(removeFrom != container.end()) - { - container.erase(removeFrom, container.end()); - } - } - - void add(const int token, const CallbackType& callback) - { - if(inTransaction > 0) - { - toAdd.emplace_back(token, callback); - } - else - { - container.emplace_back(token, callback); - } - } - - void beginTransaction() - { - ++inTransaction; - } - - void commitTransaction() - { - --inTransaction; - if(inTransaction > 0) - { - return; - } - inTransaction = 0; - - if(toAdd.empty() == false) - { - container.insert(container.end(), toAdd.begin(), toAdd.end()); - toAdd.clear(); - } - if(toRemove.empty() == false) - { - for(auto token : toRemove) - { - remove(token); - } - toRemove.clear(); - } - } -}; - -} // namespace dexode::eventbus::internal diff --git a/lib/src/dexode/eventbus/internal/common.h b/lib/src/dexode/eventbus/internal/event_id.hpp similarity index 76% rename from lib/src/dexode/eventbus/internal/common.h rename to lib/src/dexode/eventbus/internal/event_id.hpp index 565e53b..e726c34 100644 --- a/lib/src/dexode/eventbus/internal/common.h +++ b/lib/src/dexode/eventbus/internal/event_id.hpp @@ -5,12 +5,21 @@ namespace dexode::eventbus::internal { -using event_id_t = std::size_t; +template +struct type_id_ptr +{ + static const T* const id; +}; + +template +const T* const type_id_ptr::id = nullptr; + +using event_id_t = const void*; template constexpr event_id_t event_id() // Helper for getting "type id" { - return typeid(T).hash_code(); + return &type_id_ptr::id; } template diff --git a/lib/src/dexode/eventbus/internal/listener_traits.hpp b/lib/src/dexode/eventbus/internal/listener_traits.hpp new file mode 100644 index 0000000..12c374b --- /dev/null +++ b/lib/src/dexode/eventbus/internal/listener_traits.hpp @@ -0,0 +1,27 @@ +// +// Created by gelldur on 22.12.2019. +// +#pragma once + +#include +#include + +namespace dexode::eventbus::internal +{ + +template +Arg first_argument_helper(Ret (*)(Arg, Rest...)); + +template +Arg first_argument_helper(Ret (F::*)(Arg, Rest...)); + +template +Arg first_argument_helper(Ret (F::*)(Arg, Rest...) const); + +template +decltype(first_argument_helper(&F::operator())) first_argument_helper(F); + +template +using first_argument = decltype(first_argument_helper(std::declval())); + +} // namespace dexode::eventbus::internal diff --git a/lib/src/dexode/eventbus/perk/PassPerk.cpp b/lib/src/dexode/eventbus/perk/PassPerk.cpp new file mode 100644 index 0000000..26fb418 --- /dev/null +++ b/lib/src/dexode/eventbus/perk/PassPerk.cpp @@ -0,0 +1,17 @@ +// +// Created by gelldur on 24.12.2019. +// +#include "PassPerk.hpp" + +#include "dexode/eventbus/Bus.hpp" + +namespace dexode::eventbus::perk +{ + +Flag PassEverythingPerk::onPrePostponeEvent(PostponeHelper& postponeCall) +{ + postponeCall.postponeCallback(*_passToBus, std::move(postponeCall.event)); + return Flag::postpone_cancel; +} + +} // namespace dexode::eventbus::perk diff --git a/lib/src/dexode/eventbus/perk/PassPerk.hpp b/lib/src/dexode/eventbus/perk/PassPerk.hpp new file mode 100644 index 0000000..02c478c --- /dev/null +++ b/lib/src/dexode/eventbus/perk/PassPerk.hpp @@ -0,0 +1,32 @@ +// +// Created by gelldur on 24.12.2019. +// +#pragma once + +#include + +#include "Perk.hpp" + +namespace dexode::eventbus +{ +class PostponeHelper; +class Bus; +} // namespace dexode::eventbus + +namespace dexode::eventbus::perk +{ + +class PassEverythingPerk : public Perk +{ +public: + PassEverythingPerk(std::shared_ptr passTo) + : _passToBus{std::move(passTo)} + {} + + Flag onPrePostponeEvent(PostponeHelper& postponeCall); + +private: + std::shared_ptr _passToBus; +}; + +} // namespace dexode::eventbus::perk diff --git a/lib/src/dexode/eventbus/perk/Perk.cpp b/lib/src/dexode/eventbus/perk/Perk.cpp new file mode 100644 index 0000000..c9619f2 --- /dev/null +++ b/lib/src/dexode/eventbus/perk/Perk.cpp @@ -0,0 +1,9 @@ +// +// Created by gelldur on 23.12.2019. +// +#include "Perk.hpp" + +namespace dexode::eventbus::perk +{ + +} // namespace dexode::eventbus::perk diff --git a/lib/src/dexode/eventbus/perk/Perk.hpp b/lib/src/dexode/eventbus/perk/Perk.hpp new file mode 100644 index 0000000..1ab7143 --- /dev/null +++ b/lib/src/dexode/eventbus/perk/Perk.hpp @@ -0,0 +1,22 @@ +// +// Created by gelldur on 23.12.2019. +// +#pragma once + +namespace dexode::eventbus::perk +{ + +enum class Flag : int +{ + nop, + postpone_cancel, + postpone_continue, +}; + +class Perk +{ +public: + virtual ~Perk() = default; +}; + +} // namespace dexode::eventbus::perk diff --git a/lib/src/dexode/eventbus/perk/PerkEventBus.cpp b/lib/src/dexode/eventbus/perk/PerkEventBus.cpp new file mode 100644 index 0000000..afe605d --- /dev/null +++ b/lib/src/dexode/eventbus/perk/PerkEventBus.cpp @@ -0,0 +1,39 @@ +// +// Created by gelldur on 23.12.2019. +// +#include "PerkEventBus.hpp" + +namespace dexode::eventbus::perk +{ + +PerkEventBus::RegisterHelper PerkEventBus::addPerk(std::unique_ptr perk) +{ + auto* local = perk.get(); + _perks.push_back(std::move(perk)); + return RegisterHelper(this, local); +} + +bool PerkEventBus::postponeEvent(PostponeHelper& postponeCall) +{ + for(const auto& onPrePostpone : _onPrePostpone) + { + if(onPrePostpone(postponeCall) == perk::Flag::postpone_cancel) + { + return false; + } + } + if(EventBus::postponeEvent(postponeCall)) + { + for(const auto& onPostPostpone : _onPostPostpone) + { + if(onPostPostpone(postponeCall) == perk::Flag::postpone_cancel) + { + break; + } + } + return true; + } + return false; +} + +} // namespace dexode::eventbus::perk diff --git a/lib/src/dexode/eventbus/perk/PerkEventBus.hpp b/lib/src/dexode/eventbus/perk/PerkEventBus.hpp new file mode 100644 index 0000000..f3f4499 --- /dev/null +++ b/lib/src/dexode/eventbus/perk/PerkEventBus.hpp @@ -0,0 +1,74 @@ +// +// Created by gelldur on 23.12.2019. +// +#pragma once + +#include +#include + +#include "Perk.hpp" +#include "dexode/EventBus.hpp" + +namespace dexode::eventbus::perk +{ + +class PerkEventBus : public EventBus +{ +public: + class RegisterHelper + { + friend PerkEventBus; + + public: + template + RegisterHelper& registerPrePostpone(perk::Flag (Perk_t::*method)(PostponeHelper&)) + { + _bus->_onPrePostpone.push_back( + std::bind(method, static_cast(_perk), std::placeholders::_1)); + return *this; + } + + template + RegisterHelper& registerPostPostpone(perk::Flag (Perk_t::*method)(PostponeHelper&)) + { + _bus->_onPostPostpone.push_back( + std::bind(method, static_cast(_perk), std::placeholders::_1)); + return *this; + } + + private: + PerkEventBus* _bus; + Perk* _perk; + + RegisterHelper(PerkEventBus* bus, Perk* perk) + : _bus(bus) + , _perk(perk) + {} + }; + + RegisterHelper addPerk(std::unique_ptr perk); + + template + T* getPerk() + { + auto found = + std::find_if(_perks.begin(), _perks.end(), [](const std::unique_ptr& perk) { + return dynamic_cast(perk.get()) != nullptr; + }); + if(found != _perks.end()) + { + return static_cast(found->get()); + } + return nullptr; + } + +protected: + bool postponeEvent(PostponeHelper& postponeCall) override; + +private: + std::vector> _perks; + std::vector> _onPrePostpone; + std::vector> _onPostPostpone; +}; + +} // namespace dexode::eventbus::perk diff --git a/lib/src/dexode/eventbus/perk/TagPerk.cpp b/lib/src/dexode/eventbus/perk/TagPerk.cpp new file mode 100644 index 0000000..dfa253a --- /dev/null +++ b/lib/src/dexode/eventbus/perk/TagPerk.cpp @@ -0,0 +1,20 @@ +// +// Created by gelldur on 24.12.2019. +// +#include "TagPerk.hpp" + +namespace dexode::eventbus::perk +{ + +Flag TagPerk::onPrePostponeEvent(PostponeHelper& postponeCall) +{ + if(auto found = _eventsToWrap.find(postponeCall.eventID); found != _eventsToWrap.end()) + { + found->second(postponeCall.event); + return Flag::postpone_cancel; + } + + return Flag::postpone_continue; +} + +} diff --git a/lib/src/dexode/eventbus/perk/TagPerk.hpp b/lib/src/dexode/eventbus/perk/TagPerk.hpp new file mode 100644 index 0000000..fe6410b --- /dev/null +++ b/lib/src/dexode/eventbus/perk/TagPerk.hpp @@ -0,0 +1,48 @@ +// +// Created by gelldur on 24.12.2019. +// +#pragma once + +#include +#include +#include +#include + +#include "Perk.hpp" +#include "dexode/eventbus/Bus.hpp" +#include "dexode/eventbus/internal/event_id.hpp" + +namespace dexode::eventbus::perk +{ + +class TagPerk : public Perk +{ +public: + TagPerk(std::string tag, dexode::eventbus::Bus* owner) + : _tag{std::move(tag)} + , _ownerBus{owner} + {} + + Flag onPrePostponeEvent(PostponeHelper& postponeCall); + + template + TagPerk& wrapTag() + { + static_assert(internal::validateEvent(), "Invalid tag event"); + static_assert(internal::validateEvent(), "Invalid event"); + constexpr auto eventID = internal::event_id(); + + _eventsToWrap[eventID] = [this](std::any event) { + TagEvent newEvent{_tag, std::move(std::any_cast(event))}; + _ownerBus->postpone(std::move(newEvent)); + }; + return *this; + } + +private: + std::map> _eventsToWrap; + std::string _tag; + dexode::eventbus::Bus* _ownerBus; +}; + +} // namespace dexode::eventbus::perk diff --git a/lib/src/dexode/eventbus/perk/WaitPerk.cpp b/lib/src/dexode/eventbus/perk/WaitPerk.cpp new file mode 100644 index 0000000..7114a33 --- /dev/null +++ b/lib/src/dexode/eventbus/perk/WaitPerk.cpp @@ -0,0 +1,35 @@ +// +// Created by gelldur on 24.12.2019. +// +#include "WaitPerk.hpp" + +namespace dexode::eventbus::perk +{ + +bool WaitPerk::wait() +{ + using namespace std::chrono_literals; + std::unique_lock lock(_waitMutex); + _eventWaiting.wait(lock); + + return true; +} + +bool WaitPerk::waitFor(const std::chrono::milliseconds timeout) +{ + using namespace std::chrono_literals; + std::unique_lock lock(_waitMutex); + if(_eventWaiting.wait_for(lock, timeout) == std::cv_status::timeout) + { + return false; + } + return true; +} + +Flag WaitPerk::onPostponeEvent(PostponeHelper&) +{ + _eventWaiting.notify_one(); + return Flag::postpone_continue; +} + +} // namespace dexode::eventbus::perk diff --git a/lib/src/dexode/eventbus/perk/WaitPerk.hpp b/lib/src/dexode/eventbus/perk/WaitPerk.hpp new file mode 100644 index 0000000..c3f3efc --- /dev/null +++ b/lib/src/dexode/eventbus/perk/WaitPerk.hpp @@ -0,0 +1,33 @@ +// +// Created by gelldur on 24.12.2019. +// +#pragma once + +#include +#include +#include + +#include "Perk.hpp" + +namespace dexode::eventbus +{ +class PostponeHelper; +} + +namespace dexode::eventbus::perk +{ + +class WaitPerk : public Perk +{ +public: + bool wait(); + bool waitFor(std::chrono::milliseconds timeout); + + Flag onPostponeEvent(PostponeHelper& postponeCall); + +private: + std::condition_variable _eventWaiting; + std::mutex _waitMutex; +}; + +} // namespace dexode::eventbus::perk diff --git a/lib/src/dexode/eventbus/permission/PostponeBus.hpp b/lib/src/dexode/eventbus/permission/PostponeBus.hpp new file mode 100644 index 0000000..54e51a9 --- /dev/null +++ b/lib/src/dexode/eventbus/permission/PostponeBus.hpp @@ -0,0 +1,35 @@ +// +// Created by gelldur on 24.12.2019. +// +#pragma once + +#include + +#include "dexode/eventbus/Bus.hpp" + +namespace dexode::eventbus::permission +{ +/** + * Intention of this helper is to hide API of other bus but allow only to postpone events + * So no: + * - listening + * - processing + */ +class PostponeBus +{ +public: + PostponeBus(std::shared_ptr hideBus) + : _hideBus{std::move(hideBus)} + {} + + template + constexpr bool postpone(Event event) + { + return _hideBus->postpone(event); + } + +private: + std::shared_ptr _hideBus; +}; + +} // namespace dexode::eventbus::permission diff --git a/lib/src/dexode/eventbus/strategy/Protected.cpp b/lib/src/dexode/eventbus/strategy/Protected.cpp deleted file mode 100644 index 0fec885..0000000 --- a/lib/src/dexode/eventbus/strategy/Protected.cpp +++ /dev/null @@ -1,71 +0,0 @@ -#include "dexode/eventbus/strategy/Protected.hpp" - -namespace dexode::eventbus::strategy -{ - -std::size_t Protected::processLimit(const std::size_t maxCountOfEvents) -{ - std::size_t processed = 0; - std::function postponeEventCallback; - - while(processed < maxCountOfEvents) - { - { - std::unique_lock writeLock{_mutex}; - if(_eventQueue.empty()) - { - break; - } - postponeEventCallback = std::move(_eventQueue.front()); - _eventQueue.pop_front(); - } - postponeEventCallback(); - ++processed; - } - return processed; -} - -void Protected::unlistenAll(const std::uint32_t listenerID) -{ - std::unique_lock writeLock{_mutex}; // TODO locking already locked mutex - for(auto& element : _callbacks) - { - element.second->remove(listenerID); - } -} - -bool Protected::wait() -{ - // Extra check before wait because maybe we already have something in queue - if(hasEvents()) - { - return true; - } - using namespace std::chrono_literals; - std::unique_lock lock(_waitMutex); - _eventWaiting.wait(lock); - - return hasEvents(); -} - -bool Protected::waitFor(std::chrono::milliseconds timeout) -{ - // Extra check before wait because maybe we already have something in queue - if(hasEvents()) - { - return true; - } - using namespace std::chrono_literals; - std::unique_lock lock(_waitMutex); - _eventWaiting.wait_for(lock, timeout); - - return hasEvents(); -} - -bool Protected::hasEvents() const -{ - std::shared_lock readLock{_mutex}; - return not _eventQueue.empty(); -} - -} // namespace dexode::eventbus::strategy diff --git a/lib/src/dexode/eventbus/strategy/Protected.hpp b/lib/src/dexode/eventbus/strategy/Protected.hpp deleted file mode 100644 index 8a9a0d5..0000000 --- a/lib/src/dexode/eventbus/strategy/Protected.hpp +++ /dev/null @@ -1,115 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include -#include - -#include "dexode/EventBus.hpp" -#include "dexode/eventbus/internal/AsyncCallbackVector.h" - -namespace dexode::eventbus::strategy -{ - -class Protected -{ -public: - Protected() = default; - ~Protected() = default; - - Protected(const Protected&) = delete; - Protected(Protected&&) = delete; - - Protected& operator=(Protected&&) = delete; - Protected& operator=(const Protected&) = delete; - - template - void post(const Event& event) - { - std::shared_lock readLock{_mutex}; - - using Vector = eventbus::internal::AsyncCallbackVector; - auto found = _callbacks.find(eventbus::internal::event_id()); - if(found == _callbacks.end()) - { - return; // no such notifications - } - - std::unique_ptr& vector = found->second; - assert(dynamic_cast(vector.get())); - auto* callbacks = static_cast(vector.get()); - - for(const auto& element : callbacks->container) - { - element.second(event); - } - } - - template - void postpone(Event&& event) - { - { - std::unique_lock writeLock{_mutex}; - _eventQueue.push_back( - [this, event = std::forward(event)]() { post(event); }); - } - _eventWaiting.notify_one(); - } - - std::size_t processLimit(std::size_t maxCountOfEvents); - - std::size_t getPostponeEventCount() const - { - std::shared_lock lock{_mutex}; - return _eventQueue.size(); - } - - bool wait(); - bool waitFor(std::chrono::milliseconds timeout); - - template - void listen(const std::uint32_t listenerID, std::function&& callback) - { - using Vector = eventbus::internal::AsyncCallbackVector; - - std::unique_lock writeLock{_mutex}; - auto eventListeners = _callbacks.find(eventbus::internal::event_id()); - if(eventListeners == _callbacks.cend()) - { - eventListeners = _callbacks.emplace_hint( - eventListeners, eventbus::internal::event_id(), std::make_unique()); - } - assert(dynamic_cast(eventListeners->second.get())); - auto* vectorImpl = static_cast(eventListeners->second.get()); - vectorImpl->add(listenerID, std::forward>(callback)); - } - - void unlistenAll(std::uint32_t listenerID); - - template - void unlisten(const std::uint32_t listenerID) - { - std::unique_lock writeLock{_mutex}; // TODO locking already locked mutex - auto found = _callbacks.find(eventbus::internal::event_id()); - if(found != _callbacks.end()) - { - found->second->remove(listenerID); - } - } - -private: - std::map> - _callbacks; - mutable std::shared_mutex _mutex; - - std::mutex _waitMutex; - std::condition_variable _eventWaiting; - std::deque> _eventQueue; - - bool hasEvents() const; -}; - -} // namespace dexode::eventbus::strategy diff --git a/lib/src/dexode/eventbus/strategy/Transaction.hpp b/lib/src/dexode/eventbus/strategy/Transaction.hpp deleted file mode 100644 index 376b853..0000000 --- a/lib/src/dexode/eventbus/strategy/Transaction.hpp +++ /dev/null @@ -1,115 +0,0 @@ -#pragma once - -#include -#include -#include -#include - -#include "dexode/EventBus.hpp" -#include "dexode/eventbus/internal/TransactionCallbackVector.h" - -namespace dexode::eventbus::strategy -{ - -class Transaction -{ -public: - Transaction() = default; - ~Transaction() = default; - - Transaction(const Transaction&) = delete; - Transaction(Transaction&&) = delete; - - Transaction& operator=(Transaction&&) = delete; - Transaction& operator=(const Transaction&) = delete; - - template - void post(const Event& event) - { - using Vector = eventbus::internal::TransactionCallbackVector; - auto found = _callbacks.find(eventbus::internal::event_id()); - if(found == _callbacks.end()) - { - return; // no such notifications - } - - std::unique_ptr& vector = found->second; - assert(dynamic_cast(vector.get())); - auto* vectorImpl = static_cast(vector.get()); - - vectorImpl->beginTransaction(); - for(const auto& element : vectorImpl->container) - { - element.second(event); - } - vectorImpl->commitTransaction(); - } - - template - void postpone(Event&& event) - { - _eventQueue.push_back([this, event = std::forward(event)]() { post(event); }); - } - - std::size_t processLimit(const std::size_t maxCountOfEvents) - { - std::size_t processed = 0; - while(processed < maxCountOfEvents && not _eventQueue.empty()) - { - auto asyncCallback = std::move(_eventQueue.front()); - _eventQueue.pop_front(); - // Needs to be done in this way. Think about recursion in this case... - asyncCallback(); - ++processed; - } - return processed; - } - - [[nodiscard]] std::size_t getQueueEventCount() const noexcept - { - return _eventQueue.size(); - } - - template - void listen(const std::uint32_t listenerID, std::function&& callback) - { - using Vector = eventbus::internal::TransactionCallbackVector; - - std::unique_ptr& vector = - _callbacks[eventbus::internal::event_id()]; - if(vector == nullptr) - { - vector = std::make_unique(); - } - assert(dynamic_cast(vector.get())); - auto* vectorImpl = static_cast(vector.get()); - vectorImpl->add(listenerID, std::forward>(callback)); - } - - void unlistenAll(const std::uint32_t listenerID) - { - for(auto& element : _callbacks) - { - element.second->remove(listenerID); - } - } - - template - void unlisten(const std::uint32_t listenerID) - { - static_assert(eventbus::internal::validateEvent(), "Invalid event"); - - auto found = _callbacks.find(eventbus::internal::event_id()); - if(found != _callbacks.end()) - { - found->second->remove(listenerID); - } - } - -private: - std::map> - _callbacks; - std::deque> _eventQueue; -}; - -} // namespace dexode::eventbus::strategy diff --git a/lib/src/dexode/eventbus/stream/EventStream.hpp b/lib/src/dexode/eventbus/stream/EventStream.hpp new file mode 100644 index 0000000..e85b4e2 --- /dev/null +++ b/lib/src/dexode/eventbus/stream/EventStream.hpp @@ -0,0 +1,42 @@ +#pragma once + +#include +#include + +namespace dexode::eventbus::stream +{ + +class EventStream +{ +public: + virtual ~EventStream() = default; + + virtual void postpone(std::any event) = 0; + virtual std::size_t process(std::size_t limit) = 0; + + virtual bool addListener(std::uint32_t listenerID, std::any callback) = 0; + virtual bool removeListener(std::uint32_t listenerID) = 0; +}; + +class NoopEventStream : public EventStream +{ +public: + void postpone(std::any event) override + { + throw std::runtime_error{"Noop"}; + } + size_t process(std::size_t limit) override + { + throw std::runtime_error{"Noop"}; + } + bool addListener(std::uint32_t listenerID, std::any callback) override + { + throw std::runtime_error{"Noop"}; + } + bool removeListener(std::uint32_t listenerID) override + { + throw std::runtime_error{"Noop"}; + } +}; + +} // namespace dexode::eventbus::stream diff --git a/lib/src/dexode/eventbus/stream/ProtectedEventStream.hpp b/lib/src/dexode/eventbus/stream/ProtectedEventStream.hpp new file mode 100644 index 0000000..98fd46b --- /dev/null +++ b/lib/src/dexode/eventbus/stream/ProtectedEventStream.hpp @@ -0,0 +1,163 @@ +#pragma once + +#include +#include +#include +#include + +#include "dexode/eventbus/stream/EventStream.hpp" + +namespace dexode::eventbus::stream +{ + +template +class ProtectedEventStream : public EventStream +{ + using Callback = std::function; + +public: + void postpone(std::any event) override + { + auto myEvent = std::any_cast(event); + std::lock_guard writeGuard{_mutexEvent}; + _queue.push_back(std::move(myEvent)); + } + + std::size_t process(const std::size_t limit) override + { + std::vector processEvents; + { + std::lock_guard writeGuard{_mutexEvent}; + if(limit >= _queue.size()) + { + processEvents.reserve(_queue.size()); + std::swap(processEvents, _queue); + } + else + { + const auto countElements = std::min(limit, _queue.size()); + processEvents.reserve(countElements); + std::move(_queue.begin(), + std::next(_queue.begin(), countElements), + std::back_inserter(processEvents)); + } + } + + for(const auto& event : processEvents) + { + // At this point we need to consider transaction safety as during some notification + // we can add/remove listeners + _isProcessing = true; + for(auto& callback : _callbacks) + { + callback(event); + } + _isProcessing = false; + + flushWaitingOnes(); + } + + return processEvents.size(); + } + + bool addListener(const std::uint32_t listenerID, std::any callback) override + { + std::lock_guard writeGuard{_mutexCallbacks}; + auto myCallback = std::any_cast(callback); + if(_isProcessing) + { + _waiting.emplace_back(listenerID, std::move(myCallback)); + return true; + } + + return rawAddListener(listenerID, std::move(myCallback)); + } + + bool removeListener(const std::uint32_t listenerID) override + { + std::lock_guard writeGuard{_mutexCallbacks}; + if(_isProcessing) + { + _waiting.emplace_back(listenerID, Callback{}); + return true; + } + + return rawRemoveListener(listenerID); + } + + [[nodiscard]] bool hasEvents() const + { + std::shared_lock readGuard{_mutexEvent}; + return not _queue.empty(); + } + +private: + std::vector _listenerIDs; + std::vector _queue; + std::vector _callbacks; + + std::atomic _isProcessing{false}; + std::vector> _waiting; + + std::shared_mutex _mutexEvent; + std::shared_mutex _mutexCallbacks; + + void flushWaitingOnes() + { + assert(_isProcessing == false); + std::lock_guard writeGuard{_mutexCallbacks}; + if(_waiting.empty()) + { + return; + } + + for(auto&& element : _waiting) + { + if(element.second) // if callable it means we want to add + { + rawAddListener(element.first, std::move(element.second)); + } + else // if not callable we want to remove + { + rawRemoveListener(element.first); + } + } + _waiting.clear(); + } + + bool rawAddListener(const std::uint32_t listenerID, Callback&& callback) + { + auto found = std::find(_listenerIDs.begin(), _listenerIDs.end(), listenerID); + if(found != _listenerIDs.end()) + { + /// ###### IMPORTANT ###### + /// This exception has some reason. + /// User should use multiple listeners instead of one. Thanks to that it makes + /// it more clear what will happen when call unlisten with specific Event + throw std::invalid_argument{std::string{"Already added listener for event: "} + + typeid(Event).name()}; + } + + _callbacks.push_back(std::forward(callback)); + _listenerIDs.push_back(listenerID); + assert(_listenerIDs.size() == _callbacks.size()); + return true; + } + + bool rawRemoveListener(const std::uint32_t listenerID) + { + auto found = std::find(_listenerIDs.begin(), _listenerIDs.end(), listenerID); + if(found == _listenerIDs.end()) + { + return false; + } + const auto index = std::distance(_listenerIDs.begin(), found); + + _listenerIDs.erase(found); + _callbacks.erase(std::next(_callbacks.begin(), index)); + assert(_listenerIDs.size() == _callbacks.size()); + return true; + } +}; + +} // namespace dexode::eventbus::stream diff --git a/lib/src/eventbus/AsyncEventBus.cpp b/lib/src/eventbus/AsyncEventBus.cpp deleted file mode 100644 index 10a18ba..0000000 --- a/lib/src/eventbus/AsyncEventBus.cpp +++ /dev/null @@ -1,55 +0,0 @@ -#include - -namespace Dexode -{ - -std::size_t AsyncEventBus::processCommandsAndGetQueuedEventsCount() -{ - std::lock_guard guard {_eventMutex}; - while(_commandsQueue.empty() == false) - { - _commandsQueue - .front()(); //This can't add any extra commands, because in this queue we story only listen/unlisten stuff - _commandsQueue.pop_front(); - } - //Yeah we want to return events count. So don't have to call getQueueEventCount - return _eventQueue.size(); -} - -int AsyncEventBus::consume(int max) -{ - int consumed = 0; - - std::function eventCommand; - while(processCommandsAndGetQueuedEventsCount() > 0 && consumed < max) //order is important - { - { - std::lock_guard guard {_eventMutex}; - eventCommand = std::move(_eventQueue.front()); - _eventQueue.pop_front(); - } - - eventCommand(); - ++consumed; - } - - return consumed; -} - -bool AsyncEventBus::wait() -{ - using namespace std::chrono_literals; - std::unique_lock lock(_waitMutex); - _eventWaiting.wait(lock); - return not _eventQueue.empty(); -} -bool AsyncEventBus::waitFor(std::chrono::milliseconds timeout) -{ - using namespace std::chrono_literals; - std::unique_lock lock(_waitMutex); - _eventWaiting.wait_for(lock, timeout); - - return not _eventQueue.empty(); -} - -} // namespace Dexode diff --git a/performance/src/AsyncEventBusPerformance.cpp b/performance/src/AsyncEventBusPerformance.cpp new file mode 100644 index 0000000..22682e7 --- /dev/null +++ b/performance/src/AsyncEventBusPerformance.cpp @@ -0,0 +1,54 @@ +// +// Created by gelldur on 14.06.19. +// +#include +#include +#include +#include + +#include + +#include +#include + +namespace +{ +struct SimpleEvent +{ + std::int64_t value = 0; +}; + +Dexode::AsyncEventBus bus; + +} // namespace + +void checkFor(benchmark::State& state) +{ + if(state.thread_index == 0) + { + Dexode::TokenHolder listener {&bus}; + std::uint64_t consumed = 0; + listener.listen( + [&consumed](const auto& event) { benchmark::DoNotOptimize(consumed += 1); }); + + for(auto _ : state) + { + //if(bus.wait()) + { + bus.consume(); + } + } + state.counters["consumed"] = consumed; + } + else + { + for(auto _ : state) + { + bus.schedule(SimpleEvent {std::chrono::steady_clock::now().time_since_epoch().count()}); + } + } +} + +BENCHMARK(checkFor)->Threads(2)->MinTime(1)->MeasureProcessCPUTime(); +BENCHMARK(checkFor)->Threads(5)->MinTime(1)->MeasureProcessCPUTime(); +BENCHMARK(checkFor)->Threads(10)->MinTime(1)->MeasureProcessCPUTime(); diff --git a/sample/CMakeLists.txt b/sample/CMakeLists.txt deleted file mode 100644 index 84c55ce..0000000 --- a/sample/CMakeLists.txt +++ /dev/null @@ -1,13 +0,0 @@ -cmake_minimum_required(VERSION 3.8 FATAL_ERROR) - -project(Sample LANGUAGES CXX) - -add_executable(Sample - src/main.cpp - ) - -if(NOT TARGET Dexode::EventBus) - find_package(EventBus CONFIG REQUIRED) -endif() - -target_link_libraries(Sample PRIVATE Dexode::EventBus) diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index e0a4959..598ff7b 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -16,12 +16,14 @@ find_package(Threads REQUIRED) # Target definition add_executable(EventBusTest - src/AsyncEventBusTest.cpp - src/EventCollectorTest.cpp - src/EventIdTest.cpp - src/NotifierTest.cpp - src/TestTagEventBus.cpp + src/dexode/eventbus/test/event.hpp + src/dexode/eventbus/test/SuiteConcurrentEventBus.cpp + src/dexode/eventbus/test/SuiteEventBus.cpp + src/dexode/eventbus/test/SuiteEventID.cpp + src/dexode/eventbus/test/SuiteListener.cpp + src/main.cpp ) +target_include_directories(EventBusTest PRIVATE src/) target_compile_options(EventBusTest PUBLIC -Wall -pedantic diff --git a/test/src/AsyncEventBusTest.cpp b/test/src/AsyncEventBusTest.cpp deleted file mode 100644 index 2d20e54..0000000 --- a/test/src/AsyncEventBusTest.cpp +++ /dev/null @@ -1,146 +0,0 @@ -#include -#include -#include -#include - -#include -#include -#include - -using namespace std::chrono; - -const auto ns2 = std::chrono::nanoseconds{2}; // GCC 7 has some issues with 2ms :/ -const auto ns3 = std::chrono::nanoseconds{3}; - -using namespace dexode; -using ProtectedEventBus = EventBus; -using Listener = ProtectedEventBus::Listener; - -TEST_CASE("Should consume events in synchronous way When using AsyncEventBus", "[AsyncEventBus]") -{ - struct SimpleEvent - { - std::thread::id id; - }; - - ProtectedEventBus bus; - auto listener = Listener::createNotOwning(bus); - - int counter = 0; - - listener.listen([&counter](const SimpleEvent& event) { - std::cout << "Event from: " << event.id << std::endl; - ++counter; - }); - - std::thread worker1{[&bus]() { - for(int i = 0; i < 10; ++i) - { - bus.postpone(SimpleEvent{std::this_thread::get_id()}); - std::this_thread::sleep_for(ns3); - } - }}; - std::thread worker2{[&bus]() { - for(int i = 0; i < 10; ++i) - { - bus.postpone(SimpleEvent{std::this_thread::get_id()}); - std::this_thread::sleep_for(ns2); - } - }}; - - REQUIRE(counter == 0); - while(counter < 20) - { - static int sumOfConsumed = 0; - REQUIRE(counter == sumOfConsumed); - sumOfConsumed += bus.processAll(); - REQUIRE(counter == sumOfConsumed); - } - - worker1.join(); - worker2.join(); -} - -TEST_CASE("Should unlisten for event When call unlisten inside Listener", "[AsyncEventBus]") -{ - struct SimpleEvent - { - std::thread::id id; - }; - - ProtectedEventBus bus; - auto listener = Listener::createNotOwning(bus); - - int counter = 0; - - listener.listen([&counter](const SimpleEvent& event) { - std::cout << "Event from: " << event.id << std::endl; - ++counter; - // TODO FIXME - // bus.unlistenAll(myToken); // This doesn't mean that unlisten will be ASAP! - }); - - REQUIRE(counter == 0); - bus.postpone(SimpleEvent{std::this_thread::get_id()}); - // This should consume (listen request), SimpleEvent, ()unlisten request - REQUIRE(bus.processLimit(1) == 1); - - for(int i = 0; i < 10; ++i) - { - bus.processAll(); - bus.postpone(SimpleEvent{std::this_thread::get_id()}); - bus.processAll(); - } - - REQUIRE(counter == 1); // Should be called only once -} - -TEST_CASE("Should listen for only 1 event When call unlisten inside Listener", "[AsyncEventBus]") -{ - struct SimpleEvent - { - std::thread::id id; - }; - - ProtectedEventBus bus; - auto listener = Listener::createNotOwning(bus); - - int counter = 0; - - listener.listen([&counter](const SimpleEvent& event) { - std::cout << "Event from: " << event.id << std::endl; - ++counter; - // TODO FIXME - // bus.unlistenAll(myToken); // This doesn't mean that unlisten will be ASAP! - }); - - // Workers in this test are only extra stuff - std::thread worker1{[&bus]() { - for(int i = 0; i < 10; ++i) - { - bus.postpone(SimpleEvent{std::this_thread::get_id()}); - std::this_thread::sleep_for(ns3); - } - }}; - // Workers in this test are only extra stuff - std::thread worker2{[&bus]() { - for(int i = 0; i < 10; ++i) - { - bus.postpone(SimpleEvent{std::this_thread::get_id()}); - std::this_thread::sleep_for(ns2); - } - }}; - - REQUIRE(counter == 0); - - for(int i = 0; i < 10; ++i) - { - bus.postpone(SimpleEvent{std::this_thread::get_id()}); - bus.postpone(SimpleEvent{std::this_thread::get_id()}); - bus.processAll(); - } - - REQUIRE(counter == 1); // Should be called only once - worker1.join(); - worker2.join(); -} diff --git a/test/src/EventCollectorTest.cpp b/test/src/EventCollectorTest.cpp deleted file mode 100644 index f0425de..0000000 --- a/test/src/EventCollectorTest.cpp +++ /dev/null @@ -1,77 +0,0 @@ -// -// Created by Dawid Drozd aka Gelldur on 05.08.17. -// - -#include -#include -#include - -using namespace dexode; -using TransactionEventBus = EventBus; -using Listener = TransactionEventBus::Listener; - -TEST_CASE("eventbus/EventCollector sample", "Simple test for EventCollector") -{ - struct SimpleEvent - { - int value; - }; - - TransactionEventBus bus; - - int callCount = 0; - { - auto listener = Listener::createNotOwning(bus); - listener.listen([&](const SimpleEvent& event) { - REQUIRE(event.value == 3); - ++callCount; - }); - bus.post(SimpleEvent{3}); - REQUIRE(callCount == 1); - } - bus.post(SimpleEvent{2}); - REQUIRE(callCount == 1); -} - -TEST_CASE("eventbus/EventCollector unlistenAll", "EventCollector::unlistenAll") -{ - struct SimpleEvent - { - int value; - }; - TransactionEventBus bus; - auto listener = Listener::createNotOwning(bus); - - int callCount = 0; - listener.listen([&](const SimpleEvent& event) { - REQUIRE(event.value == 3); - ++callCount; - }); - bus.post(SimpleEvent{3}); - listener.unlistenAll(); - - bus.post(SimpleEvent{2}); - REQUIRE(callCount == 1); -} - -TEST_CASE("eventbus/EventCollector reset", "EventCollector reset when we reasign") -{ - struct SimpleEvent - { - int value; - }; - - TransactionEventBus bus; - int callCount = 0; - auto listener = Listener::createNotOwning(bus); - listener.listen([&](const SimpleEvent& event) { - REQUIRE(event.value == 3); - ++callCount; - }); - bus.post(SimpleEvent{3}); - REQUIRE(callCount == 1); - listener = Listener{}; - - bus.post(SimpleEvent{2}); - REQUIRE(callCount == 1); -} diff --git a/test/src/EventIdTest.cpp b/test/src/EventIdTest.cpp deleted file mode 100644 index 1173e32..0000000 --- a/test/src/EventIdTest.cpp +++ /dev/null @@ -1,52 +0,0 @@ -#include - -#include -#include - -using namespace dexode; - -namespace -{ -struct Anonymous -{}; -} // namespace - -struct TestA -{ - int a; -}; - -namespace Test -{ -struct TestA -{ - bool b; -}; - -namespace TestN -{ -struct TestA -{ - long long c; -}; - -} // namespace TestN - -} // namespace Test - -TEST_CASE("Should return unique id for each event When using Internal::event_id", - "[EventId]") -{ - std::set unique; - - REQUIRE(unique.insert(eventbus::internal::event_id()).second); - REQUIRE_FALSE(unique.insert(eventbus::internal::event_id()).second); // already there - - struct TestA - {}; - - REQUIRE(unique.insert(eventbus::internal::event_id()).second); - REQUIRE(unique.insert(eventbus::internal::event_id<::TestA>()).second); - REQUIRE(unique.insert(eventbus::internal::event_id()).second); - REQUIRE(unique.insert(eventbus::internal::event_id()).second); -} diff --git a/test/src/NotifierTest.cpp b/test/src/NotifierTest.cpp deleted file mode 100644 index d5e2a72..0000000 --- a/test/src/NotifierTest.cpp +++ /dev/null @@ -1,393 +0,0 @@ -// -// Created by Dawid Drozd aka Gelldur on 05.08.17. -// - -#define CATCH_CONFIG_MAIN -#include -#include -#include - -using namespace dexode; -using TransactionEventBus = EventBus; -using Listener = TransactionEventBus::Listener; - -TEST_CASE("eventbus/Simple test", "Simple test") -{ - TransactionEventBus bus; - struct SimpleEvent - { - int value; - }; - - auto listener = Listener::createNotOwning(bus); - listener.listen([](const SimpleEvent& event) { REQUIRE(event.value == 3); }); - - bus.post(SimpleEvent{3}); - listener.unlistenAll(); - bus.post(SimpleEvent{2}); - - listener.listen([](const SimpleEvent& event) { REQUIRE(event.value == 1); }); - bus.post(SimpleEvent{1}); -} - -TEST_CASE("eventbus/Simple test2", "Simple test") -{ - TransactionEventBus bus; - auto listener = Listener::createNotOwning(bus); - - struct SimpleEvent - { - int value; - }; - - listener.listen([](const SimpleEvent& event) { REQUIRE(event.value == 3); }); - - bus.post({3}); - listener.unlistenAll(); - bus.post(SimpleEvent{2}); - - listener.listen([](const SimpleEvent& event) { REQUIRE(event.value == 1); }); - bus.post(SimpleEvent{1}); -} - -TEST_CASE("eventbus/EventBus listen & post", - "Listen & post without notification object. Using only string") -{ - int isCalled = 0; - - TransactionEventBus bus; - auto listener = Listener::createNotOwning(bus); - - struct SimpleEvent - { - int value; - }; - - listener.listen([&](const SimpleEvent& event) { - ++isCalled; - REQUIRE(event.value == 3); - }); - REQUIRE(isCalled == 0); - bus.post(SimpleEvent{3}); - REQUIRE(isCalled == 1); - listener.unlistenAll(); - bus.post(SimpleEvent{2}); - REQUIRE(isCalled == 1); - - listener.listen([&](const SimpleEvent& event) { - ++isCalled; - REQUIRE(event.value == 1); - }); - bus.post(SimpleEvent{1}); - REQUIRE(isCalled == 2); -} - -TEST_CASE("eventbus/Different notification", "Valid check notification") -{ - TransactionEventBus bus; - auto listener = Listener::createNotOwning(bus); - - struct SimpleEvent1 - { - int value; - }; - struct SimpleEvent2 - { - int value; - }; - - bool called1 = false; - bool called2 = false; - - listener.listen([&called1](const SimpleEvent1& event) { - called1 = true; - REQUIRE(event.value == 1); - }); - - listener.listen([&called2](const SimpleEvent2& event) { - called2 = true; - REQUIRE(event.value == 2); - }); - - REQUIRE(called1 == false); - - bus.post(SimpleEvent1{1}); - - REQUIRE(called1 == true); - REQUIRE(called2 == false); - called1 = false; - - bus.post(SimpleEvent2{2}); - - REQUIRE(called1 == false); - REQUIRE(called2 == true); -} - -namespace Scope1 -{ -struct SimpleEvent -{ - int value; -}; -} // namespace Scope1 -namespace Scope2 -{ -struct SimpleEvent -{ - int value; -}; -} // namespace Scope2 - -TEST_CASE("eventbus/EventBus different events", - "Events with the same name but different scope should be different") -{ - int isCalled = 0; - TransactionEventBus bus; - auto listener = Listener::createNotOwning(bus); - - listener.listen([&](const Scope1::SimpleEvent& event) { - ++isCalled; - REQUIRE(event.value == 1); - }); - listener.listen([&](const Scope2::SimpleEvent& event) { - ++isCalled; - REQUIRE(event.value == 2); - }); - REQUIRE(isCalled == 0); - - bus.post(Scope1::SimpleEvent{1}); - REQUIRE(isCalled == 1); - - bus.post(Scope2::SimpleEvent{2}); - REQUIRE(isCalled == 2); -} - -TEST_CASE("eventbus/EventBus modification during post", - "Remove listener during notification should not malform EventBus") -{ - TransactionEventBus bus; - auto listener = Listener::createNotOwning(bus); - - struct TestEvent - {}; - - // int token1 = 1; - // int token2 = 2; - - int calls = 0; - - // listener.listen(token1, [&](const TestEvent& event) { - // ++calls; - // bus.unlistenAll(token1); - // bus.unlistenAll(token2); - // }); - // listener.listen(token2, [&](const TestEvent& event) { - // ++calls; - // bus.unlistenAll(token1); - // bus.unlistenAll(token2); - // }); - - REQUIRE_NOTHROW(bus.post(TestEvent{})); - REQUIRE(calls == 2); - - REQUIRE_NOTHROW(bus.post(TestEvent{})); - REQUIRE(calls == 2); -} - -TEST_CASE("eventbus/EventBus modification during post2", - "Remove listener during notification should not malform EventBus") -{ - TransactionEventBus bus; - auto listener = Listener::createNotOwning(bus); - - struct TestEvent - {}; - - // int token1 = 1; - // int token2 = 2; - // int token3 = 3; - - int calls = 0; - - // bus.listen(token1, [&](const TestEvent& event) { - // ++calls; - // bus.unlistenAll(token1); - // bus.unlistenAll(token2); - // bus.unlistenAll(token3); - // }); - // bus.listen(token2, [&](const TestEvent& event) { - // ++calls; - // bus.unlistenAll(token1); - // bus.unlistenAll(token2); - // bus.unlistenAll(token3); - // }); - // bus.listen(token3, [&](const TestEvent& event) { - // ++calls; - // bus.unlistenAll(token1); - // bus.unlistenAll(token2); - // bus.unlistenAll(token3); - // }); - - REQUIRE_NOTHROW(bus.post(TestEvent{})); - REQUIRE(calls == 3); - - REQUIRE_NOTHROW(bus.post(TestEvent{})); - REQUIRE(calls == 3); -} - -TEST_CASE("eventbus/EventBus modification during post3", - "Remove listener during notification should not malform EventBus") -{ - TransactionEventBus bus; - auto listener = Listener::createNotOwning(bus); - - struct TestEvent - {}; - - // int token1 = 1; - // int token2 = 2; - // int token3 = 3; - - int calls = 0; - - // bus.listen(token1, [&](const TestEvent& event) { - // ++calls; - // bus.unlistenAll(token1); - // }); - // bus.listen(token2, [&](const TestEvent& event) { - // ++calls; - // bus.unlistenAll(token1); - // bus.unlistenAll(token3); - // bus.unlistenAll(token2); - // }); - // bus.listen(token3, [&](const TestEvent& event) { - // ++calls; - // bus.unlistenAll(token1); - // bus.unlistenAll(token2); - // bus.unlistenAll(token3); - // }); - - REQUIRE_NOTHROW(bus.post(TestEvent{})); - REQUIRE(calls == 3); -} - -TEST_CASE("eventbus/EventBus modification during post4", - "Remove listener during notification should not malform EventBus") -{ - TransactionEventBus bus; - auto listener = Listener::createNotOwning(bus); - - struct TestEvent - {}; - - // int token1 = 1; - // int token2 = 2; - // int token3 = 3; - - int calls = 0; - - // bus.listen(token1, [&](const TestEvent& event) { - // ++calls; - // bus.unlistenAll(token1); - // - // bus.listen(token2, [&](const TestEvent& event) { - // ++calls; - // bus.unlistenAll(token1); - // bus.unlistenAll(token3); - // bus.unlistenAll(token2); - // }); - // }); - // bus.listen(token3, [&](const TestEvent& event) { - // ++calls; - // bus.unlistenAll(token1); - // bus.unlistenAll(token2); - // bus.unlistenAll(token3); - // }); - - REQUIRE_NOTHROW(bus.post(TestEvent{})); - REQUIRE(calls == 2); - REQUIRE_NOTHROW(bus.post(TestEvent{})); - REQUIRE(calls == 2); -} - -TEST_CASE("eventbus/EventBus modification during post5", - "Remove listener during notification should not malform EventBus") -{ - TransactionEventBus bus; - auto listener = Listener::createNotOwning(bus); - - struct TestEvent - {}; - - // int token1 = 1; - // int token2 = 2; - // int token3 = 3; - - int calls = 0; - - // bus.listen(token1, [&](const TestEvent& event) { - // ++calls; - // bus.unlistenAll(token1); - // }); - // bus.listen(token2, [&](const TestEvent& event) { - // ++calls; - // bus.unlistenAll(token1); - // bus.unlistenAll(token2); - // - // bus.listen(token3, [&](const TestEvent& event) { - // ++calls; - // bus.unlistenAll(token1); - // bus.unlistenAll(token2); - // bus.unlistenAll(token3); - // }); - // }); - - REQUIRE_NOTHROW(bus.post(TestEvent{})); - REQUIRE(calls == 2); - - REQUIRE_NOTHROW(bus.post(TestEvent{})); - REQUIRE(calls == 3); -} - -TEST_CASE("eventbus/EventBus modification during nested post", - "Remove listener during notification should not malform EventBus") -{ - TransactionEventBus bus; - auto listener = Listener::createNotOwning(bus); - - struct TestEvent - {}; - struct TestEvent2 - {}; - - // int token1 = 1; - // int token2 = 2; - // int token3 = 3; - - int calls = 0; - - // bus.listen(token1, [&](const TestEvent& event) { - // bus.post(TestEvent2{}); - // - // ++calls; - // bus.unlistenAll(token1); - // - // bus.listen(token2, [&](const TestEvent& event) { - // ++calls; - // bus.unlistenAll(token1); - // bus.unlistenAll(token3); - // bus.unlistenAll(token2); - // }); - // }); - // bus.listen(token3, [&](const TestEvent& event) { - // ++calls; - // bus.unlistenAll(token1); - // bus.unlistenAll(token2); - // bus.unlistenAll(token3); - // }); - - REQUIRE_NOTHROW(bus.post(TestEvent{})); - REQUIRE(calls == 2); - REQUIRE_NOTHROW(bus.post(TestEvent{})); - REQUIRE(calls == 2); -} diff --git a/test/src/TestTagEventBus.cpp b/test/src/TestTagEventBus.cpp deleted file mode 100644 index 0b63c9b..0000000 --- a/test/src/TestTagEventBus.cpp +++ /dev/null @@ -1,67 +0,0 @@ -// -// Created by gelldur on 30.10.2019. -// -#include - -#include - -#include "dexode/eventbus/TagEventBus.hpp" -#include "dexode/eventbus/strategy/Protected.hpp" - -using TagEventBus = dexode::eventbus::TagEventBus; - -namespace -{ - -struct EventWithMessage -{ - std::string message = "no-msg"; -}; - -} // namespace - -TEST_CASE("Should notify only listeners with specific tag When using TagEventBus", "[TagEventBus]") -{ - TagEventBus bus{{"gui", "backend"}}; - auto listenerGlobal = TagEventBus::ListenerAll::createNotOwning(bus); - - int counterGlobalListener = 0; - int counterTagGUIListener = 0; - int counterTagBackendListener = 0; - - listenerGlobal.listen([&](const EventWithMessage& event) { - INFO("[Global listener] Received: EventWithMessage:" << event.message); - ++counterGlobalListener; - }); - - TagEventBus::Listener guiListener{bus.get("gui")}; - guiListener.listen([&](const EventWithMessage& event) { - INFO("[GUI listener] Received: EventWithMessage:" << event.message); - ++counterTagGUIListener; - }); - - TagEventBus::Listener backendListener{bus.get("backend")}; - backendListener.listen([&](const EventWithMessage& event) { - INFO("[Backend listener] Received: EventWithMessage:" << event.message); - ++counterTagBackendListener; - }); - - { - bus.post(EventWithMessage{"everyone should get this message (global listeners included)"}); - REQUIRE(counterGlobalListener == 1); - REQUIRE(counterTagGUIListener == 1); - REQUIRE(counterTagBackendListener == 1); - } - { - bus.post("gui", EventWithMessage{"gui + global should get this message"}); - REQUIRE(counterGlobalListener == 2); - REQUIRE(counterTagGUIListener == 2); - REQUIRE(counterTagBackendListener == 1); - } - { - bus.post("backend", EventWithMessage{"backend + global should get this message"}); - REQUIRE(counterGlobalListener == 3); - REQUIRE(counterTagGUIListener == 2); - REQUIRE(counterTagBackendListener == 2); - } -} diff --git a/test/src/dexode/eventbus/test/SuiteConcurrentEventBus.cpp b/test/src/dexode/eventbus/test/SuiteConcurrentEventBus.cpp new file mode 100644 index 0000000..1e27c0c --- /dev/null +++ b/test/src/dexode/eventbus/test/SuiteConcurrentEventBus.cpp @@ -0,0 +1,180 @@ +#include +#include +#include +#include +#include + +#include + +#include "dexode/EventBus.hpp" +#include "dexode/eventbus/perk/PerkEventBus.hpp" +#include "dexode/eventbus/perk/WaitPerk.hpp" +#include "dexode/eventbus/test/event.hpp" + +using namespace std::chrono_literals; + +namespace dexode::eventbus::test +{ +using Listener = dexode::EventBus::Listener; + +struct SimpleEvent +{ + std::thread::id id; +}; + +constexpr auto ns2 = std::chrono::nanoseconds{2}; // GCC 7 has some issues with 2ms :/ +constexpr auto ns3 = std::chrono::nanoseconds{3}; + +TEST_CASE("Should consume events in synchronous way When using worker threads", + "[concurrent][EventBus]") +{ + EventBus bus; + auto listener = Listener::createNotOwning(bus); + + std::atomic counter = 0; + + listener.listen([&counter](const SimpleEvent& event) { + // std::cout << "Event from: " << event.id << std::endl; + ++counter; + }); + + std::thread worker1{[&bus]() { + for(int i = 0; i < 10; ++i) + { + bus.postpone(SimpleEvent{std::this_thread::get_id()}); + std::this_thread::sleep_for(ns3); + } + }}; + std::thread worker2{[&bus]() { + for(int i = 0; i < 10; ++i) + { + bus.postpone(SimpleEvent{std::this_thread::get_id()}); + std::this_thread::sleep_for(ns2); + } + }}; + + REQUIRE(counter == 0); + int sumOfConsumed = 0; + while(counter < 20) + { + REQUIRE(counter == sumOfConsumed); + sumOfConsumed += bus.process(); + REQUIRE(counter == sumOfConsumed); + } + + worker1.join(); + worker2.join(); +} + +TEST_CASE("Should listen for only 1 event When call unlisten inside Listener", + "[concurrent][EventBus]") +{ + EventBus bus; + auto listener = Listener::createNotOwning(bus); + + std::atomic counter = 0; + + listener.listen([&counter, &listener](const SimpleEvent& event) { + // std::cout << "Event from: " << event.id << std::endl; + ++counter; + listener.unlistenAll(); + }); + + std::thread worker1{[&bus]() { + for(int i = 0; i < 10; ++i) + { + bus.postpone(SimpleEvent{std::this_thread::get_id()}); + std::this_thread::sleep_for(ns3); + } + }}; + std::thread worker2{[&bus]() { + for(int i = 0; i < 10; ++i) + { + bus.postpone(SimpleEvent{std::this_thread::get_id()}); + std::this_thread::sleep_for(ns2); + } + }}; + + std::thread worker3{[&bus, &counter]() { + while(counter == 0) + { + bus.process(); + std::this_thread::sleep_for(ns2); + } + for(int i = 0; i < 10; ++i) + { + bus.process(); + std::this_thread::sleep_for(ns2); + } + }}; + + for(int i = 0; i < 10; ++i) + { + bus.postpone(SimpleEvent{std::this_thread::get_id()}); + } + + worker1.join(); + worker2.join(); + worker3.join(); + + REQUIRE(counter == 1); // Should be called only once +} + +TEST_CASE("Should wait work", "[concurrent][EventBus]") +{ + dexode::eventbus::perk::PerkEventBus bus; + bus.addPerk(std::make_unique()) + .registerPostPostpone(&dexode::eventbus::perk::WaitPerk::onPostponeEvent); + + auto* waitPerk = bus.getPerk(); + REQUIRE(waitPerk != nullptr); + + auto listener = Listener::createNotOwning(bus); + listener.listen( + [](const event::WaitPerk& event) { std::cout << "In WaitPerk event" << std::endl; }); + listener.listen([](const event::T1& event) { std::cout << "In T1 event" << std::endl; }); + + // Worker which will send event every 10 ms + std::atomic isWorking = true; + std::atomic produced{0}; + std::atomic consumed{0}; + + std::thread producer{[&bus, &isWorking, &produced]() { + while(isWorking) + { + bus.postpone(event::WaitPerk{}); + bus.postpone(event::T1{}); + ++produced; + ++produced; + std::this_thread::sleep_for(5ms); + } + }}; + + for(int i = 0; i < 20; ++i) + { + auto start = std::chrono::steady_clock::now(); + if(waitPerk->waitFor(20ms)) + { + int beforeConsumed = consumed; + consumed += bus.process(); + INFO("If events available then consumed count should change") + CHECK(consumed > beforeConsumed); + } + else + { + // No events waiting for us + } + + std::cout << "I was sleeping for: " + << std::chrono::duration_cast( + std::chrono::steady_clock::now() - start) + .count() + << " ms, consumed:" << consumed << std::endl; + } + + isWorking = false; + producer.join(); + REQUIRE(produced >= consumed); +} + +} // namespace dexode::eventbus::test diff --git a/test/src/dexode/eventbus/test/SuiteEventBus.cpp b/test/src/dexode/eventbus/test/SuiteEventBus.cpp new file mode 100644 index 0000000..361c723 --- /dev/null +++ b/test/src/dexode/eventbus/test/SuiteEventBus.cpp @@ -0,0 +1,424 @@ +#include + +#include + +#include "dexode/EventBus.hpp" +#include "dexode/eventbus/test/event.hpp" + +namespace dexode::eventbus::test +{ + +// class TestProducer +//{ +// public: +// using Events = std::variant; +// +// void onUpdate(TransactionEventBus& bus) +// { +// bus.postpone(event::T1{}); +// } +// +// private: +//}; + +// TEST_CASE("Should process events independently When postponed multiple event types", +// "[EventBus]") +//{ +// EventBus bus; +// +// int event1CallCount = 0; +// int event2CallCount = 0; +// +// auto listener = Listener::createNotOwning(bus); +// listener.listen([&](const event::T1& event) { ++event1CallCount; }); +// listener.listen2([&](const event::T2& event) { ++event2CallCount; }); +// +// bus.postpone(event::T1{}); +// { +// event::T2 localVariable; +// bus.postpone(localVariable); +// } +// +// REQUIRE(event1CallCount == 0); +// REQUIRE(event2CallCount == 0); +// REQUIRE(bus.process() == 1); +// REQUIRE(event1CallCount == 1); +// REQUIRE(event2CallCount == 0); +// +// REQUIRE(bus.process() == 1); +// REQUIRE(event1CallCount == 1); +// REQUIRE(event2CallCount == 1); +//} + +TEST_CASE("Should deliver event with desired value When postpone event", "[EventBus]") +{ + EventBus bus; + auto listener = EventBus::Listener::createNotOwning(bus); + + int callCount = 0; + + listener.listen([&](const event::Value& event) { + REQUIRE(event.value == 3); + ++callCount; + }); + + REQUIRE(callCount == 0); + bus.postpone(event::Value{3}); + REQUIRE(callCount == 0); + REQUIRE(bus.process() == 1); + REQUIRE(callCount == 1); +} + +TEST_CASE("Should be able to replace listener When we do listen -> unlisten -> listen", + "[EventBus]") +{ + EventBus bus; + auto listener = EventBus::Listener::createNotOwning(bus); + + int callCount = 0; + + { + // TODO validate other signatures is it possible to make mistake? (maybe fail to build tests + // ?) + listener.listen([&](const event::Value& event) { + REQUIRE(event.value == 3); + ++callCount; + }); + bus.postpone(event::Value{3}); + REQUIRE(bus.process() == 1); + REQUIRE(callCount == 1); + } + + { + listener.unlisten(); + bus.postpone(event::Value{2}); + REQUIRE(bus.process() == 1); + REQUIRE(callCount == 1); + } + + { + listener.listen([&](const event::Value& event) { + REQUIRE(event.value == 1); + ++callCount; + }); + + bus.postpone(event::Value{1}); + REQUIRE(bus.process() == 1); + REQUIRE(callCount == 2); + } +} + +TEST_CASE("Should be able to replace listener When we do listen -> unlistenAll -> listen", + "[EventBus]") +{ + EventBus bus; + auto listener = EventBus::Listener::createNotOwning(bus); + + int callCount = 0; + { + // TODO validate other signatures is it possible to make mistake? (maybe fail to build tests + // ?) + listener.listen([&](const event::Value& event) { + REQUIRE(event.value == 3); + ++callCount; + }); + listener.listen([&](const event::T1& event) { ++callCount; }); + bus.postpone(event::Value{3}); + bus.postpone(event::T1{}); + REQUIRE(bus.process() == 2); + REQUIRE(callCount == 2); + } + + { + listener.unlistenAll(); + bus.postpone(event::Value{3}); + bus.postpone(event::T1{}); + REQUIRE(bus.process() == 2); + REQUIRE(callCount == 2); // no new calls + } + + { + listener.listen([&](const event::Value& event) { + REQUIRE(event.value == 1); + ++callCount; + }); + + bus.postpone(event::Value{1}); + bus.postpone(event::T1{}); + REQUIRE(bus.process() == 2); + REQUIRE(callCount == 3); + } +} + +TEST_CASE("Should flush events When no one listens", "[EventBus]") +{ + EventBus bus; + auto listener = EventBus::Listener::createNotOwning(bus); + + bus.postpone(event::Value{3}); + bus.postpone(event::Value{3}); + bus.postpone(event::Value{3}); + REQUIRE(bus.process() == 3); // it shouldn't accumulate events +} + +TEST_CASE("Should be able to unlisten When processing event", "[EventBus]") +{ + EventBus bus; + auto listener = EventBus::Listener::createNotOwning(bus); + + int callCount = 0; + listener.listen([&](const event::Value& event) { + ++callCount; + listener.unlisten(); + }); + + bus.postpone(event::Value{3}); + bus.postpone(event::Value{3}); + bus.postpone(event::Value{3}); + REQUIRE(bus.process() == 3); // it shouldn't accumulate events + REQUIRE(callCount == 1); +} + +TEST_CASE("Should not fail When unlisten multiple times during processing event", "[EventBus]") +{ + EventBus bus; + auto listener = EventBus::Listener::createNotOwning(bus); + + int callCount = 0; + listener.listen([&](const event::Value& event) { + REQUIRE_NOTHROW(listener.unlisten()); + REQUIRE_NOTHROW(listener.unlisten()); + REQUIRE_NOTHROW(listener.unlisten()); + ++callCount; + }); + + bus.postpone(event::Value{3}); + bus.postpone(event::Value{3}); + bus.postpone(event::Value{3}); + REQUIRE(bus.process() == 3); // it shouldn't accumulate events + REQUIRE(callCount == 1); +} + +TEST_CASE("Should fail When try to add not unique listener", "[EventBus]") +{ + EventBus bus; + auto listener = EventBus::Listener::createNotOwning(bus); + + REQUIRE_NOTHROW(listener.listen([](const event::T1&) {})); + REQUIRE_THROWS(listener.listen([](const event::T1&) {})); // We already added listener +} + +TEST_CASE("Should be able to add listener When processing event", "[EventBus]") +{ + EventBus bus; + auto listener = EventBus::Listener::createNotOwning(bus); + auto listenerOther = EventBus::Listener::createNotOwning(bus); + + int callCount = 0; + int callCountOther = 0; + listener.listen([&](const event::Value& event) { + ++callCount; + if(callCount == 1) // remember that we can only add it once! + { + listenerOther.listen( + [&callCountOther](const event::Value& event) { ++callCountOther; }); + } + }); + + { + bus.postpone(event::Value{3}); + REQUIRE(bus.process() == 1); + REQUIRE(callCount == 1); + REQUIRE(callCountOther == 0); + + bus.postpone(event::Value{3}); + REQUIRE(bus.process() == 1); + REQUIRE(callCount == 2); + REQUIRE(callCountOther == 1); + } + { + listenerOther.unlistenAll(); + callCount = 0; + callCountOther = 0; + bus.postpone(event::Value{3}); + bus.postpone(event::Value{3}); + bus.postpone(event::Value{3}); + REQUIRE(bus.process() == 3); // it shouldn't accumulate events + REQUIRE(callCount == 3); + REQUIRE(callCountOther == 2); + } +} + +TEST_CASE("Should be able to add listener and unlisten When processing event", "[EventBus]") +{ + EventBus bus; + auto listener = EventBus::Listener::createNotOwning(bus); + auto listenerOther = EventBus::Listener::createNotOwning(bus); + + int callCount = 0; + int callCountOther = 0; + listener.listen([&](const event::Value& event) { + ++callCount; + if(callCount == 1) // remember that we can only add it once! + { + listenerOther.listen([&](const event::Value& event) { ++callCountOther; }); + } + listener.unlistenAll(); + }); + + bus.postpone(event::Value{3}); + bus.postpone(event::Value{3}); + bus.postpone(event::Value{3}); + REQUIRE(bus.process() == 3); + REQUIRE(callCount == 1); + REQUIRE(callCountOther == 2); +} + +TEST_CASE( + "Should be able to add listener and remove listener in Matryoshka style When processing event", + "[EventBus]") +{ + EventBus bus; + auto listener1 = EventBus::Listener::createNotOwning(bus); + auto listener2 = EventBus::Listener::createNotOwning(bus); + auto listener3 = EventBus::Listener::createNotOwning(bus); + + int callCount = 0; + + listener1.listen([&](const event::Value& event) { + listener1.unlistenAll(); // Level 1 + + listener2.listen([&](const event::Value& event) { + listener2.unlistenAll(); // Level 2 + + listener3.listen([&](const event::Value& event) { + listener3.unlistenAll(); // Level 3 (final) + ++callCount; + }); + ++callCount; + }); + ++callCount; + }); + + bus.postpone(event::Value{3}); + bus.postpone(event::Value{3}); + bus.postpone(event::Value{3}); + bus.postpone(event::Value{3}); + REQUIRE(bus.process() == 4); + REQUIRE(callCount == 3); +} + +TEST_CASE("Should be chain listen and unlisten When processing event", "[EventBus]") +{ + EventBus bus; + auto listener = EventBus::Listener::createNotOwning(bus); + auto listenerOther = EventBus::Listener::createNotOwning(bus); + + int callCount = 0; + int callOtherOption = 0; + listener.listen([&](const event::Value& event) { + ++callCount; + // remember that we can only add it once! + listenerOther.listen([&](const event::Value& event) { callOtherOption = 1; }); + listenerOther.unlisten(); + listenerOther.listen([&](const event::Value& event) { callOtherOption = 2; }); + listenerOther.unlisten(); + listenerOther.listen([&](const event::Value& event) { callOtherOption = 3; }); + listenerOther.unlisten(); + + listenerOther.listen([&](const event::Value& event) { callOtherOption = 4; }); + + listener.unlistenAll(); + }); + + bus.postpone(event::Value{3}); + bus.postpone(event::Value{3}); + bus.postpone(event::Value{3}); + REQUIRE(bus.process() == 3); + REQUIRE(callCount == 1); + REQUIRE(callOtherOption == 4); +} + +TEST_CASE("Should not process events When no more events", "[EventBus]") +{ + EventBus bus; + auto listener = EventBus::Listener::createNotOwning(bus); + + bus.postpone(event::Value{3}); + bus.postpone(event::Value{3}); + bus.postpone(event::Value{3}); + REQUIRE(bus.process() == 3); // it shouldn't accumulate events + REQUIRE(bus.process() == 0); +} + +TEST_CASE("Should distinguish event producer When", "[EventBus]") +{ + // EventBus bus; + // + // // dexode::eventbus::WrapTag e; + // + // int counterGui = 0; + // int counterBackend = 0; + // auto listener = EventBus::Listener::createNotOwning(bus); + // + // listener.listen([&](const event::Tag& event) { + // if(event.tag == "gui") + // { + // ++counterGui; + // } + // else if(event.tag == "backend") + // { + // ++counterBackend; + // } + // else + // { + // FAIL(); + // } + // }); + // + // auto producerGui = [tag = Tag::gui](EventBus& bus) { + // const event::T1 event; + // bus.postpone(event); + // }; + // + // auto producerBackend = [](EventBus& bus) { + // const event::T1 event; + // bus.postpone(Tag::backend, event); + // }; + // + // auto producerNoTag = [](EventBus& bus) { + // const event::T1 event; + // bus.postpone(event); + // }; + // + // auto producerGeneric = [](EventBus& bus, Tag tag) { + // const event::T1 event; + // if(tag == Tag::none) + // { + // bus.postpone(event); + // } + // else + // { + // bus.postpone(tag, event); + // } + // }; + // + // producerGui(bus); + // producerGui(bus); + // producerBackend(bus); + // producerGui(bus); + // producerNoTag(bus); + // producerGeneric(bus, Tag::none); + // producerGeneric(bus, Tag::backend); + // + // CHECK(bus.process() == 7); + // + // REQUIRE(counterGui == 3); + // REQUIRE(counterBackend == 2); +} + +} // namespace dexode::eventbus::test + +// TODO should listen work with bind'ing e.g. +//_listener.listen( +// std::bind(&BackDashboard::onLoadOrders, this, std::placeholders::_1)); diff --git a/test/src/dexode/eventbus/test/SuiteEventID.cpp b/test/src/dexode/eventbus/test/SuiteEventID.cpp new file mode 100644 index 0000000..e11afc9 --- /dev/null +++ b/test/src/dexode/eventbus/test/SuiteEventID.cpp @@ -0,0 +1,57 @@ +#include + +#include + +#include "dexode/eventbus/internal/event_id.hpp" + +using namespace dexode; + +namespace +{ +struct Anonymous +{}; +} // namespace + +struct TestA +{ + int a; +}; + +namespace Test +{ +struct TestA +{ + bool b; +}; + +namespace TestN +{ +struct TestA +{ + long long c; +}; + +} // namespace TestN + +} // namespace Test + +namespace dexode::eventbus::test +{ + +TEST_CASE("Should return unique id for each event When using event_id", "[EventID]") +{ + std::set unique; + + REQUIRE(unique.insert(internal::event_id()).second); + REQUIRE_FALSE(unique.insert(internal::event_id()).second); // already there + + struct TestA // "name collision" but not quite collision + {}; + + REQUIRE(unique.insert(internal::event_id()).second); + REQUIRE(unique.insert(internal::event_id<::TestA>()).second); + REQUIRE(unique.insert(internal::event_id()).second); + REQUIRE(unique.insert(internal::event_id()).second); +} + +} // namespace dexode::eventbus::test diff --git a/test/src/dexode/eventbus/test/SuiteListener.cpp b/test/src/dexode/eventbus/test/SuiteListener.cpp new file mode 100644 index 0000000..973ba17 --- /dev/null +++ b/test/src/dexode/eventbus/test/SuiteListener.cpp @@ -0,0 +1,234 @@ +#include + +#include "dexode/EventBus.hpp" +#include "dexode/eventbus/test/event.hpp" + +namespace dexode::eventbus::test +{ + +using Listener = dexode::EventBus::Listener; + +TEST_CASE("Should remove all listeners When use unlistenAll", "[EventBus][Listener]") +{ + EventBus bus; + auto listener = Listener::createNotOwning(bus); + + int callCount = 0; + listener.listen([&](const event::Value& event) { + REQUIRE(event.value == 3); + ++callCount; + }); + listener.listen([&](const event::T1& event) { ++callCount; }); + + bus.postpone(event::Value{3}); + bus.postpone(event::T1{}); + REQUIRE(bus.process() == 2); + REQUIRE(callCount == 2); + + listener.unlistenAll(); + + bus.postpone(event::Value{2}); + bus.postpone(event::T1{}); + REQUIRE(bus.process() == 2); + REQUIRE(callCount == 2); // unchanged +} + +TEST_CASE("Should unlisten all events When listener instance is overriden", "[EventBus][Listener]") +{ + EventBus bus; + int callCount = 0; + auto listener = Listener::createNotOwning(bus); + listener.listen([&](const event::Value& event) { + REQUIRE(event.value == 3); + ++callCount; + }); + bus.postpone(event::Value{3}); + REQUIRE(bus.process() == 1); + REQUIRE(callCount == 1); + + listener = Listener{}; + + bus.postpone(event::Value{2}); + REQUIRE(bus.process() == 1); + REQUIRE(callCount == 1); +} + +TEST_CASE("Should unlisten all events When listener instance is destroyed", "[EventBus][Listener]") +{ + EventBus bus; + int callCount = 0; + + { + auto listener = Listener::createNotOwning(bus); + listener.listen([&](const event::Value& event) { + REQUIRE(event.value == 3); + ++callCount; + }); + bus.postpone(event::Value{3}); + REQUIRE(bus.process() == 1); + REQUIRE(callCount == 1); + } + + bus.postpone(event::Value{2}); + REQUIRE(bus.process() == 1); + REQUIRE(callCount == 1); +} + +TEST_CASE("Should keep listeners When listener is moved", "[EventBus][Listener]") +{ + auto bus = std::make_shared(); + int callCount = 0; + + Listener transferOne; + { + Listener listener{bus}; + listener.listen([&](const event::Value& event) { + REQUIRE(event.value == 3); + ++callCount; + }); + bus->postpone(event::Value{3}); + REQUIRE(bus->process() == 1); + REQUIRE(callCount == 1); + + transferOne = std::move(listener); + } + + bus->postpone(event::Value{3}); + REQUIRE(bus->process() == 1); + REQUIRE(callCount == 2); +} + +TEST_CASE("Should receive event When listener added AFTER event emit but BEFORE event porcess", + "[EventBus][Listener]") +{ + auto bus = std::make_shared(); + int callCount = 0; + bus->postpone(event::Value{22}); + + Listener listener{bus}; + listener.listen([&](const event::Value& event) { + REQUIRE(event.value == 22); + ++callCount; + }); + + REQUIRE(callCount == 0); + bus->process(); + REQUIRE(callCount == 1); +} + +TEST_CASE("Should bind listen to class method and unbind When clazz dtor is called", + "[EventBus][Listener]") +{ + auto bus = std::make_shared(); + struct TestBind + { + int callCount = 0; + Listener listener; + + TestBind(const std::shared_ptr& bus) + : listener{bus} + { + listener.listen(std::bind(&TestBind::onEvent, this, std::placeholders::_1)); + } + + void onEvent(const event::T1& event) + { + ++callCount; + } + }; + + bus->postpone(event::T1{}); + { + TestBind bindClazz{bus}; + REQUIRE(bindClazz.callCount == 0); + CHECK(bus->process() == 1); + REQUIRE(bindClazz.callCount == 1); + } + bus->postpone(event::T1{}); + CHECK(bus->process() == 1); +} + +static int globalCallCount{0}; // bad practice but want to just test +void freeFunction(const event::T1& event) +{ + ++globalCallCount; +} + +TEST_CASE("Should compile When listen in different forms", "[EventBus][Listener]") +{ + EventBus bus; + + int callCount = 0; + + // Listen with lambda + auto listener = Listener::createNotOwning(bus); + listener.listen([&](const event::T1& event) { ++callCount; }); + + // Listen with std::function + auto listener2 = Listener::createNotOwning(bus); + std::function callback = [&](const event::T1&) { ++callCount; }; + listener2.listen(callback); + + // Listen with std::bind + auto listener3 = Listener::createNotOwning(bus); + struct TestClazz + { + int clazzCallCount{0}; + void onEvent(const event::T1& event) + { + ++clazzCallCount; + } + }; + TestClazz clazz; + listener3.listen(std::bind(&TestClazz::onEvent, &clazz, std::placeholders::_1)); + + // Listen with free function + auto listener4 = Listener::createNotOwning(bus); + listener4.listen(freeFunction); + + bus.postpone(event::T1{}); + bus.process(); + + REQUIRE(globalCallCount == 1); + REQUIRE(clazz.clazzCallCount == 1); + REQUIRE(callCount == 2); +} + +TEST_CASE("Should NOT be able to add multiple same event callbacks When using same listener", + "[EventBus][Listener]") +{ + /// User should use separate Listener instance as it would be unabigious what should happen when + /// call unlisten + EventBus bus; + auto listener = Listener::createNotOwning(bus); + + listener.listen([](const event::Value& event) {}); + REQUIRE_THROWS(listener.listen([](const event::Value& event) {})); +} + +TEST_CASE("Should compile", "[EventBus][Listener]") +{ + // Test case to check for compilation + EventBus bus; + { + auto listener = Listener::createNotOwning(bus); + const auto callback = [](const event::Value& event) {}; + listener.listen(callback); + } + { + auto listener = Listener::createNotOwning(bus); + auto callback = [](const event::Value& event) {}; + listener.listen(callback); + } + { + auto listener = Listener::createNotOwning(bus); + auto callback = [](const event::Value& event) {}; + listener.listen(std::move(callback)); + } + { + auto listener = Listener::createNotOwning(bus); + listener.listen([](const event::Value& event) {}); + } +} + +} // namespace dexode::eventbus::test diff --git a/test/src/dexode/eventbus/test/event.hpp b/test/src/dexode/eventbus/test/event.hpp new file mode 100644 index 0000000..2227220 --- /dev/null +++ b/test/src/dexode/eventbus/test/event.hpp @@ -0,0 +1,32 @@ +// +// Created by gelldur on 24.11.2019. +// +#pragma once + +namespace dexode::eventbus::test::event +{ + +struct T1 +{}; + +struct T2 +{}; + +struct Value +{ + int value{-1}; +}; + +template +struct Tag +{ + Event data; + std::string tag; +}; + +struct WaitPerk +{ + +}; + +} // namespace dexode::eventbus::test::event diff --git a/test/src/main.cpp b/test/src/main.cpp new file mode 100644 index 0000000..4ed06df --- /dev/null +++ b/test/src/main.cpp @@ -0,0 +1,2 @@ +#define CATCH_CONFIG_MAIN +#include diff --git a/use_case/CMakeLists.txt b/use_case/CMakeLists.txt new file mode 100644 index 0000000..2aed4d1 --- /dev/null +++ b/use_case/CMakeLists.txt @@ -0,0 +1,3 @@ +# +add_subdirectory(basic/) +add_subdirectory(tagged_events/) diff --git a/use_case/README.md b/use_case/README.md new file mode 100644 index 0000000..f1b90ae --- /dev/null +++ b/use_case/README.md @@ -0,0 +1,5 @@ +# Use cases + +This folder should contain use cases and approaches how to use EventBus. + +Most important need for this is only to store needed use cases by other projects. diff --git a/use_case/basic/CMakeLists.txt b/use_case/basic/CMakeLists.txt new file mode 100644 index 0000000..8a34d49 --- /dev/null +++ b/use_case/basic/CMakeLists.txt @@ -0,0 +1,6 @@ +# +add_executable(UseCase_Basic + src/main.cpp + ) + +target_link_libraries(UseCase_Basic PRIVATE Dexode::EventBus) diff --git a/use_case/basic/README.md b/use_case/basic/README.md new file mode 100644 index 0000000..0e8933a --- /dev/null +++ b/use_case/basic/README.md @@ -0,0 +1,3 @@ +# Use case: Basic + +Just basic use case of EventBus. diff --git a/sample/src/main.cpp b/use_case/basic/src/main.cpp similarity index 57% rename from sample/src/main.cpp rename to use_case/basic/src/main.cpp index 713d17d..490168b 100644 --- a/sample/src/main.cpp +++ b/use_case/basic/src/main.cpp @@ -7,16 +7,18 @@ #include #include -#include -#include +#include -namespace Event // Example namespace for events +using EventBus = dexode::EventBus; +using Listener = dexode::EventBus::Listener; + +namespace event // Example namespace for events { struct Gold // Event that will be proceed when our gold changes { int value = 0; }; -} // namespace Event +} // namespace event enum class Monster { @@ -28,8 +30,8 @@ enum class Monster class Character { public: - Character(const std::shared_ptr& eventBus) - : _bus {eventBus} + Character(const std::shared_ptr& eventBus) + : _bus{eventBus} {} void kill(Monster monsterType) @@ -46,19 +48,19 @@ public: { _gold += 25; } - _bus->notify(Event::Gold {_gold}); + _bus->postpone(event::Gold{_gold}); } private: int _gold = 0; - std::shared_ptr _bus; + std::shared_ptr _bus; }; class UIWallet { public: - UIWallet(const std::shared_ptr& eventBus) - : _listener {eventBus} + UIWallet(const std::shared_ptr& eventBus) + : _listener{eventBus} {} void onDraw() // Example "draw" of UI @@ -67,9 +69,9 @@ public: } void onEnter() // We could also do such things in ctor and dtor but a lot of UI has something - // like this + // like this { - _listener.listen( + _listener.listen( [this](const auto& event) { _gold = std::to_string(event.value); }); } @@ -80,18 +82,18 @@ public: private: std::string _gold = "0"; - Dexode::EventCollector _listener; + Listener _listener; }; -class ShopButton // Shop button is only enabled when we have some gold (odd decision but for sample - // good :P) +// Shop button is only enabled when we have some gold (odd decision but for sample good :P) +class ShopButton { public: - ShopButton(const std::shared_ptr& eventBus) - : _listener {eventBus} + ShopButton(const std::shared_ptr& eventBus) + : _listener{eventBus} { // We can use lambda or bind your choice - _listener.listen( + _listener.listen( std::bind(&ShopButton::onGoldUpdated, this, std::placeholders::_1)); // Also we use RAII idiom to handle unlisten } @@ -102,10 +104,10 @@ public: } private: - Dexode::EventCollector _listener; + Listener _listener; bool _isEnabled = false; - void onGoldUpdated(const Event::Gold& event) + void onGoldUpdated(const event::Gold& event) { _isEnabled = event.value > 0; std::cout << "Shop button is:" << _isEnabled << std::endl; // some kind of logs @@ -114,13 +116,13 @@ private: int main(int argc, char* argv[]) { - std::shared_ptr eventBus = std::make_shared(); + auto eventBus = std::make_shared(); - Character characterController {eventBus}; + Character characterController{eventBus}; - UIWallet wallet {eventBus}; // UIWallet doesn't know anything about character - // or even who store gold - ShopButton shopButton {eventBus}; // ShopButton doesn't know anything about character + UIWallet wallet{eventBus}; // UIWallet doesn't know anything about character + // or even who store gold + ShopButton shopButton{eventBus}; // ShopButton doesn't know anything about character { wallet.onEnter(); } @@ -128,13 +130,17 @@ int main(int argc, char* argv[]) wallet.onDraw(); { characterController.kill(Monster::Tux); + eventBus->process(); } wallet.onDraw(); // It is easy to test UI eg. - eventBus->notify(Event::Gold {1}); + eventBus->postpone(event::Gold{1}); + eventBus->process(); assert(shopButton.isEnabled() == true); - eventBus->notify(Event::Gold {0}); + + eventBus->postpone(event::Gold{0}); + eventBus->process(); assert(shopButton.isEnabled() == false); wallet.onExit(); diff --git a/use_case/tagged_events/CMakeLists.txt b/use_case/tagged_events/CMakeLists.txt new file mode 100644 index 0000000..e84b07e --- /dev/null +++ b/use_case/tagged_events/CMakeLists.txt @@ -0,0 +1,11 @@ +# +add_executable(UseCase_TaggedEvents + src/Character.cpp src/Character.hpp + src/event.hpp + src/EventBus.hpp + src/main.cpp + src/Team.cpp src/Team.hpp + src/Gui.cpp src/Gui.hpp) + +target_include_directories(UseCase_TaggedEvents PUBLIC src/) +target_link_libraries(UseCase_TaggedEvents PRIVATE Dexode::EventBus) diff --git a/use_case/tagged_events/README.md b/use_case/tagged_events/README.md new file mode 100644 index 0000000..3d50ba4 --- /dev/null +++ b/use_case/tagged_events/README.md @@ -0,0 +1,14 @@ +# Use case: Tagged events + +## Motivation +Sometimes we have event producer but at some point we would like +to reuse event producer for different configuration. In some cases we +could just add some "tag" to event to resolve this issue. + +I would like to resolve this in other way as maybe data producer can't +be aware of its "tag". Other issue is spreading "tag" requirement over +whole project and maybe we don't want to modify old code. + +It would be nice to just add some abstraction layer which will resolve +this issue. + diff --git a/use_case/tagged_events/src/Character.cpp b/use_case/tagged_events/src/Character.cpp new file mode 100644 index 0000000..09594af --- /dev/null +++ b/use_case/tagged_events/src/Character.cpp @@ -0,0 +1,25 @@ +// +// Created by gelldur on 22.12.2019. +// +#include "Character.hpp" + +#include "event.hpp" + +Character::Character(std::shared_ptr bus) + : _bus{std::move(bus)} +{ + (void)_iq; // probably something should use it ;) +} + +void Character::pickGold(int goldCount) +{ + // We could store character name as member. Sure this isn't complex example rather simple one. + // Imagine few levels of composition ;) + _sackGold += goldCount; + _bus->postpone(event::GoldUpdate{_sackGold}); +} + +void Character::damage(int amount) +{ + _health -= amount; +} diff --git a/use_case/tagged_events/src/Character.hpp b/use_case/tagged_events/src/Character.hpp new file mode 100644 index 0000000..3038ab5 --- /dev/null +++ b/use_case/tagged_events/src/Character.hpp @@ -0,0 +1,23 @@ +// +// Created by gelldur on 22.12.2019. +// +#pragma once + +#include +#include + +#include "EventBus.hpp" + +class Character +{ +public: + Character(std::shared_ptr bus); + void pickGold(int goldCount); + void damage(int amount); + +private: + std::shared_ptr _bus; + int _sackGold = 0; + int _health = 100; + int _iq = 200; +}; diff --git a/use_case/tagged_events/src/EventBus.hpp b/use_case/tagged_events/src/EventBus.hpp new file mode 100644 index 0000000..b5a9091 --- /dev/null +++ b/use_case/tagged_events/src/EventBus.hpp @@ -0,0 +1,9 @@ +// +// Created by gelldur on 22.12.2019. +// +#pragma once + +#include + +using EventBus = dexode::EventBus; +using Listener = dexode::EventBus::Listener; diff --git a/use_case/tagged_events/src/Gui.cpp b/use_case/tagged_events/src/Gui.cpp new file mode 100644 index 0000000..6d583ac --- /dev/null +++ b/use_case/tagged_events/src/Gui.cpp @@ -0,0 +1,33 @@ +// +// Created by gelldur on 22.12.2019. +// +#include "Gui.hpp" + +#include + +#include "event.hpp" + +Gui::Gui(const std::shared_ptr& bus) + : _listener{bus} +{ + _listener.listen( + [this](const event::NewTeamMember& event) { _sackOfGold.emplace(event.memberName, 0); }); + + _listener.listen([this](const event::TagEvent& event) { + auto found = _sackOfGold.find(event.tag); + if(found != _sackOfGold.end()) + { + found->second = event.data.goldCount; + } + }); +} + +void Gui::draw() +{ + std::cout << "-----------------------------\n"; + for(const auto& player : _sackOfGold) + { + std::cout << "Name:" << player.first << " - gold: " << player.second << "\n"; + } + std::cout << "-----------------------------" << std::endl; +} diff --git a/use_case/tagged_events/src/Gui.hpp b/use_case/tagged_events/src/Gui.hpp new file mode 100644 index 0000000..5e00f5a --- /dev/null +++ b/use_case/tagged_events/src/Gui.hpp @@ -0,0 +1,21 @@ +// +// Created by gelldur on 22.12.2019. +// +#pragma once + +#include +#include + +#include "EventBus.hpp" + +class Gui +{ +public: + Gui(const std::shared_ptr& bus); + + void draw(); + +private: + EventBus::Listener _listener; + std::map _sackOfGold; +}; diff --git a/use_case/tagged_events/src/Team.cpp b/use_case/tagged_events/src/Team.cpp new file mode 100644 index 0000000..90a5bea --- /dev/null +++ b/use_case/tagged_events/src/Team.cpp @@ -0,0 +1,43 @@ +// +// Created by gelldur on 22.12.2019. +// +#include "Team.hpp" + +#include +#include +#include + +#include "event.hpp" + +Team::Team(std::shared_ptr bus) + : _bus(std::move(bus)) +{} + +Character& Team::getMember(const std::string& name) +{ + auto found = std::find(_names.begin(), _names.end(), name); + if(found == _names.end()) + { + throw std::out_of_range("No such team member: " + name); + } + return _squad.at(std::distance(_names.begin(), found)); +} + +void Team::addPlayer(const std::string& name) +{ + auto characterBus = std::make_shared(); + { + auto tagPerk = std::make_unique(name, _bus.get()); + tagPerk->wrapTag>(); + + characterBus->addPerk(std::move(tagPerk)) + .registerPrePostpone(&dexode::eventbus::perk::TagPerk::onPrePostponeEvent); + } + characterBus->addPerk(std::make_unique(_bus)) + .registerPrePostpone(&dexode::eventbus::perk::PassEverythingPerk::onPrePostponeEvent); + + _squad.emplace_back(characterBus); + _names.push_back(name); + + _bus->postpone(event::NewTeamMember{name}); +} diff --git a/use_case/tagged_events/src/Team.hpp b/use_case/tagged_events/src/Team.hpp new file mode 100644 index 0000000..935a7a9 --- /dev/null +++ b/use_case/tagged_events/src/Team.hpp @@ -0,0 +1,24 @@ +// +// Created by gelldur on 22.12.2019. +// +#pragma once + +#include +#include + +#include "Character.hpp" + +class Team +{ +public: + Team(std::shared_ptr bus); + void addPlayer(const std::string& name); + + Character& getMember(const std::string& name); + +private: + std::vector _names; + std::vector _squad; + + std::shared_ptr _bus; +}; diff --git a/use_case/tagged_events/src/event.hpp b/use_case/tagged_events/src/event.hpp new file mode 100644 index 0000000..57a2144 --- /dev/null +++ b/use_case/tagged_events/src/event.hpp @@ -0,0 +1,27 @@ +// +// Created by gelldur on 22.12.2019. +// +#pragma once + +namespace event +{ + +struct GoldUpdate +{ + int goldCount; +}; + +struct NewTeamMember +{ + std::string memberName; +}; + +template +struct TagEvent +{ + using Event = T; + std::string tag; + Event data; +}; + +} // namespace event diff --git a/use_case/tagged_events/src/main.cpp b/use_case/tagged_events/src/main.cpp new file mode 100644 index 0000000..a179396 --- /dev/null +++ b/use_case/tagged_events/src/main.cpp @@ -0,0 +1,42 @@ +#include +#include +#include +#include + +#include "EventBus.hpp" +#include "Gui.hpp" +#include "Team.hpp" + +int main(int argc, char* argv[]) +{ + auto eventBus = std::make_shared(); + + Gui gui{eventBus}; + Team myTeam{eventBus}; + + auto updateFrame = [&]() { + // single frame update ;) + eventBus->process(); + gui.draw(); + std::cout << "###################################################\n"; + std::cout << "###################################################" << std::endl; + }; + + { // single update + myTeam.addPlayer("Gelldur"); + updateFrame(); + } + + { // single update + myTeam.getMember("Gelldur").pickGold(100); + updateFrame(); + } + + { // single update + myTeam.addPlayer("Gosia"); + myTeam.addPlayer("Dexter"); + updateFrame(); + } + + return 0; +}