Add Listener::isListening

Requested feature in #48
This commit is contained in:
Dawid Drozd 2023-06-17 15:01:52 +04:00
parent b30f87483e
commit ed4b80d1f5
9 changed files with 94 additions and 3 deletions

View File

@ -5,7 +5,7 @@ set(CPACK_GENERATOR "" CACHE STRING "Set packages CPack should build e.g. ZIP;TG
# BUILD_SHARED_LIBS can controll build type!
project(EventBus
VERSION 3.0.3
VERSION 3.0.4
LANGUAGES CXX
)

View File

@ -104,6 +104,18 @@ eventbus::stream::EventStream* EventBus::obtainStream(
}
}
eventbus::stream::EventStream* EventBus::streamForEvent(
eventbus::internal::event_id_t eventID) const
{
std::lock_guard writeGuard{_mutexStreams};
auto* found = findStreamUnsafe(eventID);
if(found != nullptr)
{
return found;
}
return nullptr;
}
bool EventBus::postponeEvent(eventbus::PostponeHelper& postponeCall)
{
auto* eventStream = obtainStream(postponeCall.eventID, postponeCall.createStreamCallback);

View File

@ -29,6 +29,9 @@ public:
std::size_t processLimit(std::size_t limit);
protected:
eventbus::stream::EventStream* streamForEvent(
eventbus::internal::event_id_t eventID) const override;
eventbus::stream::EventStream* obtainStream(
eventbus::internal::event_id_t eventID,
eventbus::CreateStreamCallback createStreamCallback);

View File

@ -92,6 +92,9 @@ protected:
virtual void unlistenAll(std::uint32_t listenerID) = 0;
virtual void unlisten(std::uint32_t listenerID, internal::event_id_t eventID) = 0;
virtual eventbus::stream::EventStream* streamForEvent(
eventbus::internal::event_id_t eventID) const = 0;
private:
std::atomic<std::uint32_t> _lastID{0};

View File

@ -131,6 +131,19 @@ public:
return _bus;
}
template <typename Event>
[[nodiscard]] bool isListening() const
{
static_assert(internal::validateEvent<Event>(), "Invalid event");
if(_bus == nullptr)
{
throw std::runtime_error{"bus is null"};
}
return internal::ListenerAttorney<Bus>::isListening(*_bus
, _id
, internal::event_id<Event>());
}
private:
std::uint32_t _id = 0;
std::shared_ptr<Bus> _bus = nullptr;

View File

@ -6,6 +6,7 @@
#include <functional>
#include "dexode/eventbus/internal/event_id.hpp"
#include "dexode/eventbus/stream/EventStream.hpp"
namespace dexode::eventbus
{
@ -49,6 +50,18 @@ private:
{
bus.unlisten(listenerID, eventID);
}
static constexpr bool isListening(EventBus_t& bus,
const std::uint32_t listenerID,
const event_id_t eventID)
{
const eventbus::stream::EventStream* stream = bus.streamForEvent(eventID);
if(stream != nullptr)
{
return stream->hasListener(listenerID);
}
return false;
}
};
} // namespace dexode::eventbus::internal

View File

@ -16,6 +16,8 @@ public:
virtual bool addListener(std::uint32_t listenerID, std::any callback) = 0;
virtual bool removeListener(std::uint32_t listenerID) = 0;
[[nodiscard]] virtual bool hasListener(std::uint32_t listenerID) const = 0;
};
class NoopEventStream : public EventStream
@ -37,6 +39,10 @@ public:
{
throw std::runtime_error{"Noop"};
}
[[nodiscard]] bool hasListener(std::uint32_t listenerID) const override
{
throw std::runtime_error{"Noop"};
}
};
} // namespace dexode::eventbus::stream

View File

@ -98,6 +98,13 @@ public:
return not _queue.empty();
}
[[nodiscard]] bool hasListener(std::uint32_t listenerID) const override
{
std::shared_lock readGuard{_mutexCallbacks};
auto found = std::find(_listenerIDs.begin(), _listenerIDs.end(), listenerID);
return found != _listenerIDs.end();
}
private:
std::vector<std::uint32_t> _listenerIDs;
std::vector<Event> _queue;
@ -106,8 +113,8 @@ private:
std::atomic<bool> _isProcessing{false};
std::vector<std::pair<std::uint32_t, Callback>> _waiting;
std::shared_mutex _mutexEvent;
std::shared_mutex _mutexCallbacks;
mutable std::shared_mutex _mutexEvent;
mutable std::shared_mutex _mutexCallbacks;
void flushWaitingOnes()
{

View File

@ -297,4 +297,38 @@ TEST_CASE("Should not allow for mistake with move ctor", "[EventBus][Listener]")
REQUIRE(TestClazz::counter == 1);
}
TEST_CASE("Should allow to check if listener is already listening", "[EventBus][Listener]")
{
// Related to Github Issue: https://github.com/gelldur/EventBus/issues/48
EventBus bus;
int callCount = 0;
auto listener = Listener::createNotOwning(bus);
CHECK_FALSE(listener.isListening<event::Value>());
bus.postpone(event::Value{3});
REQUIRE(bus.process() == 1);
REQUIRE(callCount == 0); // not listening
listener.listen<event::Value>([&](const event::Value& event)
{
REQUIRE(event.value == 2);
++callCount;
});
CHECK(listener.isListening<event::Value>());
bus.postpone(event::Value{2});
REQUIRE(bus.process() == 1);
REQUIRE(callCount == 1);
CHECK(listener.isListening<event::Value>());
listener.unlisten<event::Value>();
CHECK_FALSE(listener.isListening<event::Value>());
bus.postpone(event::Value{1});
REQUIRE(bus.process() == 1);
REQUIRE(callCount == 1);
}
} // namespace dexode::eventbus::test