Add AsyncEventBus

Now it is possible to schedule events from different threads.
This commit is contained in:
Dawid Drozd 2018-09-08 19:31:05 +02:00
parent c53fc6fabb
commit 7cf4465168
15 changed files with 750 additions and 265 deletions

View File

@ -5,6 +5,8 @@ cmake_minimum_required(VERSION 3.8 FATAL_ERROR)
project(EventBusDev)
set(CMAKE_CXX_STANDARD 14)
add_subdirectory(lib/)
enable_testing()

View File

@ -2,10 +2,12 @@ cmake_minimum_required(VERSION 3.8 FATAL_ERROR)
# BUILD_SHARED_LIBS can controll build type!
project(EventBus
VERSION 2.3.0
VERSION 2.4.0
LANGUAGES CXX
)
set(CMAKE_CXX_STANDARD 14)
# Dependencies
# No dependencies for EventBus yay!
@ -23,9 +25,14 @@ set(config_install_dir "${CMAKE_INSTALL_LIBDIR}/cmake/${PROJECT_NAME}")
# Library definition
add_library(EventBus
src/EventCollector.cpp include/eventbus/EventCollector.h
include/eventbus/EventBus.h
)
include/eventbus/EventBus.h
include/eventbus/EventCollector.h
include/eventbus/internal/AsyncCallbackVector.h
include/eventbus/internal/CallbackVector.h
include/eventbus/internal/TransactionCallbackVector.h
include/eventbus/TokenHolder.h
src/eventbus/AsyncEventBus.cpp include/eventbus/AsyncEventBus.h
)
add_library(Dexode::EventBus ALIAS EventBus)
target_compile_features(EventBus PUBLIC cxx_std_14)

View File

