Add preview of EventBus 3.0

This commit is contained in:
Dawid Drozd 2019-06-30 16:16:59 +02:00
parent 7e6bb32e27
commit 2d2733afde
8 changed files with 560 additions and 0 deletions

View File

@ -35,7 +35,15 @@ add_library(EventBus
include/eventbus/internal/TransactionCallbackVector.h
include/eventbus/TokenHolder.h
src/eventbus/AsyncEventBus.cpp include/eventbus/AsyncEventBus.h
# New version of EventBus 3.0
include/dexode/EventBus.hpp
include/dexode/eventbus/Listener.hpp
include/dexode/eventbus/Subscriber.hpp
src/dexode/eventbus/strategy/Protected.cpp include/dexode/eventbus/strategy/Protected.hpp
src/dexode/eventbus/strategy/Transaction.cpp include/dexode/eventbus/strategy/Transaction.hpp
)
add_library(Dexode::EventBus ALIAS EventBus)
target_compile_features(EventBus PUBLIC cxx_std_14)

View File

@ -0,0 +1,106 @@
#pragma once
#include <atomic>
#include <cassert>
#include <functional>
#include <limits>
#include <eventbus/EventBus.h>
#include <dexode/EventBus.hpp>
#include "eventbus/Listener.hpp"
#include "eventbus/Subscriber.hpp"
#include "eventbus/internal/common.h"
namespace dexode
{
template <class Strategy>
class EventBus
{
friend class eventbus::Listener<EventBus<Strategy>>;
public:
using Listener = eventbus::Listener<EventBus<Strategy>>;
using Subscriber = eventbus::Subscriber<EventBus<Strategy>>;
constexpr EventBus() = default;
~EventBus() = default;
EventBus(const EventBus&) = delete;
EventBus(EventBus&&) = delete;
EventBus& operator=(EventBus&&) = delete;
EventBus& operator=(const EventBus&) = delete;
Listener createListener()
{
return Listener{*this};
}
template <typename Event>
constexpr void post(const Event& event)
{
static_assert(Dexode::Internal::validateEvent<Event>(), "Invalid event");
_base.template post<Event>(event);
}
template <typename Event>
constexpr void postpone(const Event& event)
{
static_assert(Dexode::Internal::validateEvent<Event>(), "Invalid event");
_base.template postpone<Event>(event);
}
constexpr std::size_t processAll()
{
return processLimit(std::numeric_limits<std::size_t>::max());
}
constexpr std::size_t processLimit(std::size_t maxCountOfEvents)
{
return _base.processLimit(maxCountOfEvents);
}
constexpr std::size_t getPostponeEventCount() const
{
return _base.getQueueEventCount();
}
private:
std::atomic<std::uint32_t> _lastID{0};
Strategy _base;
std::uint32_t newListenerID()
{
return ++_lastID;
}
template <class Event>
constexpr void listen(const std::uint32_t listenerID,
std::function<void(const Event&)>&& callback)
{
static_assert(Dexode::Internal::validateEvent<Event>(), "Invalid event");
assert(callback && "callback should be valid"); // Check for valid object
_base.template listen<Event>(listenerID,
std::forward<std::function<void(const Event&)>>(callback));
}
constexpr void unlistenAll(const std::uint32_t listenerID)
{
_base.unlistenAll(listenerID);
}
template <typename Event>
constexpr void unlisten(const std::uint32_t listenerID)
{
static_assert(Dexode::Internal::validateEvent<Event>(), "Invalid event");
const auto eventID = Dexode::Internal::event_id<Event>;
_base.template unlisten(listenerID, eventID);
}
};
} // namespace dexode

View File

@ -0,0 +1,86 @@
#pragma once
#include <cstdint>
#include <functional>
#include <memory>
namespace dexode::eventbus
{
template <class Bus>
class Listener
{
public:
Listener(Bus& bus)
: _id{bus.newListenerID()}
, _bus{&bus}
{}
Listener(const Listener& other) = delete;
Listener(Listener&& other)
: _id(other._id)
, _bus(other._bus)
{
other._id = 0;
other._bus = nullptr;
}
~Listener()
{
if(_bus != nullptr) // could be moved
{
unlistenAll();
}
}
Listener& operator=(const Listener& other) = delete;
Listener& operator=(Listener&& other)
{
if(this == &other)
{
return *this;
}
unlistenAll();
_id = other._id;
other._id = 0;
_bus = other._bus;
other._bus = nullptr;
return *this;
}
template <class Event>
void listen(std::function<void(const Event&)>&& callback)
{
_bus->template listen<Event>(_id,
std::forward<std::function<void(const Event&)>>(callback));
}
template <class Event>
void listen(const std::function<void(const Event&)>& callback)
{
_bus->template listen<Event>(_id, callback);
}
void unlistenAll()
{
_bus->unlistenAll(_id);
}
template <typename Event>
void unlisten()
{
_bus->template unlisten<Event>(_id);
}
private:
std::uint32_t _id = 0;
Bus* _bus;
};
} // namespace dexode::eventbus

