mirror of
https://github.com/gelldur/EventBus.git
synced 2024-12-26 18:51:02 +08:00
Fix WaitPerk for dummy waiting case
It could happen that we had some events in queue but we still would wait for new events.
This commit is contained in:
parent
7c63021ea3
commit
35efe42884
@ -8,26 +8,35 @@ namespace dexode::eventbus::perk
|
||||
|
||||
bool WaitPerk::wait()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
std::unique_lock<std::mutex> lock(_waitMutex);
|
||||
_eventWaiting.wait(lock);
|
||||
if(not _hasEvents)
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
std::unique_lock<std::mutex> lock(_waitMutex);
|
||||
_eventWaiting.wait(lock);
|
||||
}
|
||||
|
||||
_hasEvents = false; // reset, assume that processing of events took place
|
||||
return true;
|
||||
}
|
||||
|
||||
bool WaitPerk::waitFor(const std::chrono::milliseconds timeout)
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
std::unique_lock<std::mutex> lock(_waitMutex);
|
||||
if(_eventWaiting.wait_for(lock, timeout) == std::cv_status::timeout)
|
||||
if(not _hasEvents)
|
||||
{
|
||||
return false;
|
||||
using namespace std::chrono_literals;
|
||||
std::unique_lock<std::mutex> lock(_waitMutex);
|
||||
if(_eventWaiting.wait_for(lock, timeout) == std::cv_status::timeout)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
_hasEvents = false; // reset, assume that processing of events took place
|
||||
return true;
|
||||
}
|
||||
|
||||
Flag WaitPerk::onPostponeEvent(PostponeHelper&)
|
||||
{
|
||||
_hasEvents = true;
|
||||
_eventWaiting.notify_one();
|
||||
return Flag::postpone_continue;
|
||||
}
|
||||
|
@ -3,6 +3,7 @@
|
||||
//
|
||||
#pragma once
|
||||
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <condition_variable>
|
||||
#include <mutex>
|
||||
@ -28,6 +29,7 @@ public:
|
||||
private:
|
||||
std::condition_variable _eventWaiting;
|
||||
std::mutex _waitMutex;
|
||||
std::atomic<bool> _hasEvents = false;
|
||||
};
|
||||
|
||||
} // namespace dexode::eventbus::perk
|
||||
|
@ -1,8 +1,10 @@
|
||||
#
|
||||
# Target definition
|
||||
add_executable(EventBus-IntegrationTest
|
||||
src/dexode/eventbus/test/SuiteWait.cpp
|
||||
src/main.cpp
|
||||
)
|
||||
|
||||
target_include_directories(EventBus-IntegrationTest PRIVATE src/)
|
||||
|
||||
target_compile_options(EventBus-IntegrationTest PUBLIC
|
||||
|
85
test/integration/src/dexode/eventbus/test/SuiteWait.cpp
Normal file
85
test/integration/src/dexode/eventbus/test/SuiteWait.cpp
Normal file
@ -0,0 +1,85 @@
|
||||
//
|
||||
// Created by gelldur on 12.03.2020.
|
||||
//
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <iostream>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
|
||||
#include <catch2/catch.hpp>
|
||||
|
||||
#include "dexode/EventBus.hpp"
|
||||
#include "dexode/eventbus/perk/PerkEventBus.hpp"
|
||||
#include "dexode/eventbus/perk/WaitPerk.hpp"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
namespace
|
||||
{
|
||||
struct EventTest
|
||||
{
|
||||
std::string data;
|
||||
std::chrono::steady_clock::time_point created = std::chrono::steady_clock::now();
|
||||
};
|
||||
} // namespace
|
||||
|
||||
TEST_CASE("Should not be proccessed with unnecessary delay", "[concurrent][EventBus]")
|
||||
{
|
||||
auto bus = std::make_shared<dexode::eventbus::perk::PerkEventBus>();
|
||||
bus->addPerk(std::make_unique<dexode::eventbus::perk::WaitPerk>())
|
||||
.registerPostPostpone(&dexode::eventbus::perk::WaitPerk::onPostponeEvent);
|
||||
|
||||
auto* waitPerk = bus->getPerk<dexode::eventbus::perk::WaitPerk>();
|
||||
REQUIRE(waitPerk != nullptr);
|
||||
|
||||
dexode::eventbus::perk::PerkEventBus::Listener listener{bus};
|
||||
listener.listen([bus](const EventTest& event) {
|
||||
const auto eventAge = std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
std::chrono::steady_clock::now() - event.created);
|
||||
CHECK(eventAge < 5ms);
|
||||
std::cout << "Event:" << event.data << " old: " << eventAge.count() << "ms" << std::endl;
|
||||
|
||||
bus->postpone(EventTest{"other"});
|
||||
});
|
||||
|
||||
// Worker which will send event every 10 ms
|
||||
std::atomic<bool> isWorking = true;
|
||||
|
||||
std::vector<std::thread> producers;
|
||||
producers.emplace_back([&bus, &isWorking]() {
|
||||
while(isWorking)
|
||||
{
|
||||
bus->postpone(EventTest{"producer1"});
|
||||
std::this_thread::sleep_for(30ms);
|
||||
}
|
||||
});
|
||||
|
||||
for(int i = 0; i < 20; ++i)
|
||||
{
|
||||
auto start = std::chrono::steady_clock::now();
|
||||
if(waitPerk->waitFor(20ms))
|
||||
{
|
||||
const auto sleepTime = std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
std::chrono::steady_clock::now() - start);
|
||||
|
||||
std::cout << "[SUCCESS] I was sleeping for: " << sleepTime.count() << " ms"
|
||||
<< std::endl;
|
||||
bus->process();
|
||||
}
|
||||
else
|
||||
{
|
||||
const auto sleepTime = std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
std::chrono::steady_clock::now() - start);
|
||||
CHECK(sleepTime < 5ms);
|
||||
// No events waiting for us
|
||||
std::cout << "I was sleeping for: " << sleepTime.count() << " ms" << std::endl;
|
||||
}
|
||||
}
|
||||
|
||||
isWorking = false;
|
||||
for(auto& producer : producers)
|
||||
{
|
||||
producer.join();
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user