diff --git a/CMakeLists.txt b/CMakeLists.txt index e6358e6..0dc2421 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -111,6 +111,7 @@ if(SLED_BUILD_BENCHMARK) add_executable( sled_benchmark + src/sled/event_bus/event_bus_bench.cc src/sled/random_bench.cc src/sled/strings/base64_bench.cc # src/sled/system/fiber/fiber_bench.cc diff --git a/src/sled/event_bus/event_bus.h b/src/sled/event_bus/event_bus.h index 8554d47..27f67de 100644 --- a/src/sled/event_bus/event_bus.h +++ b/src/sled/event_bus/event_bus.h @@ -8,14 +8,14 @@ namespace sled { class EventBus; -class EventSubscriber; +class Subscriber; namespace { template class EventRegistry { public: - using Dispatcher = sigslot::signal1; + using Dispatcher = sigslot::signal1; using SubscriberTable = std::unordered_map; static EventRegistry &Instance() @@ -60,6 +60,7 @@ public: if (iter == signals_.end()) { return; } auto &dispatcher = iter->second; dispatcher.disconnect(instance); + if (dispatcher.is_empty()) { signals_.erase(iter); } } bool IsEmpty(EventBus *bus) const @@ -94,13 +95,13 @@ private: }// namespace -class EventSubscriber : public sigslot::has_slots<> { -public: - virtual ~EventSubscriber() {} -}; - class EventBus { public: + class Subscriber : public sigslot::has_slots<> { + public: + virtual ~Subscriber() = default; + }; + EventBus() = default; ~EventBus() @@ -119,7 +120,7 @@ public: // On ([](const Event1 &){}) template - typename std::enable_if::value>::type + typename std::enable_if::value>::type Subscribe(C *instance, void (C::*method)(Event)) { { @@ -131,7 +132,7 @@ public: } template - typename std::enable_if::value>::type Unsubscribe(C *instance) + typename std::enable_if::value>::type Unsubscribe(C *instance) { EventRegistry::Instance().Unsubscribe(this, instance); { diff --git a/src/sled/event_bus/event_bus_bench.cc b/src/sled/event_bus/event_bus_bench.cc new file mode 100644 index 0000000..67662fa --- /dev/null +++ b/src/sled/event_bus/event_bus_bench.cc @@ -0,0 +1,78 @@ +#include +#include +#include +#include + +struct Event { + std::shared_ptr data = std::make_shared(0); +}; + +struct AtomicEvent { + AtomicEvent(std::atomic &v) : data(v) {} + + std::atomic &data; +}; + +struct Subscriber : public sled::EventBus::Subscriber { + void OnEvent(Event event) { (*event.data)++; } + + void OnAtomicnEvent(AtomicEvent event) { event.data.fetch_add(1); } +}; + +void +BMEventBusPost_1_to_1(picobench::state &s) +{ + sled::EventBus event_bus; + Subscriber subscriber; + event_bus.Subscribe(&subscriber, &Subscriber::OnEvent); + for (auto _ : s) { event_bus.Post(Event{}); } +} + +void +BMEventBusPost_1_to_1k(picobench::state &s) +{ + sled::EventBus event_bus; + std::vector subscribers(1000); + for (auto &subscriber : subscribers) { event_bus.Subscribe(&subscriber, &Subscriber::OnEvent); } + + for (auto _ : s) { + Event event; + event_bus.Post(event); + SLED_ASSERT(*event.data == 1000, ""); + } +} + +void +BMEventBusPost_10_to_1k(picobench::state &s) +{ + constexpr int kPublishCount = 10; + constexpr int kSubscriberCount = 1000; + + sled::EventBus event_bus; + std::vector subscribers(kSubscriberCount); + for (auto &subscriber : subscribers) { event_bus.Subscribe(&subscriber, &Subscriber::OnAtomicnEvent); } + sled::ThreadPool pool(kPublishCount); + + for (auto _ : s) { + std::atomic value(0); + AtomicEvent atomic_event(value); + sled::WaitGroup wg(kPublishCount); + for (int i = 0; i < kPublishCount; i++) { + pool.PostTask([atomic_event, wg, &event_bus]() { + event_bus.Post(atomic_event); + wg.Done(); + }); + } + wg.Wait(); + SLED_ASSERT(value.load() == kPublishCount * kSubscriberCount, + "{} != {}", + value.load(), + kPublishCount * kSubscriberCount); + } +} + +PICOBENCH_SUITE("EventBus"); + +PICOBENCH(BMEventBusPost_1_to_1); +PICOBENCH(BMEventBusPost_1_to_1k); +PICOBENCH(BMEventBusPost_10_to_1k); diff --git a/src/sled/event_bus/event_bus_test.cc b/src/sled/event_bus/event_bus_test.cc index a1274c7..7e7ca35 100644 --- a/src/sled/event_bus/event_bus_test.cc +++ b/src/sled/event_bus/event_bus_test.cc @@ -1,6 +1,23 @@ #include +#include +#include #include +using namespace fakeit; + +template +void * +GetPtr(R (T::*p)(Args...)) +{ + union { + R (T::*ptr)(Args...); + void *void_ptr; + } _; + + _.ptr = p; + return _.void_ptr; +} + struct Event1 { int a; }; @@ -9,7 +26,7 @@ struct Event2 { std::string str; }; -struct Subscriber : public sled::EventSubscriber { +struct Subscriber : public sled::EventBus::Subscriber { void OnEvent1(Event1 event) { a += event.a; } void OnEvent2(Event2 event) { str += event.str; } @@ -62,4 +79,42 @@ TEST_SUITE("EventBus") CHECK_EQ(subscriber.a, 1); CHECK_EQ(subscriber.str, "1"); } + + TEST_CASE("thread_pool") + { + constexpr int kPublishCount = 10; + constexpr int kSubscriberCount = 1000; + + struct AtomicEvent { + std::atomic &data; + }; + + struct AotmicEventSubscriber : public sled::EventBus::Subscriber { + virtual ~AotmicEventSubscriber() = default; + + void OnEvent(AtomicEvent event) { event.data.fetch_add(1); } + }; + + std::atomic value(0); + AtomicEvent atomic_event{value}; + + sled::WaitGroup wg(kPublishCount); + sled::ThreadPool pool(kPublishCount); + sled::EventBus bus; + std::vector subscribers(kSubscriberCount); + + for (auto &sub : subscribers) { bus.Subscribe(&sub, &AotmicEventSubscriber::OnEvent); } + std::atomic invoke_count(0); + for (int i = 0; i < kPublishCount; i++) { + pool.PostTask([wg, atomic_event, &bus, &invoke_count]() { + bus.Post(atomic_event); + invoke_count.fetch_add(1); + wg.Done(); + }); + } + wg.Wait(); + + CHECK_EQ(invoke_count.load(), kPublishCount); + CHECK_EQ(value.load(), kPublishCount * kSubscriberCount); + } } diff --git a/src/sled/random_bench.cc b/src/sled/random_bench.cc index 91e3c98..99746fb 100644 --- a/src/sled/random_bench.cc +++ b/src/sled/random_bench.cc @@ -1,5 +1,6 @@ #include #include +PICOBENCH_SUITE("Random"); PICOBENCH([](picobench::state &s) { sled::Random rand(s.user_data()); diff --git a/src/sled/sigslot.h b/src/sled/sigslot.h index 424d61e..d796048 100644 --- a/src/sled/sigslot.h +++ b/src/sled/sigslot.h @@ -104,7 +104,7 @@ #include // On our copy of sigslot.h, we set single threading as default. -#define SIGSLOT_DEFAULT_MT_POLICY single_threaded +#define SIGSLOT_DEFAULT_MT_POLICY multi_threaded_local #if defined(SIGSLOT_PURE_ISO) || (!defined(WEBRTC_WIN) && !defined(__GNUG__) && !defined(SIGSLOT_USE_POSIX_THREADS)) #define _SIGSLOT_SINGLE_THREADED diff --git a/src/sled/strings/base64_bench.cc b/src/sled/strings/base64_bench.cc index 27654c6..d273c95 100644 --- a/src/sled/strings/base64_bench.cc +++ b/src/sled/strings/base64_bench.cc @@ -38,6 +38,6 @@ Base64Decode(picobench::state &state) (void) sled::Base64::Decode(base64_input); } } - +PICOBENCH_SUITE("Base64"); PICOBENCH(Base64Decode); PICOBENCH(Base64Encode); diff --git a/src/sled/system/thread_pool_bench.cc b/src/sled/system/thread_pool_bench.cc index 0bd6697..1f769af 100644 --- a/src/sled/system/thread_pool_bench.cc +++ b/src/sled/system/thread_pool_bench.cc @@ -13,4 +13,5 @@ ThreadPoolBench(picobench::state &state) } // BENCHMARK(ThreadPoolBench)->RangeMultiplier(10)->Range(10, 10000); +PICOBENCH_SUITE("TheadPool"); PICOBENCH(ThreadPoolBench); diff --git a/src/sled/system_time_bench.cc b/src/sled/system_time_bench.cc index 245dd91..d8388bd 100644 --- a/src/sled/system_time_bench.cc +++ b/src/sled/system_time_bench.cc @@ -7,4 +7,5 @@ SystemTimeNanos(picobench::state &state) for (auto _ : state) { (void) sled::SystemTimeNanos(); } } +PICOBENCH_SUITE("SystemTime"); PICOBENCH(SystemTimeNanos); diff --git a/src/sled/uri_bench.cc b/src/sled/uri_bench.cc index dd15ebf..172d406 100644 --- a/src/sled/uri_bench.cc +++ b/src/sled/uri_bench.cc @@ -34,4 +34,5 @@ ParseURI(picobench::state &s) } } +PICOBENCH_SUITE("URI"); PICOBENCH(ParseURI);