@ -0,0 +1,202 @@
#pragma once
#include <algorithm>
#include <cassert>
#include <deque>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <eventbus/internal/common.h>
#include <eventbus/internal/AsyncCallbackVector.h>
namespace Dexode
{
/**
* Async version of EventBus. Events are scheduled to queue and processed when called consume method.
* Methods like listen/unlisten also now will be processed when consume being called.
*
*/
class AsyncEventBus
{
public:
AsyncEventBus() = default;
~AsyncEventBus()
{
std::lock_guard<std::mutex> guard{_callbacksMutex};
_callbacks.clear();
}
AsyncEventBus(const AsyncEventBus&) = delete;
AsyncEventBus(AsyncEventBus&&) = delete;
AsyncEventBus& operator=(AsyncEventBus&&) = delete;
AsyncEventBus& operator=(const AsyncEventBus&) = delete;
/**
* Register listener for event. Returns token used for unlisten.
* This request will be scheduled into queue. Need at least call 1 consume.
* It isn't ASAP event
*
* @tparam Event - type you want to listen for
* @param callback - your callback to handle event
* @return token used for unlisten
*/
template <typename Event>
int listen(std::function<void(const Event&)> callback)
{
static_assert(Internal::validateEvent<Event>(), "Invalid event");
const int token = newToken();
listen<Event>(token, std::move(callback));
return token;
}
/**
* Register listener for event.
* This request will be scheduled into queue. Need at least call 1 consume.
* It isn't ASAP event
*
* @tparam Event - type you want to listen for
* @param token - unique token for identification receiver. Simply pass token from @see EventBus::listen
* @param callback - your callback to handle event
*/
template <typename Event>
void listen(const int token, std::function<void(const Event&)> callback)
{
static_assert(Internal::validateEvent<Event>(), "Invalid event");
std::lock_guard<std::mutex> guard{_eventMutex};
_commandsQueue.push_back([this, token, callback = std::move(callback)]()
{
std::lock_guard<std::mutex> guard{_callbacksMutex};
using Vector = Internal::AsyncCallbackVector<Event>;
assert(callback && "callback should be valid"); //Check for valid object
std::unique_ptr<Internal::CallbackVector>& vector =
_callbacks[Internal::type_id<Event>];
if(vector == nullptr)
{
vector.reset(new Vector{});
}
assert(dynamic_cast<Vector*>(vector.get()));
Vector* callbacks = static_cast<Vector*>(vector.get());
callbacks->add(token, callback);
});
}
/**
* This request will be scheduled into queue. Need at least call 1 consume.
* It isn't ASAP event
*
* @param token - token from EventBus::listen
*/
void unlistenAll(const int token)
{
std::lock_guard<std::mutex> guard{_eventMutex};
_commandsQueue.push_back([this, token]()
{
std::lock_guard<std::mutex> guard{_callbacksMutex};
for(auto& element : _callbacks)
{
element.second->remove(token);
}
});
}
/**
* This request will be scheduled into queue. Need at least call 1 consume.
* It isn't ASAP event
*
* @tparam Event - type you want to unlisten. @see Notiier::listen
* @param token - token from EventBus::listen
*/
template <typename Event>
void unlisten(const int token)
{
static_assert(Internal::validateEvent<Event>(), "Invalid event");
std::lock_guard<std::mutex> guard{_eventMutex};
_commandsQueue.push_back([this, token]()
{
std::lock_guard<std::mutex> guard{_callbacksMutex};
auto found = _callbacks.find(Internal::type_id<Event>);
if(found != _callbacks.end())
{
found->second->remove(token);
}
});
}
/**
* Schedule event to queue
*
* @param event your event struct
*/
template <typename Event>
void schedule(Event event)
{
static_assert(Internal::validateEvent<Event>(), "Invalid event");
std::lock_guard<std::mutex> guard{_eventMutex};
_eventQueue.push_back([this, event = std::move(event)]()
{
std::lock_guard<std::mutex> guard{_callbacksMutex};
using Vector = Internal::AsyncCallbackVector<Event>;
auto found = _callbacks.find(Internal::type_id<Event>);
if(found == _callbacks.end())
{
return; // no such notifications
}
std::unique_ptr<Internal::CallbackVector>& vector = found->second;
assert(dynamic_cast<Vector*>(vector.get()));
Vector* callbacks = static_cast<Vector*>(vector.get());
for(const auto& element : callbacks->container)
{
element.second(event);
}
});
}
/**
* Process queued events. This should be called always on same thread.
* @param max maximum count of events to consume, if 0 then all available events will be consumed.
* If max is higher than available events then only available events will be consumed.
* @return number of consumed events
*/
int consume(int max = 0);
std::size_t getQueueEventCount() const
{
std::lock_guard<std::mutex> guard{_eventMutex};
return _eventQueue.size();
}
private:
int newToken()
{
std::lock_guard<std::mutex> guard{_eventMutex};
int token = ++_tokener;
return token;
}
std::size_t processCommandsAndGetQueuedEventsCount();
int _tokener = 0;
std::map<Internal::type_id_t, std::unique_ptr<Internal::CallbackVector>> _callbacks;
mutable std::mutex _callbacksMutex;
mutable std::mutex _eventMutex;
std::deque<std::function<void()>> _eventQueue;
std::deque<std::function<void()>> _commandsQueue;
};
} /* namespace Dexode */

View File

