From b4bceafb400c48819021d85e94f3489b3e8bc2bf Mon Sep 17 00:00:00 2001 From: Dawid Drozd Date: Fri, 13 Mar 2020 09:59:50 +0100 Subject: [PATCH] Fix data race Bad usage of condition variable --- lib/src/dexode/eventbus/perk/WaitPerk.cpp | 45 +++++++----- lib/src/dexode/eventbus/perk/WaitPerk.hpp | 3 +- .../src/dexode/eventbus/test/SuiteWait.cpp | 71 +++++++++++++++++-- .../eventbus/test/SuiteConcurrentEventBus.cpp | 2 +- 4 files changed, 96 insertions(+), 25 deletions(-) diff --git a/lib/src/dexode/eventbus/perk/WaitPerk.cpp b/lib/src/dexode/eventbus/perk/WaitPerk.cpp index 1e44831..c457497 100644 --- a/lib/src/dexode/eventbus/perk/WaitPerk.cpp +++ b/lib/src/dexode/eventbus/perk/WaitPerk.cpp @@ -8,35 +8,48 @@ namespace dexode::eventbus::perk bool WaitPerk::wait() { - if(not _hasEvents) + using namespace std::chrono_literals; + std::unique_lock lock(_waitMutex); + if(_hasEvents) { - using namespace std::chrono_literals; - std::unique_lock lock(_waitMutex); - _eventWaiting.wait(lock); + _hasEvents = false; // reset, assume that processing of events took place + return true; } + _eventWaiting.wait(lock, [this]() { return _hasEvents; }); - _hasEvents = false; // reset, assume that processing of events took place - return true; + // At this moment we are still under mutex + if(_hasEvents) + { + _hasEvents = false; // reset, assume that processing of events took place + return true; + } + return false; } bool WaitPerk::waitFor(const std::chrono::milliseconds timeout) { - if(not _hasEvents) + using namespace std::chrono_literals; + std::unique_lock lock(_waitMutex); + if(_hasEvents) { - using namespace std::chrono_literals; - std::unique_lock lock(_waitMutex); - if(_eventWaiting.wait_for(lock, timeout) == std::cv_status::timeout) - { - return false; - } + _hasEvents = false; // reset + return true; } - _hasEvents = false; // reset, assume that processing of events took place - return true; + if(_eventWaiting.wait_for(lock, timeout, [this]() { return _hasEvents; })) + { + // At this moment we are still under mutex + _hasEvents = false; // reset + return true; + } + return false; } Flag WaitPerk::onPostponeEvent(PostponeHelper&) { - _hasEvents = true; + { + std::lock_guard lock(_waitMutex); + _hasEvents = true; + } _eventWaiting.notify_one(); return Flag::postpone_continue; } diff --git a/lib/src/dexode/eventbus/perk/WaitPerk.hpp b/lib/src/dexode/eventbus/perk/WaitPerk.hpp index e22174a..af7b0b6 100644 --- a/lib/src/dexode/eventbus/perk/WaitPerk.hpp +++ b/lib/src/dexode/eventbus/perk/WaitPerk.hpp @@ -3,7 +3,6 @@ // #pragma once -#include #include #include #include @@ -29,7 +28,7 @@ public: private: std::condition_variable _eventWaiting; std::mutex _waitMutex; - std::atomic _hasEvents = false; + bool _hasEvents = false; }; } // namespace dexode::eventbus::perk diff --git a/test/integration/src/dexode/eventbus/test/SuiteWait.cpp b/test/integration/src/dexode/eventbus/test/SuiteWait.cpp index 900c8d3..e9c986c 100644 --- a/test/integration/src/dexode/eventbus/test/SuiteWait.cpp +++ b/test/integration/src/dexode/eventbus/test/SuiteWait.cpp @@ -39,8 +39,9 @@ TEST_CASE("Should not be proccessed with unnecessary delay", "[concurrent][Event std::chrono::steady_clock::now() - event.created); CHECK(eventAge < 5ms); std::cout << "Event:" << event.data << " old: " << eventAge.count() << "ms" << std::endl; - + std::this_thread::sleep_for(2ms); bus->postpone(EventTest{"other"}); + std::this_thread::sleep_for(3ms); }); // Worker which will send event every 10 ms @@ -51,21 +52,79 @@ TEST_CASE("Should not be proccessed with unnecessary delay", "[concurrent][Event while(isWorking) { bus->postpone(EventTest{"producer1"}); - std::this_thread::sleep_for(30ms); + std::this_thread::sleep_for(500ms); } }); - for(int i = 0; i < 20; ++i) + for(int i = 0; i < 20;) { auto start = std::chrono::steady_clock::now(); - if(waitPerk->waitFor(20ms)) + if(waitPerk->waitFor(2000ms)) { const auto sleepTime = std::chrono::duration_cast( std::chrono::steady_clock::now() - start); - std::cout << "[SUCCESS] I was sleeping for: " << sleepTime.count() << " ms" + std::cout << "[SUCCESS] I was sleeping for: " << sleepTime.count() << " ms i:" << i << std::endl; - bus->process(); + i += bus->process(); + } + else + { + const auto sleepTime = std::chrono::duration_cast( + std::chrono::steady_clock::now() - start); + CHECK(sleepTime < 5ms); + // No events waiting for us + std::cout << "I was sleeping for: " << sleepTime.count() << " ms" << std::endl; + } + } + + isWorking = false; + for(auto& producer : producers) + { + producer.join(); + } +} + +TEST_CASE("Should wait for event being scheduled", "[concurrent][EventBus]") +{ + auto bus = std::make_shared(); + bus->addPerk(std::make_unique()) + .registerPostPostpone(&dexode::eventbus::perk::WaitPerk::onPostponeEvent); + + auto* waitPerk = bus->getPerk(); + REQUIRE(waitPerk != nullptr); + + dexode::eventbus::perk::PerkEventBus::Listener listener{bus}; + listener.listen([bus](const EventTest& event) { + const auto eventAge = std::chrono::duration_cast( + std::chrono::steady_clock::now() - event.created); + CHECK(eventAge < 5ms); + std::cout << "Event:" << event.data << " old: " << eventAge.count() << "ms" << std::endl; + }); + + std::atomic isWorking = true; + + std::vector producers; + producers.emplace_back([&bus, &isWorking]() { + while(isWorking) + { + std::this_thread::sleep_for(10ms); + bus->postpone(EventTest{"producer1"}); + } + }); + + for(int i = 0; i < 20;) + { + auto start = std::chrono::steady_clock::now(); + if(waitPerk->waitFor(40ms)) + { + const auto sleepTime = std::chrono::duration_cast( + std::chrono::steady_clock::now() - start); + CHECK(sleepTime >= 9ms); + + std::cout << "[SUCCESS] I was sleeping for: " << sleepTime.count() << " ms i:" << i + << std::endl; + i += bus->process(); } else { diff --git a/test/unit/src/dexode/eventbus/test/SuiteConcurrentEventBus.cpp b/test/unit/src/dexode/eventbus/test/SuiteConcurrentEventBus.cpp index 1e27c0c..fd0348a 100644 --- a/test/unit/src/dexode/eventbus/test/SuiteConcurrentEventBus.cpp +++ b/test/unit/src/dexode/eventbus/test/SuiteConcurrentEventBus.cpp @@ -158,7 +158,7 @@ TEST_CASE("Should wait work", "[concurrent][EventBus]") int beforeConsumed = consumed; consumed += bus.process(); INFO("If events available then consumed count should change") - CHECK(consumed > beforeConsumed); + CHECK(consumed >= beforeConsumed); } else {