feat add event_bus #1
@ -111,6 +111,7 @@ if(SLED_BUILD_BENCHMARK)
|
|||||||
|
|
||||||
add_executable(
|
add_executable(
|
||||||
sled_benchmark
|
sled_benchmark
|
||||||
|
src/sled/event_bus/event_bus_bench.cc
|
||||||
src/sled/random_bench.cc
|
src/sled/random_bench.cc
|
||||||
src/sled/strings/base64_bench.cc
|
src/sled/strings/base64_bench.cc
|
||||||
# src/sled/system/fiber/fiber_bench.cc
|
# src/sled/system/fiber/fiber_bench.cc
|
||||||
|
@ -8,14 +8,14 @@
|
|||||||
namespace sled {
|
namespace sled {
|
||||||
|
|
||||||
class EventBus;
|
class EventBus;
|
||||||
class EventSubscriber;
|
class Subscriber;
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
template<typename Event>
|
template<typename Event>
|
||||||
class EventRegistry {
|
class EventRegistry {
|
||||||
public:
|
public:
|
||||||
using Dispatcher = sigslot::signal1<Event, sigslot::single_threaded>;
|
using Dispatcher = sigslot::signal1<Event>;
|
||||||
using SubscriberTable = std::unordered_map<EventBus *, Dispatcher>;
|
using SubscriberTable = std::unordered_map<EventBus *, Dispatcher>;
|
||||||
|
|
||||||
static EventRegistry &Instance()
|
static EventRegistry &Instance()
|
||||||
@ -60,6 +60,7 @@ public:
|
|||||||
if (iter == signals_.end()) { return; }
|
if (iter == signals_.end()) { return; }
|
||||||
auto &dispatcher = iter->second;
|
auto &dispatcher = iter->second;
|
||||||
dispatcher.disconnect(instance);
|
dispatcher.disconnect(instance);
|
||||||
|
if (dispatcher.is_empty()) { signals_.erase(iter); }
|
||||||
}
|
}
|
||||||
|
|
||||||
bool IsEmpty(EventBus *bus) const
|
bool IsEmpty(EventBus *bus) const
|
||||||
@ -94,13 +95,13 @@ private:
|
|||||||
|
|
||||||
}// namespace
|
}// namespace
|
||||||
|
|
||||||
class EventSubscriber : public sigslot::has_slots<> {
|
|
||||||
public:
|
|
||||||
virtual ~EventSubscriber() {}
|
|
||||||
};
|
|
||||||
|
|
||||||
class EventBus {
|
class EventBus {
|
||||||
public:
|
public:
|
||||||
|
class Subscriber : public sigslot::has_slots<> {
|
||||||
|
public:
|
||||||
|
virtual ~Subscriber() = default;
|
||||||
|
};
|
||||||
|
|
||||||
EventBus() = default;
|
EventBus() = default;
|
||||||
|
|
||||||
~EventBus()
|
~EventBus()
|
||||||
@ -119,7 +120,7 @@ public:
|
|||||||
|
|
||||||
// On<Event1> ([](const Event1 &){})
|
// On<Event1> ([](const Event1 &){})
|
||||||
template<typename Event, typename C>
|
template<typename Event, typename C>
|
||||||
typename std::enable_if<std::is_base_of<EventSubscriber, C>::value>::type
|
typename std::enable_if<std::is_base_of<Subscriber, C>::value>::type
|
||||||
Subscribe(C *instance, void (C::*method)(Event))
|
Subscribe(C *instance, void (C::*method)(Event))
|
||||||
{
|
{
|
||||||
{
|
{
|
||||||
@ -131,7 +132,7 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
template<typename Event, typename C>
|
template<typename Event, typename C>
|
||||||
typename std::enable_if<std::is_base_of<EventSubscriber, C>::value>::type Unsubscribe(C *instance)
|
typename std::enable_if<std::is_base_of<Subscriber, C>::value>::type Unsubscribe(C *instance)
|
||||||
{
|
{
|
||||||
EventRegistry<Event>::Instance().Unsubscribe(this, instance);
|
EventRegistry<Event>::Instance().Unsubscribe(this, instance);
|
||||||
{
|
{
|
||||||
|
78
src/sled/event_bus/event_bus_bench.cc
Normal file
78
src/sled/event_bus/event_bus_bench.cc
Normal file
@ -0,0 +1,78 @@
|
|||||||
|
#include <sled/event_bus/event_bus.h>
|
||||||
|
#include <sled/log/log.h>
|
||||||
|
#include <sled/system/fiber/wait_group.h>
|
||||||
|
#include <sled/system/thread_pool.h>
|
||||||
|
|
||||||
|
struct Event {
|
||||||
|
std::shared_ptr<int> data = std::make_shared<int>(0);
|
||||||
|
};
|
||||||
|
|
||||||
|
struct AtomicEvent {
|
||||||
|
AtomicEvent(std::atomic<int> &v) : data(v) {}
|
||||||
|
|
||||||
|
std::atomic<int> &data;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct Subscriber : public sled::EventBus::Subscriber {
|
||||||
|
void OnEvent(Event event) { (*event.data)++; }
|
||||||
|
|
||||||
|
void OnAtomicnEvent(AtomicEvent event) { event.data.fetch_add(1); }
|
||||||
|
};
|
||||||
|
|
||||||
|
void
|
||||||
|
BMEventBusPost_1_to_1(picobench::state &s)
|
||||||
|
{
|
||||||
|
sled::EventBus event_bus;
|
||||||
|
Subscriber subscriber;
|
||||||
|
event_bus.Subscribe(&subscriber, &Subscriber::OnEvent);
|
||||||
|
for (auto _ : s) { event_bus.Post(Event{}); }
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
BMEventBusPost_1_to_1k(picobench::state &s)
|
||||||
|
{
|
||||||
|
sled::EventBus event_bus;
|
||||||
|
std::vector<Subscriber> subscribers(1000);
|
||||||
|
for (auto &subscriber : subscribers) { event_bus.Subscribe(&subscriber, &Subscriber::OnEvent); }
|
||||||
|
|
||||||
|
for (auto _ : s) {
|
||||||
|
Event event;
|
||||||
|
event_bus.Post(event);
|
||||||
|
SLED_ASSERT(*event.data == 1000, "");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
BMEventBusPost_10_to_1k(picobench::state &s)
|
||||||
|
{
|
||||||
|
constexpr int kPublishCount = 10;
|
||||||
|
constexpr int kSubscriberCount = 1000;
|
||||||
|
|
||||||
|
sled::EventBus event_bus;
|
||||||
|
std::vector<Subscriber> subscribers(kSubscriberCount);
|
||||||
|
for (auto &subscriber : subscribers) { event_bus.Subscribe(&subscriber, &Subscriber::OnAtomicnEvent); }
|
||||||
|
sled::ThreadPool pool(kPublishCount);
|
||||||
|
|
||||||
|
for (auto _ : s) {
|
||||||
|
std::atomic<int> value(0);
|
||||||
|
AtomicEvent atomic_event(value);
|
||||||
|
sled::WaitGroup wg(kPublishCount);
|
||||||
|
for (int i = 0; i < kPublishCount; i++) {
|
||||||
|
pool.PostTask([atomic_event, wg, &event_bus]() {
|
||||||
|
event_bus.Post(atomic_event);
|
||||||
|
wg.Done();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
wg.Wait();
|
||||||
|
SLED_ASSERT(value.load() == kPublishCount * kSubscriberCount,
|
||||||
|
"{} != {}",
|
||||||
|
value.load(),
|
||||||
|
kPublishCount * kSubscriberCount);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
PICOBENCH_SUITE("EventBus");
|
||||||
|
|
||||||
|
PICOBENCH(BMEventBusPost_1_to_1);
|
||||||
|
PICOBENCH(BMEventBusPost_1_to_1k);
|
||||||
|
PICOBENCH(BMEventBusPost_10_to_1k);
|
@ -1,6 +1,23 @@
|
|||||||
#include <sled/event_bus/event_bus.h>
|
#include <sled/event_bus/event_bus.h>
|
||||||
|
#include <sled/log/log.h>
|
||||||
|
#include <sled/system/fiber/wait_group.h>
|
||||||
#include <sled/system/thread_pool.h>
|
#include <sled/system/thread_pool.h>
|
||||||
|
|
||||||
|
using namespace fakeit;
|
||||||
|
|
||||||
|
template<typename T, typename R, typename... Args>
|
||||||
|
void *
|
||||||
|
GetPtr(R (T::*p)(Args...))
|
||||||
|
{
|
||||||
|
union {
|
||||||
|
R (T::*ptr)(Args...);
|
||||||
|
void *void_ptr;
|
||||||
|
} _;
|
||||||
|
|
||||||
|
_.ptr = p;
|
||||||
|
return _.void_ptr;
|
||||||
|
}
|
||||||
|
|
||||||
struct Event1 {
|
struct Event1 {
|
||||||
int a;
|
int a;
|
||||||
};
|
};
|
||||||
@ -9,7 +26,7 @@ struct Event2 {
|
|||||||
std::string str;
|
std::string str;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct Subscriber : public sled::EventSubscriber {
|
struct Subscriber : public sled::EventBus::Subscriber {
|
||||||
void OnEvent1(Event1 event) { a += event.a; }
|
void OnEvent1(Event1 event) { a += event.a; }
|
||||||
|
|
||||||
void OnEvent2(Event2 event) { str += event.str; }
|
void OnEvent2(Event2 event) { str += event.str; }
|
||||||
@ -62,4 +79,42 @@ TEST_SUITE("EventBus")
|
|||||||
CHECK_EQ(subscriber.a, 1);
|
CHECK_EQ(subscriber.a, 1);
|
||||||
CHECK_EQ(subscriber.str, "1");
|
CHECK_EQ(subscriber.str, "1");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_CASE("thread_pool")
|
||||||
|
{
|
||||||
|
constexpr int kPublishCount = 10;
|
||||||
|
constexpr int kSubscriberCount = 1000;
|
||||||
|
|
||||||
|
struct AtomicEvent {
|
||||||
|
std::atomic<int> &data;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct AotmicEventSubscriber : public sled::EventBus::Subscriber {
|
||||||
|
virtual ~AotmicEventSubscriber() = default;
|
||||||
|
|
||||||
|
void OnEvent(AtomicEvent event) { event.data.fetch_add(1); }
|
||||||
|
};
|
||||||
|
|
||||||
|
std::atomic<int> value(0);
|
||||||
|
AtomicEvent atomic_event{value};
|
||||||
|
|
||||||
|
sled::WaitGroup wg(kPublishCount);
|
||||||
|
sled::ThreadPool pool(kPublishCount);
|
||||||
|
sled::EventBus bus;
|
||||||
|
std::vector<AotmicEventSubscriber> subscribers(kSubscriberCount);
|
||||||
|
|
||||||
|
for (auto &sub : subscribers) { bus.Subscribe(&sub, &AotmicEventSubscriber::OnEvent); }
|
||||||
|
std::atomic<int> invoke_count(0);
|
||||||
|
for (int i = 0; i < kPublishCount; i++) {
|
||||||
|
pool.PostTask([wg, atomic_event, &bus, &invoke_count]() {
|
||||||
|
bus.Post(atomic_event);
|
||||||
|
invoke_count.fetch_add(1);
|
||||||
|
wg.Done();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
wg.Wait();
|
||||||
|
|
||||||
|
CHECK_EQ(invoke_count.load(), kPublishCount);
|
||||||
|
CHECK_EQ(value.load(), kPublishCount * kSubscriberCount);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
#include <sled/random.h>
|
#include <sled/random.h>
|
||||||
#include <sled/testing/benchmark.h>
|
#include <sled/testing/benchmark.h>
|
||||||
|
PICOBENCH_SUITE("Random");
|
||||||
|
|
||||||
PICOBENCH([](picobench::state &s) {
|
PICOBENCH([](picobench::state &s) {
|
||||||
sled::Random rand(s.user_data());
|
sled::Random rand(s.user_data());
|
||||||
|
@ -104,7 +104,7 @@
|
|||||||
#include <set>
|
#include <set>
|
||||||
|
|
||||||
// On our copy of sigslot.h, we set single threading as default.
|
// On our copy of sigslot.h, we set single threading as default.
|
||||||
#define SIGSLOT_DEFAULT_MT_POLICY single_threaded
|
#define SIGSLOT_DEFAULT_MT_POLICY multi_threaded_local
|
||||||
|
|
||||||
#if defined(SIGSLOT_PURE_ISO) || (!defined(WEBRTC_WIN) && !defined(__GNUG__) && !defined(SIGSLOT_USE_POSIX_THREADS))
|
#if defined(SIGSLOT_PURE_ISO) || (!defined(WEBRTC_WIN) && !defined(__GNUG__) && !defined(SIGSLOT_USE_POSIX_THREADS))
|
||||||
#define _SIGSLOT_SINGLE_THREADED
|
#define _SIGSLOT_SINGLE_THREADED
|
||||||
|
@ -38,6 +38,6 @@ Base64Decode(picobench::state &state)
|
|||||||
(void) sled::Base64::Decode(base64_input);
|
(void) sled::Base64::Decode(base64_input);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
PICOBENCH_SUITE("Base64");
|
||||||
PICOBENCH(Base64Decode);
|
PICOBENCH(Base64Decode);
|
||||||
PICOBENCH(Base64Encode);
|
PICOBENCH(Base64Encode);
|
||||||
|
@ -13,4 +13,5 @@ ThreadPoolBench(picobench::state &state)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// BENCHMARK(ThreadPoolBench)->RangeMultiplier(10)->Range(10, 10000);
|
// BENCHMARK(ThreadPoolBench)->RangeMultiplier(10)->Range(10, 10000);
|
||||||
|
PICOBENCH_SUITE("TheadPool");
|
||||||
PICOBENCH(ThreadPoolBench);
|
PICOBENCH(ThreadPoolBench);
|
||||||
|
@ -7,4 +7,5 @@ SystemTimeNanos(picobench::state &state)
|
|||||||
for (auto _ : state) { (void) sled::SystemTimeNanos(); }
|
for (auto _ : state) { (void) sled::SystemTimeNanos(); }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
PICOBENCH_SUITE("SystemTime");
|
||||||
PICOBENCH(SystemTimeNanos);
|
PICOBENCH(SystemTimeNanos);
|
||||||
|
@ -34,4 +34,5 @@ ParseURI(picobench::state &s)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
PICOBENCH_SUITE("URI");
|
||||||
PICOBENCH(ParseURI);
|
PICOBENCH(ParseURI);
|
||||||
|
Loading…
Reference in New Issue
Block a user