@ -5,28 +5,15 @@
#include <functional>
#include <map>
#include <memory>
#include <vector>
#include <eventbus/internal/common.h>
#include <eventbus/internal/TransactionCallbackVector.h>
namespace Dexode
{
template <typename>
void type_id() // Helper for getting "type id"
{}
using type_id_t = void (*)(); // Function pointer
class EventBus
{
template <class Event>
constexpr void validateEvent()
{
static_assert(std::is_const<Event>::value == false, "Struct must be without const");
static_assert(std::is_volatile<Event>::value == false, "Struct must be without volatile");
static_assert(std::is_reference<Event>::value == false, "Struct must be without reference");
static_assert(std::is_pointer<Event>::value == false, "Struct must be without pointer");
}
public:
EventBus() = default;
@ -51,7 +38,7 @@ public:
template <typename Event>
int listen(const std::function<void(const Event&)>& callback)
{
validateEvent<Event>();
static_assert(Internal::validateEvent<Event>(), "Invalid event");
const int token = ++_tokener;
listen<Event>(token, callback);
@ -66,13 +53,13 @@ public:
template <typename Event>
void listen(const int token, const std::function<void(const Event&)>& callback)
{
validateEvent<Event>();
static_assert(Internal::validateEvent<Event>(), "Invalid event");
using Vector = VectorImpl<Event>;
using Vector = Internal::TransactionCallbackVector<Event>;
assert(callback && "callback should be valid"); //Check for valid object
std::unique_ptr<VectorInterface>& vector = _callbacks[type_id<Event>];
std::unique_ptr<Internal::CallbackVector>& vector = _callbacks[Internal::type_id<Event>];
if(vector == nullptr)
{
vector.reset(new Vector{});
@ -100,9 +87,9 @@ public:
template <typename Event>
void unlisten(const int token)
{
validateEvent<Event>();
static_assert(Internal::validateEvent<Event>(), "Invalid event");
auto found = _callbacks.find(type_id<Event>);
auto found = _callbacks.find(Internal::type_id<Event>);
if(found != _callbacks.end())
{
found->second->remove(token);
@ -118,16 +105,16 @@ public:
void notify(const Event& event)
{
using CleanEventType = typename std::remove_const<Event>::type;
validateEvent<CleanEventType>();
static_assert(Internal::validateEvent<Event>(), "Invalid event");
using Vector = VectorImpl<CleanEventType>;
auto found = _callbacks.find(type_id<CleanEventType>);
using Vector = Internal::TransactionCallbackVector<CleanEventType>;
auto found = _callbacks.find(Internal::type_id<CleanEventType>);
if(found == _callbacks.end())
{
return; // no such notifications
}
std::unique_ptr<VectorInterface>& vector = found->second;
std::unique_ptr<Internal::CallbackVector>& vector = found->second;
assert(dynamic_cast<Vector*>(vector.get()));
Vector* vectorImpl = static_cast<Vector*>(vector.get());
@ -140,87 +127,8 @@ public:
}
private:
struct VectorInterface
{
virtual ~VectorInterface() = default;
virtual void remove(const int token) = 0;
};
template <typename Event>
struct VectorImpl : public VectorInterface
{
using CallbackType = std::function<void(const Event&)>;
using ContainerElement = std::pair<int, CallbackType>;
using ContainerType = std::vector<ContainerElement>;
ContainerType container;
ContainerType toAdd;
std::vector<int> toRemove;
int inTransaction = 0;
virtual void remove(const int token) override
{
if(inTransaction > 0)
{
toRemove.push_back(token);
return;
}
//Invalidation rules: https://stackoverflow.com/questions/6438086/iterator-invalidation-rules
auto removeFrom = std::remove_if(
container.begin(), container.end(), [token](const ContainerElement& element) {
return element.first == token;
});
if(removeFrom != container.end())
{
container.erase(removeFrom, container.end());
}
}
void add(const int token, const CallbackType& callback)
{
if(inTransaction > 0)
{
toAdd.emplace_back(token, callback);
}
else
{
container.emplace_back(token, callback);
}
}
void beginTransaction()
{
++inTransaction;
}
void commitTransaction()
{
--inTransaction;
if(inTransaction > 0)
{
return;
}
inTransaction = 0;
if(toAdd.empty() == false)
{
container.insert(container.end(), toAdd.begin(), toAdd.end());
toAdd.clear();
}
if(toRemove.empty() == false)
{
for(auto token : toRemove)
{
remove(token);
}
toRemove.clear();
}
}
};
int _tokener = 0;
std::map<type_id_t, std::unique_ptr<VectorInterface>> _callbacks;
std::map<Internal::type_id_t, std::unique_ptr<Internal::CallbackVector>> _callbacks;
};
} /* namespace Dexode */

View File

@ -7,65 +7,12 @@
#include <memory>
#include "EventBus.h"
#include "TokenHolder.h"
namespace Dexode
{
class EventCollector
{
public:
EventCollector(const std::shared_ptr<EventBus>& bus);
EventCollector(EventBus* bus);
EventCollector(EventCollector const& other);
EventCollector(EventCollector&& other);
~EventCollector();
EventCollector& operator=(EventCollector const& other);
EventCollector& operator=(EventCollector&& other);
/**
* Register listener for event.
*
* @tparam Event - type you want to listen for
* @param callback - your callback to handle event
*/
template<typename Event>
void listen(const std::function<void(const Event&)>& callback)
{
if (!callback || !_bus)
{
return;//Skip such things
}
if (_token == 0)
{
_token = _bus->listen<Event>(callback);
}
else
{
_bus->listen<Event>(_token, callback);
}
}
void unlistenAll();
/**
* @tparam Event - type you want to unlisten. @see Notiier::listen
*/
template<typename Event>
void unlisten()
{
if (_bus)
{
_bus->unlisten<Event>(_token);
}
}
bool isUsing(const std::shared_ptr<EventBus>& bus) const;
private:
int _token = 0;
std::shared_ptr<EventBus> _bus;
};
// [[deprecated("Deprecating EventCollector. Try move to: TokenHolder<>")]]
using EventCollector = TokenHolder<EventBus>;
}

View File

@ -0,0 +1,141 @@
#pragma once
#include <cassert>
#include <functional>
#include <memory>
namespace
{
template <class Bus>
void null_deleter(Bus*)
{
}
} // namespace
namespace Dexode
{
//
//template<class Bus>
//class EventBusWrapper : Bus
template <class Bus>
class TokenHolder
{
public:
TokenHolder(const std::shared_ptr<Bus>& bus)
: _bus{bus}
{
assert(_bus);
}
TokenHolder(Bus* bus)
: _bus(bus, &null_deleter<Bus>)
{
}
TokenHolder(const TokenHolder& other)
: _bus(other._bus)
{
}
TokenHolder(TokenHolder&& other)
: _token(other._token)
, _bus(std::move(other._bus))
{
other._token = 0;
}
~TokenHolder()
{
unlistenAll();
}
TokenHolder& operator=(const TokenHolder& other)
{
if(this == &other)
{
return *this;
}
if(other._bus.get() != _bus.get())
{
unlistenAll();
_bus = other._bus;
}
return *this;
}
TokenHolder& operator=(TokenHolder&& other)
{
if(this == &other)
{
return *this;
}
unlistenAll();
_token = other._token;
other._token = 0;
_bus = std::move(other._bus);
return *this;
}
/**
* Register listener for event.
*
* @tparam Event - type you want to listen for
* @param callback - your callback to handle event
*/
template <typename Event>
void listen(std::function<void(const Event&)> callback)
{
if(!callback || !_bus)
{
assert(callback);
assert(_bus);
return; //Skip such things
}
if(_token == 0)
{
_token = _bus->template listen<Event>(std::move(callback));
}
else
{
_bus->template listen<Event>(_token, std::move(callback));
}
}
void unlistenAll()
{
if(_token != 0 && _bus)
{
_bus->unlistenAll(_token);
}
}
/**
* @tparam Event - type you want to unlisten. @see Notiier::listen
*/
template <typename Event>
void unlisten()
{
if(_bus)
{
_bus->template unlisten<Event>(_token);
}
}
bool isUsing(const std::shared_ptr<Bus>& bus) const
{
return _bus == bus;
}
private:
int _token = 0;
std::shared_ptr<Bus> _bus;
};
} // namespace Dexode

View File

@ -0,0 +1,41 @@
#pragma once
#include <algorithm>
#include <functional>
#include <vector>
#include "CallbackVector.h"
namespace Dexode
{
namespace Internal
{
template <typename Event>
struct AsyncCallbackVector : public CallbackVector
{
using CallbackType = std::function<void(const Event&)>;
using ContainerElement = std::pair<int, CallbackType>;
std::vector<ContainerElement> container;
virtual void remove(const int token) override
{
auto removeFrom = std::remove_if(
container.begin(), container.end(), [token](const ContainerElement& element)
{
return element.first == token;
});
if(removeFrom != container.end())
{
container.erase(removeFrom, container.end());
}
}
void add(const int token, CallbackType callback)
{
container.emplace_back(token, std::move(callback));
}
};
} // namespace Internal
} // namespace Dexode

View File

@ -0,0 +1,16 @@
#pragma once
namespace Dexode
{
namespace Internal
{
struct CallbackVector
{
virtual ~CallbackVector() = default;
virtual void remove(const int token) = 0;
};
} // namespace Internal
} // namespace Dexode

View File

@ -0,0 +1,88 @@
#pragma once
#include <algorithm>
#include <functional>
#include <vector>
#include "CallbackVector.h"
namespace Dexode
{
namespace Internal
{
template <typename Event>
struct TransactionCallbackVector : public CallbackVector
{
using CallbackType = std::function<void(const Event&)>;
using ContainerElement = std::pair<int, CallbackType>;
using ContainerType = std::vector<ContainerElement>;
ContainerType container;
ContainerType toAdd;
std::vector<int> toRemove;
int inTransaction = 0;
virtual void remove(const int token) override
{
if(inTransaction > 0)
{
toRemove.push_back(token);
return;
}
//Invalidation rules: https://stackoverflow.com/questions/6438086/iterator-invalidation-rules
auto removeFrom = std::remove_if(
container.begin(), container.end(), [token](const ContainerElement& element)
{
return element.first == token;
});
if(removeFrom != container.end())
{
container.erase(removeFrom, container.end());
}
}
void add(const int token, const CallbackType& callback)
{
if(inTransaction > 0)
{
toAdd.emplace_back(token, callback);
}
else
{
container.emplace_back(token, callback);
}
}
void beginTransaction()
{
++inTransaction;
}
void commitTransaction()
{
--inTransaction;
if(inTransaction > 0)
{
return;
}
inTransaction = 0;
if(toAdd.empty() == false)
{
container.insert(container.end(), toAdd.begin(), toAdd.end());
toAdd.clear();
}
if(toRemove.empty() == false)
{
for(auto token : toRemove)
{
remove(token);
}
toRemove.clear();
}
}
};
} // namespace Internal
} // namespace Dexode

View File

@ -0,0 +1,28 @@
#pragma once
#include <type_traits>
namespace Dexode
{
namespace Internal
{
template <typename>
void type_id() // Helper for getting "type id"
{
}
using type_id_t = void (*)(); // Function pointer
template <class Event>
constexpr bool validateEvent()
{
static_assert(std::is_const<Event>::value == false, "Struct must be without const");
static_assert(std::is_volatile<Event>::value == false, "Struct must be without volatile");
static_assert(std::is_reference<Event>::value == false, "Struct must be without reference");
static_assert(std::is_pointer<Event>::value == false, "Struct must be without pointer");
return true;
}
} // namespace Internal
} // namespace Dexode

View File

@ -1,94 +0,0 @@
//
// Created by Dawid Drozd aka Gelldur on 18/10/16.
//
#include <eventbus/EventCollector.h>
#include <cassert>
namespace
{
void null_deleter(Dexode::EventBus*)
{
}
}
namespace Dexode
{
EventCollector::EventCollector(const std::shared_ptr<EventBus>& bus)
: _bus(bus)
{
assert(_bus);
}
//Maybe ugly but hey ;) Less code and simply i can :D
EventCollector::EventCollector(EventBus* bus)
: _bus(bus, &null_deleter)
{
}
EventCollector::EventCollector(const EventCollector& other)
: _bus(other._bus)
{
}
EventCollector::EventCollector(EventCollector&& other)
: _token(other._token)
, _bus(std::move(other._bus))
{
other._token = 0;
}
EventCollector::~EventCollector()
{
unlistenAll();
}
EventCollector& EventCollector::operator=(const EventCollector& other)
{
if (this == &other)
{
return *this;
}
if (other._bus.get() != _bus.get())
{
unlistenAll();
_bus = other._bus;
}
return *this;
}
EventCollector& EventCollector::operator=(EventCollector&& other)
{
if (this == &other)
{
return *this;
}
unlistenAll();
_token = other._token;
other._token = 0;
_bus = std::move(other._bus);
return *this;
}
void EventCollector::unlistenAll()
{
if (_token != 0 && _bus)
{
_bus->unlistenAll(_token);
}
}
bool EventCollector::isUsing(const std::shared_ptr<EventBus>& bus) const
{
return _bus == bus;
}
}

View File

@ -0,0 +1,44 @@
#include <eventbus/AsyncEventBus.h>
#include <limits>
namespace Dexode
{
std::size_t AsyncEventBus::processCommandsAndGetQueuedEventsCount()
{
std::lock_guard<std::mutex> guard{_eventMutex};
while(_commandsQueue.empty() == false)
{
_commandsQueue.front()();//This can't add any extra commands, because in this queue we story only listen/unlisten stuff
_commandsQueue.pop_front();
}
return _eventQueue.size(); //Yeah we want to return events count. So don't have to call getQueueEventCount
}
int AsyncEventBus::consume(int max)
{
if(max == 0)
{
max = std::numeric_limits<int>::max();
}
int consumed = 0;
std::function<void()> eventCommand;
while(processCommandsAndGetQueuedEventsCount() > 0 && consumed < max) //order is important
{
{
std::lock_guard<std::mutex> guard{_eventMutex};
eventCommand = std::move(_eventQueue.front());
_eventQueue.pop_front();
}
eventCommand();
++consumed;
}
return consumed;
}
} // namespace Dexode

View File

@ -1,5 +1,7 @@
cmake_minimum_required(VERSION 3.8 FATAL_ERROR)
set(CMAKE_CXX_STANDARD 14)
# http://www.levelofindirection.com/journal/2010/12/28/unit-testing-in-c-and-objective-c-just-got-easier.html
# Thanks for CATCH!
@ -46,4 +48,4 @@ target_link_libraries(EventBusPerformance PUBLIC
benchmark
$<$<BOOL:${Poco_FOUND}>:Poco::Foundation>
$<$<BOOL:${Poco_FOUND}>:Poco::Util>
)
)

View File

@ -5,6 +5,8 @@ cmake_minimum_required(VERSION 3.8 FATAL_ERROR)
project(EventBusTest)
set(CMAKE_CXX_STANDARD 14)
# Dependencies
enable_testing()
if (NOT TARGET Dexode::EventBus)
@ -13,12 +15,14 @@ endif ()
# From 2.3.X they broke back compatibility
find_package(Catch2 2.3 REQUIRED)
find_package(Threads REQUIRED)
# Target definition
add_executable(EventBusTest
src/EventCollectorTest.cpp
src/NotifierTest.cpp
)
src/AsyncEventBusTest.cpp
src/EventCollectorTest.cpp
src/NotifierTest.cpp
)
target_compile_options(EventBusTest PUBLIC
-Wall -pedantic
@ -47,7 +51,7 @@ set(EVENTBUS_DEBUG_FLAGS
target_compile_options(EventBusTest PUBLIC "$<$<CONFIG:DEBUG>:${EVENTBUS_DEBUG_FLAGS}>")
target_link_libraries(EventBusTest PUBLIC Dexode::EventBus Catch2::Catch2)
target_link_libraries(EventBusTest PUBLIC Catch2::Catch2 Dexode::EventBus Threads::Threads)
add_test(NAME EventBus.UnitTests COMMAND EventBusTest)

View File

@ -0,0 +1,149 @@
#include <thread>
#include <string>
#include <iostream>
#include <chrono>
#include <catch2/catch.hpp>
#include <eventbus/AsyncEventBus.h>
#include <eventbus/EventCollector.h>
using namespace std::chrono;
const auto ns2 = std::chrono::nanoseconds{2};// GCC 7 has some issues with 2ms :/
const auto ns3 = std::chrono::nanoseconds{3};
TEST_CASE("Should consume events in synchronous way When using AsyncEventBus", "[AsyncEventBus]")
{
struct SimpleEvent
{
std::thread::id id;
};
Dexode::AsyncEventBus bus;
int counter = 0;
bus.listen<SimpleEvent>([&counter](const SimpleEvent& event)
{
std::cout << "Event from: " << event.id << std::endl;
++counter;
});
std::thread worker1{[&bus]()
{
for(int i = 0; i < 10; ++i)
{
bus.schedule(SimpleEvent{std::this_thread::get_id()});
std::this_thread::sleep_for(ns3);
}
}};
std::thread worker2{[&bus]()
{
for(int i = 0; i < 10; ++i)
{
bus.schedule(SimpleEvent{std::this_thread::get_id()});
std::this_thread::sleep_for(ns2);
}
}};
REQUIRE(counter == 0);
while(counter < 20)
{
static int sumOfConsumed = 0;
REQUIRE(counter == sumOfConsumed);
sumOfConsumed += bus.consume();
REQUIRE(counter == sumOfConsumed);
}
worker1.join();
worker2.join();
}
TEST_CASE("Should unlisten for event When call unlisten inside Listener", "[AsyncEventBus]")
{
struct SimpleEvent
{
std::thread::id id;
};
Dexode::AsyncEventBus bus;
int counter = 0;
const int myToken = 0x23167;
bus.listen<SimpleEvent>(myToken, [&counter, &bus](const SimpleEvent& event)
{
std::cout << "Event from: " << event.id << std::endl;
++counter;
bus.unlistenAll(myToken);//This doesn't mean that unlisten will be ASAP!
});
REQUIRE(counter == 0);
bus.schedule(SimpleEvent{std::this_thread::get_id()});
//This should consume (listen request), SimpleEvent, ()unlisten request
REQUIRE(bus.consume(1) == 1);
for(int i = 0; i < 10; ++i)
{
bus.consume();
bus.schedule(SimpleEvent{std::this_thread::get_id()});
bus.consume();
}
REQUIRE(counter == 1);//Should be called only once
}
TEST_CASE("Should listen for only 1 event When call unlisten inside Listener", "[AsyncEventBus]")
{
struct SimpleEvent
{
std::thread::id id;
};
Dexode::AsyncEventBus bus;
int counter = 0;
const int myToken = 0x23167;
bus.listen<SimpleEvent>(myToken, [&counter, &bus](const SimpleEvent& event)
{
std::cout << "Event from: " << event.id << std::endl;
++counter;
bus.unlistenAll(myToken);//This doesn't mean that unlisten will be ASAP!
});
//Workers in this test are only extra stuff
std::thread worker1{[&bus]()
{
for(int i = 0; i < 10; ++i)
{
bus.schedule(SimpleEvent{std::this_thread::get_id()});
std::this_thread::sleep_for(ns3);
}
}};
//Workers in this test are only extra stuff
std::thread worker2{[&bus]()
{
for(int i = 0; i < 10; ++i)
{
bus.schedule(SimpleEvent{std::this_thread::get_id()});
std::this_thread::sleep_for(ns2);
}
}};
REQUIRE(counter == 0);
for(int i = 0; i < 10; ++i)
{
bus.schedule(SimpleEvent{std::this_thread::get_id()});
bus.schedule(SimpleEvent{std::this_thread::get_id()});
bus.consume();
}
REQUIRE(counter == 1);//Should be called only once
worker1.join();
worker2.join();
}