sled/3party/rxcpp/schedulers/rx-eventloop.hpp

119 lines
3.1 KiB
C++
Raw Normal View History

2024-03-14 20:50:17 +08:00
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
#pragma once
#if !defined(RXCPP_RX_SCHEDULER_EVENT_LOOP_HPP)
#define RXCPP_RX_SCHEDULER_EVENT_LOOP_HPP
#include "../rx-includes.hpp"
namespace rxcpp {
namespace schedulers {
struct event_loop : public scheduler_interface
{
private:
typedef event_loop this_type;
event_loop(const this_type&);
struct loop_worker : public worker_interface
{
private:
typedef loop_worker this_type;
loop_worker(const this_type&);
typedef detail::schedulable_queue<
typename clock_type::time_point> queue_item_time;
typedef queue_item_time::item_type item_type;
composite_subscription lifetime;
worker controller;
std::shared_ptr<const scheduler_interface> alive;
public:
virtual ~loop_worker()
{
}
loop_worker(composite_subscription cs, worker w, std::shared_ptr<const scheduler_interface> alive)
: lifetime(cs)
, controller(w)
, alive(alive)
{
auto token = controller.add(cs);
cs.add([token, w](){
w.remove(token);
});
}
virtual clock_type::time_point now() const {
return clock_type::now();
}
virtual void schedule(const schedulable& scbl) const {
controller.schedule(lifetime, scbl.get_action());
}
virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
controller.schedule(when, lifetime, scbl.get_action());
}
};
mutable thread_factory factory;
scheduler newthread;
mutable std::atomic<std::size_t> count;
composite_subscription loops_lifetime;
std::vector<worker> loops;
public:
event_loop()
: factory([](std::function<void()> start){
return std::thread(std::move(start));
})
, newthread(make_new_thread())
, count(0)
{
auto remaining = std::max(std::thread::hardware_concurrency(), unsigned(4));
while (remaining--) {
loops.push_back(newthread.create_worker(loops_lifetime));
}
}
explicit event_loop(thread_factory tf)
: factory(tf)
, newthread(make_new_thread(tf))
, count(0)
{
auto remaining = std::max(std::thread::hardware_concurrency(), unsigned(4));
while (remaining--) {
loops.push_back(newthread.create_worker(loops_lifetime));
}
}
virtual ~event_loop()
{
loops_lifetime.unsubscribe();
}
virtual clock_type::time_point now() const {
return clock_type::now();
}
virtual worker create_worker(composite_subscription cs) const {
return worker(cs, std::make_shared<loop_worker>(cs, loops[++count % loops.size()], this->shared_from_this()));
}
};
inline scheduler make_event_loop() {
static scheduler instance = make_scheduler<event_loop>();
return instance;
}
inline scheduler make_event_loop(thread_factory tf) {
return make_scheduler<event_loop>(tf);
}
}
}
#endif