diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index 91f0fdc..1635d74 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -35,7 +35,15 @@ add_library(EventBus include/eventbus/internal/TransactionCallbackVector.h include/eventbus/TokenHolder.h src/eventbus/AsyncEventBus.cpp include/eventbus/AsyncEventBus.h + + # New version of EventBus 3.0 + include/dexode/EventBus.hpp + include/dexode/eventbus/Listener.hpp + include/dexode/eventbus/Subscriber.hpp + src/dexode/eventbus/strategy/Protected.cpp include/dexode/eventbus/strategy/Protected.hpp + src/dexode/eventbus/strategy/Transaction.cpp include/dexode/eventbus/strategy/Transaction.hpp ) + add_library(Dexode::EventBus ALIAS EventBus) target_compile_features(EventBus PUBLIC cxx_std_14) diff --git a/lib/include/dexode/EventBus.hpp b/lib/include/dexode/EventBus.hpp new file mode 100644 index 0000000..d69e295 --- /dev/null +++ b/lib/include/dexode/EventBus.hpp @@ -0,0 +1,106 @@ +#pragma once + +#include +#include +#include +#include + +#include + +#include + + +#include "eventbus/Listener.hpp" +#include "eventbus/Subscriber.hpp" +#include "eventbus/internal/common.h" + +namespace dexode +{ + +template +class EventBus +{ + friend class eventbus::Listener>; + +public: + using Listener = eventbus::Listener>; + using Subscriber = eventbus::Subscriber>; + + constexpr EventBus() = default; + ~EventBus() = default; + + EventBus(const EventBus&) = delete; + EventBus(EventBus&&) = delete; + + EventBus& operator=(EventBus&&) = delete; + EventBus& operator=(const EventBus&) = delete; + + Listener createListener() + { + return Listener{*this}; + } + + template + constexpr void post(const Event& event) + { + static_assert(Dexode::Internal::validateEvent(), "Invalid event"); + _base.template post(event); + } + + template + constexpr void postpone(const Event& event) + { + static_assert(Dexode::Internal::validateEvent(), "Invalid event"); + _base.template postpone(event); + } + + constexpr std::size_t processAll() + { + return processLimit(std::numeric_limits::max()); + } + + constexpr std::size_t processLimit(std::size_t maxCountOfEvents) + { + return _base.processLimit(maxCountOfEvents); + } + + constexpr std::size_t getPostponeEventCount() const + { + return _base.getQueueEventCount(); + } + +private: + std::atomic _lastID{0}; + Strategy _base; + + std::uint32_t newListenerID() + { + return ++_lastID; + } + + template + constexpr void listen(const std::uint32_t listenerID, + std::function&& callback) + { + static_assert(Dexode::Internal::validateEvent(), "Invalid event"); + assert(callback && "callback should be valid"); // Check for valid object + + _base.template listen(listenerID, + std::forward>(callback)); + } + + constexpr void unlistenAll(const std::uint32_t listenerID) + { + _base.unlistenAll(listenerID); + } + + template + constexpr void unlisten(const std::uint32_t listenerID) + { + static_assert(Dexode::Internal::validateEvent(), "Invalid event"); + const auto eventID = Dexode::Internal::event_id; + _base.template unlisten(listenerID, eventID); + } +}; + +} // namespace dexode diff --git a/lib/include/dexode/eventbus/Listener.hpp b/lib/include/dexode/eventbus/Listener.hpp new file mode 100644 index 0000000..8be6265 --- /dev/null +++ b/lib/include/dexode/eventbus/Listener.hpp @@ -0,0 +1,86 @@ +#pragma once + +#include +#include +#include + +namespace dexode::eventbus +{ + +template +class Listener +{ +public: + Listener(Bus& bus) + : _id{bus.newListenerID()} + , _bus{&bus} + {} + + Listener(const Listener& other) = delete; + + Listener(Listener&& other) + : _id(other._id) + , _bus(other._bus) + { + other._id = 0; + other._bus = nullptr; + } + + ~Listener() + { + if(_bus != nullptr) // could be moved + { + unlistenAll(); + } + } + + Listener& operator=(const Listener& other) = delete; + + Listener& operator=(Listener&& other) + { + if(this == &other) + { + return *this; + } + + unlistenAll(); + + _id = other._id; + other._id = 0; + _bus = other._bus; + other._bus = nullptr; + + return *this; + } + + template + void listen(std::function&& callback) + { + _bus->template listen(_id, + std::forward>(callback)); + } + + template + void listen(const std::function& callback) + { + _bus->template listen(_id, callback); + } + + void unlistenAll() + { + + _bus->unlistenAll(_id); + } + + template + void unlisten() + { + _bus->template unlisten(_id); + } + +private: + std::uint32_t _id = 0; + Bus* _bus; +}; + +} // namespace dexode::eventbus diff --git a/lib/include/dexode/eventbus/Subscriber.hpp b/lib/include/dexode/eventbus/Subscriber.hpp new file mode 100644 index 0000000..6271a48 --- /dev/null +++ b/lib/include/dexode/eventbus/Subscriber.hpp @@ -0,0 +1,71 @@ +#pragma once + +#include "dexode/EventBus.hpp" + +namespace dexode::eventbus +{ +template +class Subscriber +{ +public: + Subscriber(std::shared_ptr retainBus) + : _listener{*retainBus} + , _bus{std::move(retainBus)} + {} + + Subscriber(Subscriber&& other) + : _listener(std::move(other._listener)) + , _bus(std::move(other._bus)) + {} + + ~Subscriber() + { + unlistenAll(); // extra safety if order of members is inverse + } + + Subscriber& operator=(Subscriber&& other) + { + if(this == &other) + { + return *this; + } + + _listener = std::move(other._listener); + _bus = std::move(other._bus); + + return *this; + } + + template + void listen(std::function&& callback) + { + _listener.template listen(std::forward>(callback)); + } + + template + void listen(const std::function& callback) + { + _listener.template listen(callback); + } + + void unlistenAll() + { + _listener.unlistenAll(); + } + + template + void unlisten() + { + _listener.template unlisten(); + } + + const std::shared_ptr& getEventBus() const + { + return _bus; + } + +private: + Listener _listener; + std::shared_ptr _bus; // own +}; +} // namespace dexode::eventbus diff --git a/lib/include/dexode/eventbus/strategy/Protected.hpp b/lib/include/dexode/eventbus/strategy/Protected.hpp new file mode 100644 index 0000000..2b9a7c3 --- /dev/null +++ b/lib/include/dexode/eventbus/strategy/Protected.hpp @@ -0,0 +1,112 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "dexode/EventBus.hpp" +#include "eventbus/internal/AsyncCallbackVector.h" + +namespace dexode::eventbus::strategy +{ + +class Protected +{ +public: + Protected() = default; + ~Protected() = default; + + Protected(const Protected&) = delete; + Protected(Protected&&) = delete; + + Protected& operator=(Protected&&) = delete; + Protected& operator=(const Protected&) = delete; + + template + void post(const Event& event) + { + std::shared_lock readLock{_mutex}; + + using Vector = Dexode::Internal::AsyncCallbackVector; + auto found = _callbacks.find(Dexode::Internal::event_id()); + if(found == _callbacks.end()) + { + return; // no such notifications + } + + std::unique_ptr& vector = found->second; + assert(dynamic_cast(vector.get())); + auto* callbacks = static_cast(vector.get()); + + for(const auto& element : callbacks->container) + { + element.second(event); + } + } + + template + void postpone(const Event& event) + { + { + std::unique_lock writeLock{_mutex}; + _eventQueue.push_back([this, event]() { post(event); }); + } + _eventWaiting.notify_one(); + } + + std::size_t processLimit(const std::size_t maxCountOfEvents); + + std::size_t getPostponeEventCount() const + { + std::shared_lock lock{_mutex}; + return _eventQueue.size(); + } + + bool wait(); + bool waitFor(std::chrono::milliseconds timeout); + + template + void listen(const std::uint32_t listenerID, std::function&& callback) + { + using Vector = Dexode::Internal::AsyncCallbackVector; + + std::unique_lock writeLock{_mutex}; + auto eventListeners = _callbacks.find(Dexode::Internal::event_id()); + if(eventListeners == _callbacks.cend()) + { + eventListeners = _callbacks.emplace_hint( + eventListeners, Dexode::Internal::event_id(), std::make_unique()); + } + assert(dynamic_cast(eventListeners->second.get())); + auto* vectorImpl = static_cast(eventListeners->second.get()); + vectorImpl->add(listenerID, callback); + } + + void unlistenAll(const std::uint32_t listenerID); + + template + void unlisten(const std::uint32_t listenerID) + { + std::unique_lock writeLock{_mutex}; // TODO locking already locked mutex + auto found = _callbacks.find(Dexode::Internal::event_id()); + if(found != _callbacks.end()) + { + found->second->remove(listenerID); + } + } + +private: + std::map> + _callbacks; + mutable std::shared_mutex _mutex; + + std::mutex _waitMutex; + std::condition_variable _eventWaiting; + std::deque> _eventQueue; +}; + +} // namespace dexode::eventbus::strategy diff --git a/lib/include/dexode/eventbus/strategy/Transaction.hpp b/lib/include/dexode/eventbus/strategy/Transaction.hpp new file mode 100644 index 0000000..3f57825 --- /dev/null +++ b/lib/include/dexode/eventbus/strategy/Transaction.hpp @@ -0,0 +1,115 @@ +#pragma once + +#include +#include +#include +#include + +#include "dexode/EventBus.hpp" +#include "eventbus/internal/TransactionCallbackVector.h" + +namespace dexode::eventbus::strategy +{ + +class Transaction +{ +public: + Transaction() = default; + ~Transaction() = default; + + Transaction(const Transaction&) = delete; + Transaction(Transaction&&) = delete; + + Transaction& operator=(Transaction&&) = delete; + Transaction& operator=(const Transaction&) = delete; + + template + void post(const Event& event) + { + using Vector = Dexode::Internal::TransactionCallbackVector; + auto found = _callbacks.find(Dexode::Internal::event_id()); + if(found == _callbacks.end()) + { + return; // no such notifications + } + + std::unique_ptr& vector = found->second; + assert(dynamic_cast(vector.get())); + auto* vectorImpl = static_cast(vector.get()); + + vectorImpl->beginTransaction(); + for(const auto& element : vectorImpl->container) + { + element.second(event); + } + vectorImpl->commitTransaction(); + } + + template + void postpone(const Event& event) + { + _eventQueue.push_back([this, event]() { post(event); }); + } + + std::size_t processLimit(const std::size_t maxCountOfEvents) + { + std::size_t processed = 0; + while(processed < maxCountOfEvents && not _eventQueue.empty()) + { + auto asyncCallback = std::move(_eventQueue.front()); + _eventQueue.pop_front(); + // Needs to be done in this way. Think about recursion in this case... + asyncCallback(); + ++processed; + } + return processed; + } + + std::size_t getQueueEventCount() const noexcept + { + return _eventQueue.size(); + } + + template + void listen(const std::uint32_t listenerID, std::function&& callback) + { + using Vector = Dexode::Internal::TransactionCallbackVector; + + std::unique_ptr& vector = + _callbacks[Dexode::Internal::event_id()]; + if(vector == nullptr) + { + vector = std::make_unique(); + } + assert(dynamic_cast(vector.get())); + auto* vectorImpl = static_cast(vector.get()); + vectorImpl->add(listenerID, callback); + } + + void unlistenAll(const std::uint32_t listenerID) + { + for(auto& element : _callbacks) + { + element.second->remove(listenerID); + } + } + + template + void unlisten(const std::uint32_t listenerID) + { + static_assert(Dexode::Internal::validateEvent(), "Invalid event"); + + auto found = _callbacks.find(Dexode::Internal::event_id()); + if(found != _callbacks.end()) + { + found->second->remove(listenerID); + } + } + +private: + std::map> + _callbacks; + std::deque> _eventQueue; +}; + +} // namespace dexode::eventbus::strategy diff --git a/lib/src/dexode/eventbus/strategy/Protected.cpp b/lib/src/dexode/eventbus/strategy/Protected.cpp new file mode 100644 index 0000000..3b7c376 --- /dev/null +++ b/lib/src/dexode/eventbus/strategy/Protected.cpp @@ -0,0 +1,56 @@ +#include "dexode/eventbus/strategy/Protected.hpp" + +namespace dexode::eventbus::strategy +{ + +std::size_t Protected::processLimit(const std::size_t maxCountOfEvents) +{ + std::size_t processed = 0; + std::function postponeEventCallback; + + while(processed < maxCountOfEvents) + { + { + std::unique_lock writeLock{_mutex}; + if(_eventQueue.empty()) + { + break; + } + postponeEventCallback = std::move(_eventQueue.front()); + _eventQueue.pop_front(); + } + postponeEventCallback(); + ++processed; + } + return processed; +} + +void Protected::unlistenAll(const std::uint32_t listenerID) +{ + std::unique_lock writeLock{_mutex}; // TODO locking already locked mutex + for(auto& element : _callbacks) + { + element.second->remove(listenerID); + } +} + +bool Protected::wait() +{ + using namespace std::chrono_literals; + std::unique_lock lock(_waitMutex); + _eventWaiting.wait(lock); + + std::shared_lock readLock{_mutex}; + return not _eventQueue.empty(); +} +bool Protected::waitFor(std::chrono::milliseconds timeout) +{ + using namespace std::chrono_literals; + std::unique_lock lock(_waitMutex); + _eventWaiting.wait_for(lock, timeout); + + std::shared_lock readLock{_mutex}; + return not _eventQueue.empty(); +} + +} // namespace dexode::eventbus::strategy diff --git a/lib/src/dexode/eventbus/strategy/Transaction.cpp b/lib/src/dexode/eventbus/strategy/Transaction.cpp new file mode 100644 index 0000000..935c4c5 --- /dev/null +++ b/lib/src/dexode/eventbus/strategy/Transaction.cpp @@ -0,0 +1,6 @@ +#include "dexode/eventbus/strategy/Transaction.hpp" + +namespace dexode::eventbus::strategy +{ + +}