// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. #pragma once #if !defined(RXCPP_RX_SCHEDULER_NEW_THREAD_HPP) #define RXCPP_RX_SCHEDULER_NEW_THREAD_HPP #include "../rx-includes.hpp" namespace rxcpp { namespace schedulers { typedef std::function)> thread_factory; struct new_thread : public scheduler_interface { private: typedef new_thread this_type; new_thread(const this_type&); struct new_worker : public worker_interface { private: typedef new_worker this_type; typedef detail::action_queue queue_type; new_worker(const this_type&); struct new_worker_state : public std::enable_shared_from_this { typedef detail::schedulable_queue< typename clock_type::time_point> queue_item_time; typedef queue_item_time::item_type item_type; virtual ~new_worker_state() { } explicit new_worker_state(composite_subscription cs) : lifetime(cs) { } composite_subscription lifetime; mutable std::mutex lock; mutable std::condition_variable wake; mutable queue_item_time q; std::thread worker; recursion r; }; std::shared_ptr state; public: virtual ~new_worker() { } explicit new_worker(std::shared_ptr ws) : state(ws) { } new_worker(composite_subscription cs, thread_factory& tf) : state(std::make_shared(cs)) { auto keepAlive = state; state->lifetime.add([keepAlive](){ std::unique_lock guard(keepAlive->lock); auto expired = std::move(keepAlive->q); keepAlive->q = new_worker_state::queue_item_time{}; if (!keepAlive->q.empty()) std::terminate(); keepAlive->wake.notify_one(); if (keepAlive->worker.joinable() && keepAlive->worker.get_id() != std::this_thread::get_id()) { guard.unlock(); keepAlive->worker.join(); } else { keepAlive->worker.detach(); } }); state->worker = tf([keepAlive](){ // take ownership queue_type::ensure(std::make_shared(keepAlive)); // release ownership RXCPP_UNWIND_AUTO([]{ queue_type::destroy(); }); for(;;) { std::unique_lock guard(keepAlive->lock); if (keepAlive->q.empty()) { keepAlive->wake.wait(guard, [keepAlive](){ return !keepAlive->lifetime.is_subscribed() || !keepAlive->q.empty(); }); } if (!keepAlive->lifetime.is_subscribed()) { break; } auto& peek = keepAlive->q.top(); if (!peek.what.is_subscribed()) { keepAlive->q.pop(); continue; } auto when = peek.when; if (clock_type::now() < when) { keepAlive->wake.wait_until(guard, when); continue; } auto what = peek.what; keepAlive->q.pop(); keepAlive->r.reset(keepAlive->q.empty()); guard.unlock(); what(keepAlive->r.get_recurse()); } }); } virtual clock_type::time_point now() const { return clock_type::now(); } virtual void schedule(const schedulable& scbl) const { schedule(now(), scbl); } virtual void schedule(clock_type::time_point when, const schedulable& scbl) const { if (scbl.is_subscribed()) { std::unique_lock guard(state->lock); state->q.push(new_worker_state::item_type(when, scbl)); state->r.reset(false); } state->wake.notify_one(); } }; mutable thread_factory factory; public: new_thread() : factory([](std::function start){ return std::thread(std::move(start)); }) { } explicit new_thread(thread_factory tf) : factory(tf) { } virtual ~new_thread() { } virtual clock_type::time_point now() const { return clock_type::now(); } virtual worker create_worker(composite_subscription cs) const { return worker(cs, std::make_shared(cs, factory)); } }; inline scheduler make_new_thread() { static scheduler instance = make_scheduler(); return instance; } inline scheduler make_new_thread(thread_factory tf) { return make_scheduler(tf); } } } #endif