Fix data race

Bad usage of condition variable
This commit is contained in:
Dawid Drozd 2020-03-13 09:59:50 +01:00
parent 35efe42884
commit b4bceafb40
4 changed files with 96 additions and 25 deletions

View File

@ -8,35 +8,48 @@ namespace dexode::eventbus::perk
bool WaitPerk::wait() bool WaitPerk::wait()
{ {
if(not _hasEvents) using namespace std::chrono_literals;
std::unique_lock<std::mutex> lock(_waitMutex);
if(_hasEvents)
{ {
using namespace std::chrono_literals; _hasEvents = false; // reset, assume that processing of events took place
std::unique_lock<std::mutex> lock(_waitMutex); return true;
_eventWaiting.wait(lock);
} }
_eventWaiting.wait(lock, [this]() { return _hasEvents; });
_hasEvents = false; // reset, assume that processing of events took place // At this moment we are still under mutex
return true; if(_hasEvents)
{
_hasEvents = false; // reset, assume that processing of events took place
return true;
}
return false;
} }
bool WaitPerk::waitFor(const std::chrono::milliseconds timeout) bool WaitPerk::waitFor(const std::chrono::milliseconds timeout)
{ {
if(not _hasEvents) using namespace std::chrono_literals;
std::unique_lock<std::mutex> lock(_waitMutex);
if(_hasEvents)
{ {
using namespace std::chrono_literals; _hasEvents = false; // reset
std::unique_lock<std::mutex> lock(_waitMutex); return true;
if(_eventWaiting.wait_for(lock, timeout) == std::cv_status::timeout)
{
return false;
}
} }
_hasEvents = false; // reset, assume that processing of events took place if(_eventWaiting.wait_for(lock, timeout, [this]() { return _hasEvents; }))
return true; {
// At this moment we are still under mutex
_hasEvents = false; // reset
return true;
}
return false;
} }
Flag WaitPerk::onPostponeEvent(PostponeHelper&) Flag WaitPerk::onPostponeEvent(PostponeHelper&)
{ {
_hasEvents = true; {
std::lock_guard<std::mutex> lock(_waitMutex);
_hasEvents = true;
}
_eventWaiting.notify_one(); _eventWaiting.notify_one();
return Flag::postpone_continue; return Flag::postpone_continue;
} }

View File

@ -3,7 +3,6 @@
// //
#pragma once #pragma once
#include <atomic>
#include <chrono> #include <chrono>
#include <condition_variable> #include <condition_variable>
#include <mutex> #include <mutex>
@ -29,7 +28,7 @@ public:
private: private:
std::condition_variable _eventWaiting; std::condition_variable _eventWaiting;
std::mutex _waitMutex; std::mutex _waitMutex;
std::atomic<bool> _hasEvents = false; bool _hasEvents = false;
}; };
} // namespace dexode::eventbus::perk } // namespace dexode::eventbus::perk

View File

@ -39,8 +39,9 @@ TEST_CASE("Should not be proccessed with unnecessary delay", "[concurrent][Event
std::chrono::steady_clock::now() - event.created); std::chrono::steady_clock::now() - event.created);
CHECK(eventAge < 5ms); CHECK(eventAge < 5ms);
std::cout << "Event:" << event.data << " old: " << eventAge.count() << "ms" << std::endl; std::cout << "Event:" << event.data << " old: " << eventAge.count() << "ms" << std::endl;
std::this_thread::sleep_for(2ms);
bus->postpone(EventTest{"other"}); bus->postpone(EventTest{"other"});
std::this_thread::sleep_for(3ms);
}); });
// Worker which will send event every 10 ms // Worker which will send event every 10 ms
@ -51,21 +52,79 @@ TEST_CASE("Should not be proccessed with unnecessary delay", "[concurrent][Event
while(isWorking) while(isWorking)
{ {
bus->postpone(EventTest{"producer1"}); bus->postpone(EventTest{"producer1"});
std::this_thread::sleep_for(30ms); std::this_thread::sleep_for(500ms);
} }
}); });
for(int i = 0; i < 20; ++i) for(int i = 0; i < 20;)
{ {
auto start = std::chrono::steady_clock::now(); auto start = std::chrono::steady_clock::now();
if(waitPerk->waitFor(20ms)) if(waitPerk->waitFor(2000ms))
{ {
const auto sleepTime = std::chrono::duration_cast<std::chrono::milliseconds>( const auto sleepTime = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - start); std::chrono::steady_clock::now() - start);
std::cout << "[SUCCESS] I was sleeping for: " << sleepTime.count() << " ms" std::cout << "[SUCCESS] I was sleeping for: " << sleepTime.count() << " ms i:" << i
<< std::endl; << std::endl;
bus->process(); i += bus->process();
}
else
{
const auto sleepTime = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - start);
CHECK(sleepTime < 5ms);
// No events waiting for us
std::cout << "I was sleeping for: " << sleepTime.count() << " ms" << std::endl;
}
}
isWorking = false;
for(auto& producer : producers)
{
producer.join();
}
}
TEST_CASE("Should wait for event being scheduled", "[concurrent][EventBus]")
{
auto bus = std::make_shared<dexode::eventbus::perk::PerkEventBus>();
bus->addPerk(std::make_unique<dexode::eventbus::perk::WaitPerk>())
.registerPostPostpone(&dexode::eventbus::perk::WaitPerk::onPostponeEvent);
auto* waitPerk = bus->getPerk<dexode::eventbus::perk::WaitPerk>();
REQUIRE(waitPerk != nullptr);
dexode::eventbus::perk::PerkEventBus::Listener listener{bus};
listener.listen([bus](const EventTest& event) {
const auto eventAge = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - event.created);
CHECK(eventAge < 5ms);
std::cout << "Event:" << event.data << " old: " << eventAge.count() << "ms" << std::endl;
});
std::atomic<bool> isWorking = true;
std::vector<std::thread> producers;
producers.emplace_back([&bus, &isWorking]() {
while(isWorking)
{
std::this_thread::sleep_for(10ms);
bus->postpone(EventTest{"producer1"});
}
});
for(int i = 0; i < 20;)
{
auto start = std::chrono::steady_clock::now();
if(waitPerk->waitFor(40ms))
{
const auto sleepTime = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - start);
CHECK(sleepTime >= 9ms);
std::cout << "[SUCCESS] I was sleeping for: " << sleepTime.count() << " ms i:" << i
<< std::endl;
i += bus->process();
} }
else else
{ {

View File

@ -158,7 +158,7 @@ TEST_CASE("Should wait work", "[concurrent][EventBus]")
int beforeConsumed = consumed; int beforeConsumed = consumed;
consumed += bus.process(); consumed += bus.process();
INFO("If events available then consumed count should change") INFO("If events available then consumed count should change")
CHECK(consumed > beforeConsumed); CHECK(consumed >= beforeConsumed);
} }
else else
{ {