183 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			183 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
 | 
						|
 | 
						|
#pragma once
 | 
						|
 | 
						|
#if !defined(RXCPP_RX_SCHEDULER_NEW_THREAD_HPP)
 | 
						|
#define RXCPP_RX_SCHEDULER_NEW_THREAD_HPP
 | 
						|
 | 
						|
#include "../rx-includes.hpp"
 | 
						|
 | 
						|
namespace rxcpp {
 | 
						|
 | 
						|
namespace schedulers {
 | 
						|
 | 
						|
typedef std::function<std::thread(std::function<void()>)> thread_factory;
 | 
						|
 | 
						|
struct new_thread : public scheduler_interface
 | 
						|
{
 | 
						|
private:
 | 
						|
    typedef new_thread this_type;
 | 
						|
    new_thread(const this_type&);
 | 
						|
 | 
						|
    struct new_worker : public worker_interface
 | 
						|
    {
 | 
						|
    private:
 | 
						|
        typedef new_worker this_type;
 | 
						|
 | 
						|
        typedef detail::action_queue queue_type;
 | 
						|
 | 
						|
        new_worker(const this_type&);
 | 
						|
 | 
						|
        struct new_worker_state : public std::enable_shared_from_this<new_worker_state>
 | 
						|
        {
 | 
						|
            typedef detail::schedulable_queue<
 | 
						|
                typename clock_type::time_point> queue_item_time;
 | 
						|
 | 
						|
            typedef queue_item_time::item_type item_type;
 | 
						|
 | 
						|
            virtual ~new_worker_state()
 | 
						|
            {
 | 
						|
            }
 | 
						|
 | 
						|
            explicit new_worker_state(composite_subscription cs)
 | 
						|
                : lifetime(cs)
 | 
						|
            {
 | 
						|
            }
 | 
						|
 | 
						|
            composite_subscription lifetime;
 | 
						|
            mutable std::mutex lock;
 | 
						|
            mutable std::condition_variable wake;
 | 
						|
            mutable queue_item_time q;
 | 
						|
            std::thread worker;
 | 
						|
            recursion r;
 | 
						|
        };
 | 
						|
 | 
						|
        std::shared_ptr<new_worker_state> state;
 | 
						|
 | 
						|
    public:
 | 
						|
        virtual ~new_worker()
 | 
						|
        {
 | 
						|
        }
 | 
						|
 | 
						|
        explicit new_worker(std::shared_ptr<new_worker_state> ws)
 | 
						|
            : state(ws)
 | 
						|
        {
 | 
						|
        }
 | 
						|
 | 
						|
        new_worker(composite_subscription cs, thread_factory& tf)
 | 
						|
            : state(std::make_shared<new_worker_state>(cs))
 | 
						|
        {
 | 
						|
            auto keepAlive = state;
 | 
						|
 | 
						|
            state->lifetime.add([keepAlive](){
 | 
						|
                std::unique_lock<std::mutex> guard(keepAlive->lock);
 | 
						|
                auto expired = std::move(keepAlive->q);
 | 
						|
                keepAlive->q = new_worker_state::queue_item_time{};
 | 
						|
                if (!keepAlive->q.empty()) std::terminate();
 | 
						|
                keepAlive->wake.notify_one();
 | 
						|
 | 
						|
                if (keepAlive->worker.joinable() && keepAlive->worker.get_id() != std::this_thread::get_id()) {
 | 
						|
                    guard.unlock();
 | 
						|
                    keepAlive->worker.join();
 | 
						|
                }
 | 
						|
                else {
 | 
						|
                    keepAlive->worker.detach();
 | 
						|
                }
 | 
						|
            });
 | 
						|
 | 
						|
            state->worker = tf([keepAlive](){
 | 
						|
 | 
						|
                // take ownership
 | 
						|
                queue_type::ensure(std::make_shared<new_worker>(keepAlive));
 | 
						|
                // release ownership
 | 
						|
                RXCPP_UNWIND_AUTO([]{
 | 
						|
                    queue_type::destroy();
 | 
						|
                });
 | 
						|
 | 
						|
                for(;;) {
 | 
						|
                    std::unique_lock<std::mutex> guard(keepAlive->lock);
 | 
						|
                    if (keepAlive->q.empty()) {
 | 
						|
                        keepAlive->wake.wait(guard, [keepAlive](){
 | 
						|
                            return !keepAlive->lifetime.is_subscribed() || !keepAlive->q.empty();
 | 
						|
                        });
 | 
						|
                    }
 | 
						|
                    if (!keepAlive->lifetime.is_subscribed()) {
 | 
						|
                        break;
 | 
						|
                    }
 | 
						|
                    auto& peek = keepAlive->q.top();
 | 
						|
                    if (!peek.what.is_subscribed()) {
 | 
						|
                        keepAlive->q.pop();
 | 
						|
                        continue;
 | 
						|
                    }
 | 
						|
                    auto when = peek.when;
 | 
						|
                    if (clock_type::now() < when) {
 | 
						|
                        keepAlive->wake.wait_until(guard, when);
 | 
						|
                        continue;
 | 
						|
                    }
 | 
						|
                    auto what = peek.what;
 | 
						|
                    keepAlive->q.pop();
 | 
						|
                    keepAlive->r.reset(keepAlive->q.empty());
 | 
						|
                    guard.unlock();
 | 
						|
                    what(keepAlive->r.get_recurse());
 | 
						|
                }
 | 
						|
            });
 | 
						|
        }
 | 
						|
 | 
						|
        virtual clock_type::time_point now() const {
 | 
						|
            return clock_type::now();
 | 
						|
        }
 | 
						|
 | 
						|
        virtual void schedule(const schedulable& scbl) const {
 | 
						|
            schedule(now(), scbl);
 | 
						|
        }
 | 
						|
 | 
						|
        virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
 | 
						|
            if (scbl.is_subscribed()) {
 | 
						|
                std::unique_lock<std::mutex> guard(state->lock);
 | 
						|
                state->q.push(new_worker_state::item_type(when, scbl));
 | 
						|
                state->r.reset(false);
 | 
						|
            }
 | 
						|
            state->wake.notify_one();
 | 
						|
        }
 | 
						|
    };
 | 
						|
 | 
						|
    mutable thread_factory factory;
 | 
						|
 | 
						|
public:
 | 
						|
    new_thread()
 | 
						|
        : factory([](std::function<void()> start){
 | 
						|
            return std::thread(std::move(start));
 | 
						|
        })
 | 
						|
    {
 | 
						|
    }
 | 
						|
    explicit new_thread(thread_factory tf)
 | 
						|
        : factory(tf)
 | 
						|
    {
 | 
						|
    }
 | 
						|
    virtual ~new_thread()
 | 
						|
    {
 | 
						|
    }
 | 
						|
 | 
						|
    virtual clock_type::time_point now() const {
 | 
						|
        return clock_type::now();
 | 
						|
    }
 | 
						|
 | 
						|
    virtual worker create_worker(composite_subscription cs) const {
 | 
						|
        return worker(cs, std::make_shared<new_worker>(cs, factory));
 | 
						|
    }
 | 
						|
};
 | 
						|
 | 
						|
inline scheduler make_new_thread() {
 | 
						|
    static scheduler instance = make_scheduler<new_thread>();
 | 
						|
    return instance;
 | 
						|
}
 | 
						|
inline scheduler make_new_thread(thread_factory tf) {
 | 
						|
    return make_scheduler<new_thread>(tf);
 | 
						|
}
 | 
						|
 | 
						|
}
 | 
						|
 | 
						|
}
 | 
						|
 | 
						|
#endif
 |