View File

@ -0,0 +1,71 @@
#pragma once
#include "dexode/EventBus.hpp"
namespace dexode::eventbus
{
template <class Bus>
class Subscriber
{
public:
Subscriber(std::shared_ptr<Bus> retainBus)
: _listener{*retainBus}
, _bus{std::move(retainBus)}
{}
Subscriber(Subscriber&& other)
: _listener(std::move(other._listener))
, _bus(std::move(other._bus))
{}
~Subscriber()
{
unlistenAll(); // extra safety if order of members is inverse
}
Subscriber& operator=(Subscriber&& other)
{
if(this == &other)
{
return *this;
}
_listener = std::move(other._listener);
_bus = std::move(other._bus);
return *this;
}
template <class Event>
void listen(std::function<void(const Event&)>&& callback)
{
_listener.template listen<Event>(std::forward<std::function<void(const Event&)>>(callback));
}
template <class Event>
void listen(const std::function<void(const Event&)>& callback)
{
_listener.template listen<Event>(callback);
}
void unlistenAll()
{
_listener.unlistenAll();
}
template <typename Event>
void unlisten()
{
_listener.template unlisten<Event>();
}
const std::shared_ptr<Bus>& getEventBus() const
{
return _bus;
}
private:
Listener<Bus> _listener;
std::shared_ptr<Bus> _bus; // own
};
} // namespace dexode::eventbus

View File

@ -0,0 +1,112 @@
#pragma once
#include <condition_variable>
#include <deque>
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <vector>
#include "dexode/EventBus.hpp"
#include "eventbus/internal/AsyncCallbackVector.h"
namespace dexode::eventbus::strategy
{
class Protected
{
public:
Protected() = default;
~Protected() = default;
Protected(const Protected&) = delete;
Protected(Protected&&) = delete;
Protected& operator=(Protected&&) = delete;
Protected& operator=(const Protected&) = delete;
template <typename Event>
void post(const Event& event)
{
std::shared_lock readLock{_mutex};
using Vector = Dexode::Internal::AsyncCallbackVector<Event>;
auto found = _callbacks.find(Dexode::Internal::event_id<Event>());
if(found == _callbacks.end())
{
return; // no such notifications
}
std::unique_ptr<Dexode::Internal::CallbackVector>& vector = found->second;
assert(dynamic_cast<Vector*>(vector.get()));
auto* callbacks = static_cast<Vector*>(vector.get());
for(const auto& element : callbacks->container)
{
element.second(event);
}
}
template <typename Event>
void postpone(const Event& event)
{
{
std::unique_lock writeLock{_mutex};
_eventQueue.push_back([this, event]() { post<Event>(event); });
}
_eventWaiting.notify_one();
}
std::size_t processLimit(const std::size_t maxCountOfEvents);
std::size_t getPostponeEventCount() const
{
std::shared_lock lock{_mutex};
return _eventQueue.size();
}
bool wait();
bool waitFor(std::chrono::milliseconds timeout);
template <class Event>
void listen(const std::uint32_t listenerID, std::function<void(const Event&)>&& callback)
{
using Vector = Dexode::Internal::AsyncCallbackVector<Event>;
std::unique_lock writeLock{_mutex};
auto eventListeners = _callbacks.find(Dexode::Internal::event_id<Event>());
if(eventListeners == _callbacks.cend())
{
eventListeners = _callbacks.emplace_hint(
eventListeners, Dexode::Internal::event_id<Event>(), std::make_unique<Vector>());
}
assert(dynamic_cast<Vector*>(eventListeners->second.get()));
auto* vectorImpl = static_cast<Vector*>(eventListeners->second.get());
vectorImpl->add(listenerID, callback);
}
void unlistenAll(const std::uint32_t listenerID);
template <typename Event>
void unlisten(const std::uint32_t listenerID)
{
std::unique_lock writeLock{_mutex}; // TODO locking already locked mutex
auto found = _callbacks.find(Dexode::Internal::event_id<Event>());
if(found != _callbacks.end())
{
found->second->remove(listenerID);
}
}
private:
std::map<Dexode::Internal::event_id_t, std::unique_ptr<Dexode::Internal::CallbackVector>>
_callbacks;
mutable std::shared_mutex _mutex;
std::mutex _waitMutex;
std::condition_variable _eventWaiting;
std::deque<std::function<void()>> _eventQueue;
};
} // namespace dexode::eventbus::strategy

