feat add event_bus #1
@ -134,8 +134,9 @@ function(sled_add_test)
|
||||
add_executable(${SLED_TEST_NAME} ${SLED_TEST_SRCS})
|
||||
|
||||
if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
|
||||
set(EXTRA_FLAGS -Wthread-safety -g -fsanitize=address
|
||||
-fno-omit-frame-pointer -fno-optimize-sibling-calls)
|
||||
set(EXTRA_FLAGS # -Wthread-safety
|
||||
-g -fsanitize=address -fno-omit-frame-pointer
|
||||
-fno-optimize-sibling-calls)
|
||||
target_compile_options(${SLED_TEST_NAME} PRIVATE ${EXTRA_FLAGS})
|
||||
|
||||
target_link_options(${SLED_TEST_NAME} PRIVATE ${EXTRA_FLAGS})
|
||||
@ -179,11 +180,13 @@ if(SLED_BUILD_TESTS)
|
||||
src/sled/rx_test.cc
|
||||
src/sled/uri_test.cc)
|
||||
|
||||
sled_add_test(NAME sled_symbolize_test SRCS
|
||||
src/sled/debugging/symbolize_test.cc NO_MAIN)
|
||||
sled_add_test(NAME sled_event_bus_test SRCS
|
||||
src/sled/event_bus/event_bus_test.cc)
|
||||
sled_add_test(NAME sled_lua_test SRCS tests/lua_test.cc)
|
||||
sled_add_test(NAME sled_move_on_copy_test SRCS
|
||||
src/sled/utility/move_on_copy_test.cc)
|
||||
sled_add_test(NAME sled_symbolize_test SRCS
|
||||
src/sled/debugging/symbolize_test.cc NO_MAIN)
|
||||
endif(SLED_BUILD_TESTS)
|
||||
|
||||
if(SLED_BUILD_FUZZ)
|
||||
|
154
src/sled/event_bus/event_bus.h
Normal file
154
src/sled/event_bus/event_bus.h
Normal file
@ -0,0 +1,154 @@
|
||||
#ifndef SLED_EVENT_BUS_EVENT_BUS_H
|
||||
#define SLED_EVENT_BUS_EVENT_BUS_H
|
||||
|
||||
#include "sled/sigslot.h"
|
||||
#include "sled/synchronization/mutex.h"
|
||||
#include <typeindex>
|
||||
|
||||
namespace sled {
|
||||
|
||||
class EventBus;
|
||||
class EventSubscriber;
|
||||
|
||||
namespace {
|
||||
|
||||
template<typename Event>
|
||||
class EventRegistry {
|
||||
public:
|
||||
using Dispatcher = sigslot::signal1<Event, sigslot::single_threaded>;
|
||||
using SubscriberTable = std::unordered_map<EventBus *, Dispatcher>;
|
||||
|
||||
static EventRegistry &Instance()
|
||||
{
|
||||
static EventRegistry instance_;
|
||||
return instance_;
|
||||
}
|
||||
|
||||
static std::function<void(EventBus *)> &GetCleanupHandler()
|
||||
{
|
||||
static std::function<void(EventBus *)> cleanup_handler
|
||||
= std::bind(&EventRegistry::OnBusDestroyed, &Instance(), std::placeholders::_1);
|
||||
return cleanup_handler;
|
||||
}
|
||||
|
||||
void Post(EventBus *bus, Event event)
|
||||
{
|
||||
sled::SharedMutexReadLock lock(&shared_mutex_);
|
||||
if (signals_.empty()) { return; }
|
||||
auto iter = signals_.find(bus);
|
||||
if (iter != signals_.end()) { iter->second(event); }
|
||||
}
|
||||
|
||||
template<typename C>
|
||||
void Subscribe(EventBus *bus, C *instance, void (C::*method)(Event))
|
||||
{
|
||||
sled::SharedMutexWriteLock lock(&shared_mutex_);
|
||||
auto iter = signals_.find(bus);
|
||||
if (iter == signals_.end()) {
|
||||
signals_.emplace(bus, Dispatcher());
|
||||
iter = signals_.find(bus);
|
||||
}
|
||||
auto &dispatcher = iter->second;
|
||||
dispatcher.connect(instance, method);
|
||||
}
|
||||
|
||||
template<typename C>
|
||||
void Unsubscribe(EventBus *bus, C *instance)
|
||||
{
|
||||
sled::SharedMutexWriteLock lock(&shared_mutex_);
|
||||
auto iter = signals_.find(bus);
|
||||
if (iter == signals_.end()) { return; }
|
||||
auto &dispatcher = iter->second;
|
||||
dispatcher.disconnect(instance);
|
||||
}
|
||||
|
||||
bool IsEmpty(EventBus *bus) const
|
||||
{
|
||||
sled::SharedMutexReadLock lock(&shared_mutex_);
|
||||
auto iter = signals_.find(bus);
|
||||
if (iter == signals_.end()) { return true; }
|
||||
return iter->second.is_empty();
|
||||
}
|
||||
|
||||
void OnBusDestroyed(EventBus *bus)
|
||||
{
|
||||
sled::SharedMutexWriteLock lock(&shared_mutex_);
|
||||
signals_.erase(bus);
|
||||
}
|
||||
|
||||
template<typename C>
|
||||
void OnSubscriberDestroyed(C *instance)
|
||||
{
|
||||
sled::SharedMutexWriteLock lock(&shared_mutex_);
|
||||
for (auto &entry : signals_) {
|
||||
auto &dispatcher = entry.second;
|
||||
dispatcher.disconnect(instance);
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
mutable sled::SharedMutex shared_mutex_;
|
||||
|
||||
SubscriberTable signals_;
|
||||
};
|
||||
|
||||
}// namespace
|
||||
|
||||
class EventSubscriber : public sigslot::has_slots<> {
|
||||
public:
|
||||
virtual ~EventSubscriber() {}
|
||||
};
|
||||
|
||||
class EventBus {
|
||||
public:
|
||||
EventBus() = default;
|
||||
|
||||
~EventBus()
|
||||
{
|
||||
for (const auto &handler : cleanup_handlers_) { handler.second(this); }
|
||||
}
|
||||
|
||||
EventBus(const EventBus &) = delete;
|
||||
EventBus &operator=(const EventBus &) = delete;
|
||||
|
||||
template<typename Event>
|
||||
void Post(const Event &event)
|
||||
{
|
||||
EventRegistry<Event>::Instance().Post(this, event);
|
||||
}
|
||||
|
||||
// On<Event1> ([](const Event1 &){})
|
||||
template<typename Event, typename C>
|
||||
typename std::enable_if<std::is_base_of<EventSubscriber, C>::value>::type
|
||||
Subscribe(C *instance, void (C::*method)(Event))
|
||||
{
|
||||
{
|
||||
sled::MutexLock lock(&mutex_);
|
||||
cleanup_handlers_[std::type_index(typeid(Event))] = EventRegistry<Event>::GetCleanupHandler();
|
||||
}
|
||||
|
||||
EventRegistry<Event>::Instance().Subscribe(this, instance, method);
|
||||
}
|
||||
|
||||
template<typename Event, typename C>
|
||||
typename std::enable_if<std::is_base_of<EventSubscriber, C>::value>::type Unsubscribe(C *instance)
|
||||
{
|
||||
EventRegistry<Event>::Instance().Unsubscribe(this, instance);
|
||||
{
|
||||
sled::MutexLock lock(&mutex_);
|
||||
if (EventRegistry<Event>::Instance().IsEmpty(this)) {
|
||||
auto iter = cleanup_handlers_.find(std::type_index(typeid(Event)));
|
||||
if (iter != cleanup_handlers_.end()) {
|
||||
iter->second(this);
|
||||
cleanup_handlers_.erase(iter);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
sled::Mutex mutex_;
|
||||
std::unordered_map<std::type_index, std::function<void(EventBus *)>> cleanup_handlers_ GUARDED_BY(mutex_);
|
||||
};
|
||||
}// namespace sled
|
||||
#endif// SLED_EVENT_BUS_EVENT_BUS_H
|
65
src/sled/event_bus/event_bus_test.cc
Normal file
65
src/sled/event_bus/event_bus_test.cc
Normal file
@ -0,0 +1,65 @@
|
||||
#include <sled/event_bus/event_bus.h>
|
||||
#include <sled/system/thread_pool.h>
|
||||
|
||||
struct Event1 {
|
||||
int a;
|
||||
};
|
||||
|
||||
struct Event2 {
|
||||
std::string str;
|
||||
};
|
||||
|
||||
struct Subscriber : public sled::EventSubscriber {
|
||||
void OnEvent1(Event1 event) { a += event.a; }
|
||||
|
||||
void OnEvent2(Event2 event) { str += event.str; }
|
||||
|
||||
int a = 0;
|
||||
std::string str = "";
|
||||
};
|
||||
|
||||
TEST_SUITE("EventBus")
|
||||
{
|
||||
TEST_CASE("single thread")
|
||||
{
|
||||
sled::EventBus bus;
|
||||
bus.Post(Event1{1});
|
||||
bus.Post(Event2{"1"});
|
||||
|
||||
Subscriber subscriber;
|
||||
bus.Subscribe<Event1>(&subscriber, &Subscriber::OnEvent1);
|
||||
bus.Subscribe<Event2>(&subscriber, &Subscriber::OnEvent2);
|
||||
|
||||
bus.Post(Event1{1});
|
||||
bus.Post(Event2{"1"});
|
||||
|
||||
CHECK_EQ(subscriber.a, 1);
|
||||
CHECK_EQ(subscriber.str, "1");
|
||||
|
||||
bus.Post(Event1{1});
|
||||
bus.Post(Event2{"1"});
|
||||
|
||||
CHECK_EQ(subscriber.a, 2);
|
||||
CHECK_EQ(subscriber.str, "11");
|
||||
}
|
||||
|
||||
TEST_CASE("multi thread")
|
||||
{
|
||||
auto thread = sled::Thread::Create();
|
||||
thread->Start();
|
||||
|
||||
sled::EventBus bus;
|
||||
Subscriber subscriber;
|
||||
|
||||
bus.Subscribe(&subscriber, &Subscriber::OnEvent1);
|
||||
bus.Subscribe(&subscriber, &Subscriber::OnEvent2);
|
||||
|
||||
thread->BlockingCall([&] {
|
||||
bus.Post(Event1{1});
|
||||
bus.Post(Event2{"1"});
|
||||
});
|
||||
|
||||
CHECK_EQ(subscriber.a, 1);
|
||||
CHECK_EQ(subscriber.str, "1");
|
||||
}
|
||||
}
|
@ -4,12 +4,19 @@ namespace sigslot {
|
||||
|
||||
#ifdef _SIGSLOT_HAS_POSIX_THREADS
|
||||
|
||||
pthread_mutex_t *
|
||||
sled::Mutex *
|
||||
multi_threaded_global::get_mutex()
|
||||
{
|
||||
static pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER;
|
||||
static sled::Mutex g_mutex;
|
||||
return &g_mutex;
|
||||
}
|
||||
|
||||
// pthread_mutex_t *
|
||||
// multi_threaded_global::get_mutex()
|
||||
// {
|
||||
// static pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER;
|
||||
// return &g_mutex;
|
||||
// }
|
||||
|
||||
#endif// _SIGSLOT_HAS_POSIX_THREADS
|
||||
}// namespace sigslot
|
||||
|
@ -94,10 +94,11 @@
|
||||
// If signalx is single threaded the user must ensure that disconnect, connect
|
||||
// or signal is not happening concurrently or data race may occur.
|
||||
|
||||
#pragma once
|
||||
#ifndef SLED_SIGSLOT_H
|
||||
#define SLED_SIGSLOT_H
|
||||
#pragma once
|
||||
|
||||
#include "sled/synchronization/mutex.h"
|
||||
#include <cstring>
|
||||
#include <list>
|
||||
#include <set>
|
||||
@ -105,9 +106,7 @@
|
||||
// On our copy of sigslot.h, we set single threading as default.
|
||||
#define SIGSLOT_DEFAULT_MT_POLICY single_threaded
|
||||
|
||||
#if defined(SIGSLOT_PURE_ISO) \
|
||||
|| (!defined(WEBRTC_WIN) && !defined(__GNUG__) \
|
||||
&& !defined(SIGSLOT_USE_POSIX_THREADS))
|
||||
#if defined(SIGSLOT_PURE_ISO) || (!defined(WEBRTC_WIN) && !defined(__GNUG__) && !defined(SIGSLOT_USE_POSIX_THREADS))
|
||||
#define _SIGSLOT_SINGLE_THREADED
|
||||
#elif defined(WEBRTC_WIN)
|
||||
#define _SIGSLOT_HAS_WIN32_THREADS
|
||||
@ -167,10 +166,7 @@ class multi_threaded_local {
|
||||
public:
|
||||
multi_threaded_local() { InitializeCriticalSection(&m_critsec); }
|
||||
|
||||
multi_threaded_local(const multi_threaded_local &)
|
||||
{
|
||||
InitializeCriticalSection(&m_critsec);
|
||||
}
|
||||
multi_threaded_local(const multi_threaded_local &) { InitializeCriticalSection(&m_critsec); }
|
||||
|
||||
~multi_threaded_local() { DeleteCriticalSection(&m_critsec); }
|
||||
|
||||
@ -187,31 +183,49 @@ private:
|
||||
// The multi threading policies only get compiled in if they are enabled.
|
||||
class multi_threaded_global {
|
||||
public:
|
||||
void lock() { pthread_mutex_lock(get_mutex()); }
|
||||
void lock()
|
||||
{
|
||||
get_mutex()->Lock();
|
||||
// pthread_mutex_lock(get_mutex());
|
||||
}
|
||||
|
||||
void unlock() { pthread_mutex_unlock(get_mutex()); }
|
||||
void unlock()
|
||||
{
|
||||
get_mutex()->Unlock();
|
||||
// pthread_mutex_unlock(get_mutex());
|
||||
}
|
||||
|
||||
private:
|
||||
static pthread_mutex_t *get_mutex();
|
||||
static sled::Mutex *get_mutex();
|
||||
// static pthread_mutex_t *get_mutex();
|
||||
};
|
||||
|
||||
class multi_threaded_local {
|
||||
public:
|
||||
multi_threaded_local() { pthread_mutex_init(&m_mutex, nullptr); }
|
||||
// multi_threaded_local() { pthread_mutex_init(&m_mutex, nullptr); }
|
||||
//
|
||||
// multi_threaded_local(const multi_threaded_local &)
|
||||
// {
|
||||
// pthread_mutex_init(&m_mutex, nullptr);
|
||||
// }
|
||||
//
|
||||
// ~multi_threaded_local() { pthread_mutex_destroy(&m_mutex); }
|
||||
|
||||
multi_threaded_local(const multi_threaded_local &)
|
||||
void lock()
|
||||
{
|
||||
pthread_mutex_init(&m_mutex, nullptr);
|
||||
mutex_.Lock();
|
||||
// pthread_mutex_lock(&m_mutex);
|
||||
}
|
||||
|
||||
~multi_threaded_local() { pthread_mutex_destroy(&m_mutex); }
|
||||
|
||||
void lock() { pthread_mutex_lock(&m_mutex); }
|
||||
|
||||
void unlock() { pthread_mutex_unlock(&m_mutex); }
|
||||
void unlock()
|
||||
{
|
||||
mutex_.Unlock();
|
||||
// pthread_mutex_unlock(&m_mutex);
|
||||
}
|
||||
|
||||
private:
|
||||
pthread_mutex_t m_mutex;
|
||||
sled::Mutex mutex_;
|
||||
// pthread_mutex_t m_mutex;
|
||||
};
|
||||
#endif// _SIGSLOT_HAS_POSIX_THREADS
|
||||
|
||||
@ -229,10 +243,8 @@ class _signal_base_interface;
|
||||
|
||||
class has_slots_interface {
|
||||
private:
|
||||
typedef void (*signal_connect_t)(has_slots_interface *self,
|
||||
_signal_base_interface *sender);
|
||||
typedef void (*signal_disconnect_t)(has_slots_interface *self,
|
||||
_signal_base_interface *sender);
|
||||
typedef void (*signal_connect_t)(has_slots_interface *self, _signal_base_interface *sender);
|
||||
typedef void (*signal_disconnect_t)(has_slots_interface *self, _signal_base_interface *sender);
|
||||
typedef void (*disconnect_all_t)(has_slots_interface *self);
|
||||
|
||||
const signal_connect_t m_signal_connect;
|
||||
@ -240,9 +252,7 @@ private:
|
||||
const disconnect_all_t m_disconnect_all;
|
||||
|
||||
protected:
|
||||
has_slots_interface(signal_connect_t conn,
|
||||
signal_disconnect_t disc,
|
||||
disconnect_all_t disc_all)
|
||||
has_slots_interface(signal_connect_t conn, signal_disconnect_t disc, disconnect_all_t disc_all)
|
||||
: m_signal_connect(conn),
|
||||
m_signal_disconnect(disc),
|
||||
m_disconnect_all(disc_all)
|
||||
@ -253,23 +263,16 @@ protected:
|
||||
virtual ~has_slots_interface() {}
|
||||
|
||||
public:
|
||||
void signal_connect(_signal_base_interface *sender)
|
||||
{
|
||||
m_signal_connect(this, sender);
|
||||
}
|
||||
void signal_connect(_signal_base_interface *sender) { m_signal_connect(this, sender); }
|
||||
|
||||
void signal_disconnect(_signal_base_interface *sender)
|
||||
{
|
||||
m_signal_disconnect(this, sender);
|
||||
}
|
||||
void signal_disconnect(_signal_base_interface *sender) { m_signal_disconnect(this, sender); }
|
||||
|
||||
void disconnect_all() { m_disconnect_all(this); }
|
||||
};
|
||||
|
||||
class _signal_base_interface {
|
||||
private:
|
||||
typedef void (*slot_disconnect_t)(_signal_base_interface *self,
|
||||
has_slots_interface *pslot);
|
||||
typedef void (*slot_disconnect_t)(_signal_base_interface *self, has_slots_interface *pslot);
|
||||
typedef void (*slot_duplicate_t)(_signal_base_interface *self,
|
||||
const has_slots_interface *poldslot,
|
||||
has_slots_interface *pnewslot);
|
||||
@ -286,13 +289,9 @@ protected:
|
||||
~_signal_base_interface() {}
|
||||
|
||||
public:
|
||||
void slot_disconnect(has_slots_interface *pslot)
|
||||
{
|
||||
m_slot_disconnect(this, pslot);
|
||||
}
|
||||
void slot_disconnect(has_slots_interface *pslot) { m_slot_disconnect(this, pslot); }
|
||||
|
||||
void slot_duplicate(const has_slots_interface *poldslot,
|
||||
has_slots_interface *pnewslot)
|
||||
void slot_duplicate(const has_slots_interface *poldslot, has_slots_interface *pnewslot)
|
||||
{
|
||||
m_slot_duplicate(this, poldslot, pnewslot);
|
||||
}
|
||||
@ -323,15 +322,14 @@ public:
|
||||
_opaque_connection(DestT *pd, void (DestT::*pm)(Args...)) : pdest(pd)
|
||||
{
|
||||
typedef void (DestT::*pm_t)(Args...);
|
||||
static_assert(sizeof(pm_t) <= sizeof(pmethod),
|
||||
"Size of slot function pointer too large.");
|
||||
static_assert(sizeof(pm_t) <= sizeof(pmethod), "Size of slot function pointer too large.");
|
||||
|
||||
std::memcpy(pmethod, &pm, sizeof(pm_t));
|
||||
|
||||
typedef void (*em_t)(const _opaque_connection *self, Args...);
|
||||
union_caster<em_t, emit_t> caster2;
|
||||
caster2.from = &_opaque_connection::emitter<DestT, Args...>;
|
||||
pemit = caster2.to;
|
||||
pemit = caster2.to;
|
||||
}
|
||||
|
||||
has_slots_interface *getdest() const { return pdest; }
|
||||
@ -339,7 +337,7 @@ public:
|
||||
_opaque_connection duplicate(has_slots_interface *newtarget) const
|
||||
{
|
||||
_opaque_connection res = *this;
|
||||
res.pdest = newtarget;
|
||||
res.pdest = newtarget;
|
||||
return res;
|
||||
}
|
||||
|
||||
@ -360,8 +358,7 @@ private:
|
||||
{
|
||||
typedef void (DestT::*pm_t)(Args...);
|
||||
pm_t pm;
|
||||
static_assert(sizeof(pm_t) <= sizeof(pmethod),
|
||||
"Size of slot function pointer too large.");
|
||||
static_assert(sizeof(pm_t) <= sizeof(pmethod), "Size of slot function pointer too large.");
|
||||
std::memcpy(&pm, self->pmethod, sizeof(pm_t));
|
||||
(static_cast<DestT *>(self->pdest)->*(pm))(args...);
|
||||
}
|
||||
@ -373,8 +370,7 @@ protected:
|
||||
typedef std::list<_opaque_connection> connections_list;
|
||||
|
||||
_signal_base()
|
||||
: _signal_base_interface(&_signal_base::do_slot_disconnect,
|
||||
&_signal_base::do_slot_duplicate),
|
||||
: _signal_base_interface(&_signal_base::do_slot_disconnect, &_signal_base::do_slot_duplicate),
|
||||
m_current_iterator(m_connected_slots.end())
|
||||
{}
|
||||
|
||||
@ -385,8 +381,7 @@ private:
|
||||
|
||||
public:
|
||||
_signal_base(const _signal_base &o)
|
||||
: _signal_base_interface(&_signal_base::do_slot_disconnect,
|
||||
&_signal_base::do_slot_duplicate),
|
||||
: _signal_base_interface(&_signal_base::do_slot_disconnect, &_signal_base::do_slot_duplicate),
|
||||
m_current_iterator(m_connected_slots.end())
|
||||
{
|
||||
lock_block<mt_policy> lock(this);
|
||||
@ -409,8 +404,7 @@ public:
|
||||
while (!m_connected_slots.empty()) {
|
||||
has_slots_interface *pdest = m_connected_slots.front().getdest();
|
||||
m_connected_slots.pop_front();
|
||||
pdest->signal_disconnect(
|
||||
static_cast<_signal_base_interface *>(this));
|
||||
pdest->signal_disconnect(static_cast<_signal_base_interface *>(this));
|
||||
}
|
||||
// If disconnect_all is called while the signal is firing, advance the
|
||||
// current slot iterator to the end to avoid an invalidated iterator from
|
||||
@ -422,7 +416,7 @@ public:
|
||||
bool connected(has_slots_interface *pclass)
|
||||
{
|
||||
lock_block<mt_policy> lock(this);
|
||||
connections_list::const_iterator it = m_connected_slots.begin();
|
||||
connections_list::const_iterator it = m_connected_slots.begin();
|
||||
connections_list::const_iterator itEnd = m_connected_slots.end();
|
||||
while (it != itEnd) {
|
||||
if (it->getdest() == pclass) return true;
|
||||
@ -435,7 +429,7 @@ public:
|
||||
void disconnect(has_slots_interface *pclass)
|
||||
{
|
||||
lock_block<mt_policy> lock(this);
|
||||
connections_list::iterator it = m_connected_slots.begin();
|
||||
connections_list::iterator it = m_connected_slots.begin();
|
||||
connections_list::iterator itEnd = m_connected_slots.end();
|
||||
|
||||
while (it != itEnd) {
|
||||
@ -447,8 +441,7 @@ public:
|
||||
} else {
|
||||
m_connected_slots.erase(it);
|
||||
}
|
||||
pclass->signal_disconnect(
|
||||
static_cast<_signal_base_interface *>(this));
|
||||
pclass->signal_disconnect(static_cast<_signal_base_interface *>(this));
|
||||
return;
|
||||
}
|
||||
++it;
|
||||
@ -456,12 +449,11 @@ public:
|
||||
}
|
||||
|
||||
private:
|
||||
static void do_slot_disconnect(_signal_base_interface *p,
|
||||
has_slots_interface *pslot)
|
||||
static void do_slot_disconnect(_signal_base_interface *p, has_slots_interface *pslot)
|
||||
{
|
||||
_signal_base *const self = static_cast<_signal_base *>(p);
|
||||
lock_block<mt_policy> lock(self);
|
||||
connections_list::iterator it = self->m_connected_slots.begin();
|
||||
connections_list::iterator it = self->m_connected_slots.begin();
|
||||
connections_list::iterator itEnd = self->m_connected_slots.end();
|
||||
|
||||
while (it != itEnd) {
|
||||
@ -472,8 +464,7 @@ private:
|
||||
// If we're currently using this iterator because the signal is firing,
|
||||
// advance it to avoid it being invalidated.
|
||||
if (self->m_current_iterator == it) {
|
||||
self->m_current_iterator =
|
||||
self->m_connected_slots.erase(it);
|
||||
self->m_current_iterator = self->m_connected_slots.erase(it);
|
||||
} else {
|
||||
self->m_connected_slots.erase(it);
|
||||
}
|
||||
@ -483,19 +474,16 @@ private:
|
||||
}
|
||||
}
|
||||
|
||||
static void do_slot_duplicate(_signal_base_interface *p,
|
||||
const has_slots_interface *oldtarget,
|
||||
has_slots_interface *newtarget)
|
||||
static void
|
||||
do_slot_duplicate(_signal_base_interface *p, const has_slots_interface *oldtarget, has_slots_interface *newtarget)
|
||||
{
|
||||
_signal_base *const self = static_cast<_signal_base *>(p);
|
||||
lock_block<mt_policy> lock(self);
|
||||
connections_list::iterator it = self->m_connected_slots.begin();
|
||||
connections_list::iterator it = self->m_connected_slots.begin();
|
||||
connections_list::iterator itEnd = self->m_connected_slots.end();
|
||||
|
||||
while (it != itEnd) {
|
||||
if (it->getdest() == oldtarget) {
|
||||
self->m_connected_slots.push_back(it->duplicate(newtarget));
|
||||
}
|
||||
if (it->getdest() == oldtarget) { self->m_connected_slots.push_back(it->duplicate(newtarget)); }
|
||||
|
||||
++it;
|
||||
}
|
||||
@ -540,16 +528,14 @@ public:
|
||||
private:
|
||||
has_slots &operator=(has_slots const &);
|
||||
|
||||
static void do_signal_connect(has_slots_interface *p,
|
||||
_signal_base_interface *sender)
|
||||
static void do_signal_connect(has_slots_interface *p, _signal_base_interface *sender)
|
||||
{
|
||||
has_slots *const self = static_cast<has_slots *>(p);
|
||||
lock_block<mt_policy> lock(self);
|
||||
self->m_senders.insert(sender);
|
||||
}
|
||||
|
||||
static void do_signal_disconnect(has_slots_interface *p,
|
||||
_signal_base_interface *sender)
|
||||
static void do_signal_disconnect(has_slots_interface *p, _signal_base_interface *sender)
|
||||
{
|
||||
has_slots *const self = static_cast<has_slots *>(p);
|
||||
lock_block<mt_policy> lock(self);
|
||||
@ -563,7 +549,7 @@ private:
|
||||
while (!self->m_senders.empty()) {
|
||||
std::set<_signal_base_interface *> senders;
|
||||
senders.swap(self->m_senders);
|
||||
const_iterator it = senders.begin();
|
||||
const_iterator it = senders.begin();
|
||||
const_iterator itEnd = senders.end();
|
||||
|
||||
while (it != itEnd) {
|
||||
@ -627,22 +613,13 @@ using signal0 = signal_with_thread_policy<mt_policy>;
|
||||
template<typename A1, typename mt_policy = SIGSLOT_DEFAULT_MT_POLICY>
|
||||
using signal1 = signal_with_thread_policy<mt_policy, A1>;
|
||||
|
||||
template<typename A1,
|
||||
typename A2,
|
||||
typename mt_policy = SIGSLOT_DEFAULT_MT_POLICY>
|
||||
template<typename A1, typename A2, typename mt_policy = SIGSLOT_DEFAULT_MT_POLICY>
|
||||
using signal2 = signal_with_thread_policy<mt_policy, A1, A2>;
|
||||
|
||||
template<typename A1,
|
||||
typename A2,
|
||||
typename A3,
|
||||
typename mt_policy = SIGSLOT_DEFAULT_MT_POLICY>
|
||||
template<typename A1, typename A2, typename A3, typename mt_policy = SIGSLOT_DEFAULT_MT_POLICY>
|
||||
using signal3 = signal_with_thread_policy<mt_policy, A1, A2, A3>;
|
||||
|
||||
template<typename A1,
|
||||
typename A2,
|
||||
typename A3,
|
||||
typename A4,
|
||||
typename mt_policy = SIGSLOT_DEFAULT_MT_POLICY>
|
||||
template<typename A1, typename A2, typename A3, typename A4, typename mt_policy = SIGSLOT_DEFAULT_MT_POLICY>
|
||||
using signal4 = signal_with_thread_policy<mt_policy, A1, A2, A3, A4>;
|
||||
|
||||
template<typename A1,
|
||||
@ -670,8 +647,7 @@ template<typename A1,
|
||||
typename A6,
|
||||
typename A7,
|
||||
typename mt_policy = SIGSLOT_DEFAULT_MT_POLICY>
|
||||
using signal7 =
|
||||
signal_with_thread_policy<mt_policy, A1, A2, A3, A4, A5, A6, A7>;
|
||||
using signal7 = signal_with_thread_policy<mt_policy, A1, A2, A3, A4, A5, A6, A7>;
|
||||
|
||||
template<typename A1,
|
||||
typename A2,
|
||||
@ -682,9 +658,8 @@ template<typename A1,
|
||||
typename A7,
|
||||
typename A8,
|
||||
typename mt_policy = SIGSLOT_DEFAULT_MT_POLICY>
|
||||
using signal8 =
|
||||
signal_with_thread_policy<mt_policy, A1, A2, A3, A4, A5, A6, A7, A8>;
|
||||
using signal8 = signal_with_thread_policy<mt_policy, A1, A2, A3, A4, A5, A6, A7, A8>;
|
||||
|
||||
}// namespace sigslot
|
||||
|
||||
#endif // SLED_SIGSLOT_H
|
||||
#endif// SLED_SIGSLOT_H
|
||||
|
@ -9,6 +9,9 @@ namespace async {}
|
||||
#include "inja.hpp"
|
||||
#include "rx.h"
|
||||
|
||||
// event_bus
|
||||
#include "sled/event_bus/event_bus.h"
|
||||
|
||||
// filesystem
|
||||
#include "sled/filesystem/path.h"
|
||||
#include "sled/filesystem/temporary_file.h"
|
||||
|
@ -146,46 +146,90 @@ private:
|
||||
marl::ConditionVariable cv_;
|
||||
};
|
||||
|
||||
// class ConditionVariable final {
|
||||
// public:
|
||||
// static constexpr TimeDelta kForever = TimeDelta::PlusInfinity();
|
||||
// ConditionVariable() = default;
|
||||
// ConditionVariable(const ConditionVariable &) = delete;
|
||||
// ConditionVariable &operator=(const ConditionVariable &) = delete;
|
||||
//
|
||||
// template<typename Predicate>
|
||||
// inline bool Wait(LockGuard<Mutex> &guard, Predicate pred)
|
||||
// {
|
||||
// std::unique_lock<std::mutex> lock(guard.mutex_->impl_, std::adopt_lock);
|
||||
// cv_.wait(lock, pred);
|
||||
// return true;
|
||||
// }
|
||||
//
|
||||
// template<typename Predicate>
|
||||
// inline bool
|
||||
// WaitFor(LockGuard<Mutex> &guard, TimeDelta timeout, Predicate pred)
|
||||
// {
|
||||
// std::unique_lock<std::mutex> lock(guard.mutex_->impl_, std::adopt_lock);
|
||||
// if (timeout == kForever) {
|
||||
// cv_.wait(lock, pred);
|
||||
// return true;
|
||||
// } else {
|
||||
// return cv_.wait_for(lock, std::chrono::milliseconds(timeout.ms()),
|
||||
// pred);
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// // template<typename Predicate>
|
||||
// // bool WaitUntil(Mutex *mutex, TimeDelta timeout, Predicate pred)
|
||||
// // {}
|
||||
//
|
||||
// inline void NotifyOne() { cv_.notify_one(); }
|
||||
//
|
||||
// inline void NotifyAll() { cv_.notify_all(); }
|
||||
//
|
||||
// private:
|
||||
// std::condition_variable cv_;
|
||||
// };
|
||||
class SCOPED_CAPABILITY SharedMutex final {
|
||||
public:
|
||||
enum class Mode {
|
||||
kReaderPriority,
|
||||
kWriterPriority,
|
||||
};
|
||||
|
||||
inline SharedMutex(Mode mode = SharedMutex::Mode::kWriterPriority) : mode_(mode) {}
|
||||
|
||||
inline void Lock() SLED_EXCLUSIVE_LOCK_FUNCTION()
|
||||
{
|
||||
wait_w_count_.fetch_add(1);
|
||||
|
||||
sled::MutexLock lock(&mutex_);
|
||||
if (Mode::kReaderPriority == mode_) {
|
||||
// 读取优先,必须在没有任何读取的消费者的情况下才能持有锁
|
||||
cv_.Wait(lock, [this] { return r_count_ == 0 && w_count_ == 0 && wait_r_count_.load() == 0; });
|
||||
w_count_++;
|
||||
} else {
|
||||
// 写入优先,只要没有持有读锁的消费者,就可以加锁
|
||||
cv_.Wait(lock, [this] { return r_count_ == 0 && w_count_ == 0; });
|
||||
w_count_++;
|
||||
cv_.Wait(lock, [this] { return r_count_ == 0; });
|
||||
}
|
||||
wait_w_count_.fetch_sub(1);
|
||||
}
|
||||
|
||||
inline void Unlock() SLED_UNLOCK_FUNCTION()
|
||||
{
|
||||
sled::MutexLock lock(&mutex_);
|
||||
w_count_--;
|
||||
if (w_count_ == 0) { cv_.NotifyAll(); }
|
||||
}
|
||||
|
||||
inline void LockShared() SLED_SHARED_LOCK_FUNCTION()
|
||||
{
|
||||
wait_r_count_.fetch_add(1);
|
||||
sled::MutexLock lock(&mutex_);
|
||||
if (Mode::kReaderPriority == mode_) {
|
||||
cv_.Wait(lock, [this] { return w_count_ == 0; });
|
||||
r_count_++;
|
||||
} else {
|
||||
cv_.Wait(lock, [this] { return w_count_ == 0 && wait_w_count_.load() == 0; });
|
||||
r_count_++;
|
||||
}
|
||||
wait_r_count_.fetch_sub(1);
|
||||
}
|
||||
|
||||
inline void UnlockShared() SLED_UNLOCK_FUNCTION()
|
||||
{
|
||||
sled::MutexLock lock(&mutex_);
|
||||
r_count_--;
|
||||
if (r_count_ == 0) { cv_.NotifyAll(); }
|
||||
}
|
||||
|
||||
private:
|
||||
const Mode mode_;
|
||||
sled::Mutex mutex_;
|
||||
sled::ConditionVariable cv_;
|
||||
int r_count_{0};
|
||||
int w_count_{0};
|
||||
std::atomic<int> wait_r_count_{0};
|
||||
std::atomic<int> wait_w_count_{0};
|
||||
};
|
||||
|
||||
class SharedMutexReadLock final {
|
||||
public:
|
||||
explicit SharedMutexReadLock(SharedMutex *mutex) : mutex_(mutex) { mutex_->LockShared(); }
|
||||
|
||||
~SharedMutexReadLock() { mutex_->UnlockShared(); }
|
||||
|
||||
private:
|
||||
SharedMutex *mutex_;
|
||||
};
|
||||
|
||||
class SharedMutexWriteLock final {
|
||||
public:
|
||||
explicit SharedMutexWriteLock(SharedMutex *mutex) : mutex_(mutex) { mutex_->Lock(); }
|
||||
|
||||
~SharedMutexWriteLock() { mutex_->Unlock(); }
|
||||
|
||||
private:
|
||||
SharedMutex *mutex_;
|
||||
};
|
||||
|
||||
}// namespace sled
|
||||
|
||||
|
@ -41,7 +41,7 @@ TaskQueueTimeoutFactory::TaskQueueTimeout::Start(DurationMs duration_ms, Timeout
|
||||
precision_,
|
||||
SafeTask(safety_flag_,
|
||||
[timeout_id, this]() {
|
||||
LOGV("timer", "Timeout expired: {}", timeout_id);
|
||||
LOGV("timer", "Timeout expired id={}", timeout_id);
|
||||
SLED_DCHECK_RUN_ON(&parent_.thread_checker_);
|
||||
SLED_DCHECK(posted_task_expiration_ != std::numeric_limits<TimeMs>::max(), "");
|
||||
posted_task_expiration_ = std::numeric_limits<TimeMs>::max();
|
||||
|
Loading…
x
Reference in New Issue
Block a user