119 lines
3.1 KiB
C++
119 lines
3.1 KiB
C++
|
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
|
||
|
|
||
|
#pragma once
|
||
|
|
||
|
#if !defined(RXCPP_RX_SCHEDULER_EVENT_LOOP_HPP)
|
||
|
#define RXCPP_RX_SCHEDULER_EVENT_LOOP_HPP
|
||
|
|
||
|
#include "../rx-includes.hpp"
|
||
|
|
||
|
namespace rxcpp {
|
||
|
|
||
|
namespace schedulers {
|
||
|
|
||
|
struct event_loop : public scheduler_interface
|
||
|
{
|
||
|
private:
|
||
|
typedef event_loop this_type;
|
||
|
event_loop(const this_type&);
|
||
|
|
||
|
struct loop_worker : public worker_interface
|
||
|
{
|
||
|
private:
|
||
|
typedef loop_worker this_type;
|
||
|
loop_worker(const this_type&);
|
||
|
|
||
|
typedef detail::schedulable_queue<
|
||
|
typename clock_type::time_point> queue_item_time;
|
||
|
|
||
|
typedef queue_item_time::item_type item_type;
|
||
|
|
||
|
composite_subscription lifetime;
|
||
|
worker controller;
|
||
|
std::shared_ptr<const scheduler_interface> alive;
|
||
|
|
||
|
public:
|
||
|
virtual ~loop_worker()
|
||
|
{
|
||
|
}
|
||
|
loop_worker(composite_subscription cs, worker w, std::shared_ptr<const scheduler_interface> alive)
|
||
|
: lifetime(cs)
|
||
|
, controller(w)
|
||
|
, alive(alive)
|
||
|
{
|
||
|
auto token = controller.add(cs);
|
||
|
cs.add([token, w](){
|
||
|
w.remove(token);
|
||
|
});
|
||
|
}
|
||
|
|
||
|
virtual clock_type::time_point now() const {
|
||
|
return clock_type::now();
|
||
|
}
|
||
|
|
||
|
virtual void schedule(const schedulable& scbl) const {
|
||
|
controller.schedule(lifetime, scbl.get_action());
|
||
|
}
|
||
|
|
||
|
virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
|
||
|
controller.schedule(when, lifetime, scbl.get_action());
|
||
|
}
|
||
|
};
|
||
|
|
||
|
mutable thread_factory factory;
|
||
|
scheduler newthread;
|
||
|
mutable std::atomic<std::size_t> count;
|
||
|
composite_subscription loops_lifetime;
|
||
|
std::vector<worker> loops;
|
||
|
|
||
|
public:
|
||
|
event_loop()
|
||
|
: factory([](std::function<void()> start){
|
||
|
return std::thread(std::move(start));
|
||
|
})
|
||
|
, newthread(make_new_thread())
|
||
|
, count(0)
|
||
|
{
|
||
|
auto remaining = std::max(std::thread::hardware_concurrency(), unsigned(4));
|
||
|
while (remaining--) {
|
||
|
loops.push_back(newthread.create_worker(loops_lifetime));
|
||
|
}
|
||
|
}
|
||
|
explicit event_loop(thread_factory tf)
|
||
|
: factory(tf)
|
||
|
, newthread(make_new_thread(tf))
|
||
|
, count(0)
|
||
|
{
|
||
|
auto remaining = std::max(std::thread::hardware_concurrency(), unsigned(4));
|
||
|
while (remaining--) {
|
||
|
loops.push_back(newthread.create_worker(loops_lifetime));
|
||
|
}
|
||
|
}
|
||
|
virtual ~event_loop()
|
||
|
{
|
||
|
loops_lifetime.unsubscribe();
|
||
|
}
|
||
|
|
||
|
virtual clock_type::time_point now() const {
|
||
|
return clock_type::now();
|
||
|
}
|
||
|
|
||
|
virtual worker create_worker(composite_subscription cs) const {
|
||
|
return worker(cs, std::make_shared<loop_worker>(cs, loops[++count % loops.size()], this->shared_from_this()));
|
||
|
}
|
||
|
};
|
||
|
|
||
|
inline scheduler make_event_loop() {
|
||
|
static scheduler instance = make_scheduler<event_loop>();
|
||
|
return instance;
|
||
|
}
|
||
|
inline scheduler make_event_loop(thread_factory tf) {
|
||
|
return make_scheduler<event_loop>(tf);
|
||
|
}
|
||
|
|
||
|
}
|
||
|
|
||
|
}
|
||
|
|
||
|
#endif
|