View File

@ -0,0 +1,115 @@
#pragma once
#include <deque>
#include <map>
#include <memory>
#include <vector>
#include "dexode/EventBus.hpp"
#include "eventbus/internal/TransactionCallbackVector.h"
namespace dexode::eventbus::strategy
{
class Transaction
{
public:
Transaction() = default;
~Transaction() = default;
Transaction(const Transaction&) = delete;
Transaction(Transaction&&) = delete;
Transaction& operator=(Transaction&&) = delete;
Transaction& operator=(const Transaction&) = delete;
template <typename Event>
void post(const Event& event)
{
using Vector = Dexode::Internal::TransactionCallbackVector<Event>;
auto found = _callbacks.find(Dexode::Internal::event_id<Event>());
if(found == _callbacks.end())
{
return; // no such notifications
}
std::unique_ptr<Dexode::Internal::CallbackVector>& vector = found->second;
assert(dynamic_cast<Vector*>(vector.get()));
auto* vectorImpl = static_cast<Vector*>(vector.get());
vectorImpl->beginTransaction();
for(const auto& element : vectorImpl->container)
{
element.second(event);
}
vectorImpl->commitTransaction();
}
template <typename Event>
void postpone(const Event& event)
{
_eventQueue.push_back([this, event]() { post<Event>(event); });
}
std::size_t processLimit(const std::size_t maxCountOfEvents)
{
std::size_t processed = 0;
while(processed < maxCountOfEvents && not _eventQueue.empty())
{
auto asyncCallback = std::move(_eventQueue.front());
_eventQueue.pop_front();
// Needs to be done in this way. Think about recursion in this case...
asyncCallback();
++processed;
}
return processed;
}
std::size_t getQueueEventCount() const noexcept
{
return _eventQueue.size();
}
template <class Event>
void listen(const std::uint32_t listenerID, std::function<void(const Event&)>&& callback)
{
using Vector = Dexode::Internal::TransactionCallbackVector<Event>;
std::unique_ptr<Dexode::Internal::CallbackVector>& vector =
_callbacks[Dexode::Internal::event_id<Event>()];
if(vector == nullptr)
{
vector = std::make_unique<Vector>();
}
assert(dynamic_cast<Vector*>(vector.get()));
auto* vectorImpl = static_cast<Vector*>(vector.get());
vectorImpl->add(listenerID, callback);
}
void unlistenAll(const std::uint32_t listenerID)
{
for(auto& element : _callbacks)
{
element.second->remove(listenerID);
}
}
template <typename Event>
void unlisten(const std::uint32_t listenerID)
{
static_assert(Dexode::Internal::validateEvent<Event>(), "Invalid event");
auto found = _callbacks.find(Dexode::Internal::event_id<Event>());
if(found != _callbacks.end())
{
found->second->remove(listenerID);
}
}
private:
std::map<Dexode::Internal::event_id_t, std::unique_ptr<Dexode::Internal::CallbackVector>>
_callbacks;
std::deque<std::function<void()>> _eventQueue;
};
} // namespace dexode::eventbus::strategy

View File

@ -0,0 +1,56 @@
#include "dexode/eventbus/strategy/Protected.hpp"
namespace dexode::eventbus::strategy
{
std::size_t Protected::processLimit(const std::size_t maxCountOfEvents)
{
std::size_t processed = 0;
std::function<void()> postponeEventCallback;
while(processed < maxCountOfEvents)
{
{
std::unique_lock writeLock{_mutex};
if(_eventQueue.empty())
{
break;
}
postponeEventCallback = std::move(_eventQueue.front());
_eventQueue.pop_front();
}
postponeEventCallback();
++processed;
}
return processed;
}
void Protected::unlistenAll(const std::uint32_t listenerID)
{
std::unique_lock writeLock{_mutex}; // TODO locking already locked mutex
for(auto& element : _callbacks)
{
element.second->remove(listenerID);
}
}
bool Protected::wait()
{
using namespace std::chrono_literals;
std::unique_lock<std::mutex> lock(_waitMutex);
_eventWaiting.wait(lock);
std::shared_lock readLock{_mutex};
return not _eventQueue.empty();
}
bool Protected::waitFor(std::chrono::milliseconds timeout)
{
using namespace std::chrono_literals;
std::unique_lock<std::mutex> lock(_waitMutex);
_eventWaiting.wait_for(lock, timeout);
std::shared_lock readLock{_mutex};
return not _eventQueue.empty();
}
} // namespace dexode::eventbus::strategy

View File

@ -0,0 +1,6 @@
#include "dexode/eventbus/strategy/Transaction.hpp"
namespace dexode::eventbus::strategy
{
}