From 35efe428844bdb37c2893e52d04ad1836e7852da Mon Sep 17 00:00:00 2001 From: Dawid Drozd Date: Thu, 12 Mar 2020 13:59:40 +0100 Subject: [PATCH] Fix WaitPerk for dummy waiting case It could happen that we had some events in queue but we still would wait for new events. --- lib/src/dexode/eventbus/perk/WaitPerk.cpp | 23 +++-- lib/src/dexode/eventbus/perk/WaitPerk.hpp | 2 + test/integration/CMakeLists.txt | 2 + .../src/dexode/eventbus/test/SuiteWait.cpp | 85 +++++++++++++++++++ 4 files changed, 105 insertions(+), 7 deletions(-) create mode 100644 test/integration/src/dexode/eventbus/test/SuiteWait.cpp diff --git a/lib/src/dexode/eventbus/perk/WaitPerk.cpp b/lib/src/dexode/eventbus/perk/WaitPerk.cpp index 7114a33..1e44831 100644 --- a/lib/src/dexode/eventbus/perk/WaitPerk.cpp +++ b/lib/src/dexode/eventbus/perk/WaitPerk.cpp @@ -8,26 +8,35 @@ namespace dexode::eventbus::perk bool WaitPerk::wait() { - using namespace std::chrono_literals; - std::unique_lock lock(_waitMutex); - _eventWaiting.wait(lock); + if(not _hasEvents) + { + using namespace std::chrono_literals; + std::unique_lock lock(_waitMutex); + _eventWaiting.wait(lock); + } + _hasEvents = false; // reset, assume that processing of events took place return true; } bool WaitPerk::waitFor(const std::chrono::milliseconds timeout) { - using namespace std::chrono_literals; - std::unique_lock lock(_waitMutex); - if(_eventWaiting.wait_for(lock, timeout) == std::cv_status::timeout) + if(not _hasEvents) { - return false; + using namespace std::chrono_literals; + std::unique_lock lock(_waitMutex); + if(_eventWaiting.wait_for(lock, timeout) == std::cv_status::timeout) + { + return false; + } } + _hasEvents = false; // reset, assume that processing of events took place return true; } Flag WaitPerk::onPostponeEvent(PostponeHelper&) { + _hasEvents = true; _eventWaiting.notify_one(); return Flag::postpone_continue; } diff --git a/lib/src/dexode/eventbus/perk/WaitPerk.hpp b/lib/src/dexode/eventbus/perk/WaitPerk.hpp index c3f3efc..e22174a 100644 --- a/lib/src/dexode/eventbus/perk/WaitPerk.hpp +++ b/lib/src/dexode/eventbus/perk/WaitPerk.hpp @@ -3,6 +3,7 @@ // #pragma once +#include #include #include #include @@ -28,6 +29,7 @@ public: private: std::condition_variable _eventWaiting; std::mutex _waitMutex; + std::atomic _hasEvents = false; }; } // namespace dexode::eventbus::perk diff --git a/test/integration/CMakeLists.txt b/test/integration/CMakeLists.txt index 177e295..845b20c 100644 --- a/test/integration/CMakeLists.txt +++ b/test/integration/CMakeLists.txt @@ -1,8 +1,10 @@ # # Target definition add_executable(EventBus-IntegrationTest + src/dexode/eventbus/test/SuiteWait.cpp src/main.cpp ) + target_include_directories(EventBus-IntegrationTest PRIVATE src/) target_compile_options(EventBus-IntegrationTest PUBLIC diff --git a/test/integration/src/dexode/eventbus/test/SuiteWait.cpp b/test/integration/src/dexode/eventbus/test/SuiteWait.cpp new file mode 100644 index 0000000..900c8d3 --- /dev/null +++ b/test/integration/src/dexode/eventbus/test/SuiteWait.cpp @@ -0,0 +1,85 @@ +// +// Created by gelldur on 12.03.2020. +// +#include +#include +#include +#include +#include + +#include + +#include "dexode/EventBus.hpp" +#include "dexode/eventbus/perk/PerkEventBus.hpp" +#include "dexode/eventbus/perk/WaitPerk.hpp" + +using namespace std::chrono_literals; + +namespace +{ +struct EventTest +{ + std::string data; + std::chrono::steady_clock::time_point created = std::chrono::steady_clock::now(); +}; +} // namespace + +TEST_CASE("Should not be proccessed with unnecessary delay", "[concurrent][EventBus]") +{ + auto bus = std::make_shared(); + bus->addPerk(std::make_unique()) + .registerPostPostpone(&dexode::eventbus::perk::WaitPerk::onPostponeEvent); + + auto* waitPerk = bus->getPerk(); + REQUIRE(waitPerk != nullptr); + + dexode::eventbus::perk::PerkEventBus::Listener listener{bus}; + listener.listen([bus](const EventTest& event) { + const auto eventAge = std::chrono::duration_cast( + std::chrono::steady_clock::now() - event.created); + CHECK(eventAge < 5ms); + std::cout << "Event:" << event.data << " old: " << eventAge.count() << "ms" << std::endl; + + bus->postpone(EventTest{"other"}); + }); + + // Worker which will send event every 10 ms + std::atomic isWorking = true; + + std::vector producers; + producers.emplace_back([&bus, &isWorking]() { + while(isWorking) + { + bus->postpone(EventTest{"producer1"}); + std::this_thread::sleep_for(30ms); + } + }); + + for(int i = 0; i < 20; ++i) + { + auto start = std::chrono::steady_clock::now(); + if(waitPerk->waitFor(20ms)) + { + const auto sleepTime = std::chrono::duration_cast( + std::chrono::steady_clock::now() - start); + + std::cout << "[SUCCESS] I was sleeping for: " << sleepTime.count() << " ms" + << std::endl; + bus->process(); + } + else + { + const auto sleepTime = std::chrono::duration_cast( + std::chrono::steady_clock::now() - start); + CHECK(sleepTime < 5ms); + // No events waiting for us + std::cout << "I was sleeping for: " << sleepTime.count() << " ms" << std::endl; + } + } + + isWorking = false; + for(auto& producer : producers) + { + producer.join(); + } +}