// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. #pragma once #if !defined(RXCPP_RX_SCHEDULER_VIRTUAL_TIME_HPP) #define RXCPP_RX_SCHEDULER_VIRTUAL_TIME_HPP #include "../rx-includes.hpp" namespace rxcpp { namespace schedulers { namespace detail { template struct virtual_time_base : std::enable_shared_from_this> { private: typedef virtual_time_base this_type; virtual_time_base(const virtual_time_base&); mutable bool isenabled; public: typedef Absolute absolute; typedef Relative relative; virtual ~virtual_time_base() { } protected: virtual_time_base() : isenabled(false) , clock_now(0) { } explicit virtual_time_base(absolute initialClock) : isenabled(false) , clock_now(initialClock) { } mutable absolute clock_now; typedef time_schedulable item_type; virtual absolute add(absolute, relative) const =0; virtual typename scheduler_base::clock_type::time_point to_time_point(absolute) const =0; virtual relative to_relative(typename scheduler_base::clock_type::duration) const =0; virtual item_type top() const =0; virtual void pop() const =0; virtual bool empty() const =0; public: virtual void schedule_absolute(absolute, const schedulable&) const =0; virtual void schedule_relative(relative when, const schedulable& a) const { auto at = add(clock_now, when); return schedule_absolute(at, a); } bool is_enabled() const {return isenabled;} absolute clock() const {return clock_now;} void start() const { if (!isenabled) { isenabled = true; rxsc::recursion r; r.reset(false); while (!empty() && isenabled) { auto next = top(); pop(); if (next.what.is_subscribed()) { if (next.when > clock_now) { clock_now = next.when; } next.what(r.get_recurse()); } } isenabled = false; } } void stop() const { isenabled = false; } void advance_to(absolute time) const { if (time < clock_now) { std::terminate(); } if (time == clock_now) { return; } if (!isenabled) { isenabled = true; rxsc::recursion r; while (!empty() && isenabled) { auto next = top(); if (next.when <= time) { pop(); if (!next.what.is_subscribed()) { continue; } if (next.when > clock_now) { clock_now = next.when; } next.what(r.get_recurse()); } else { break; } } isenabled = false; clock_now = time; } else { std::terminate(); } } void advance_by(relative time) const { auto dt = add(clock_now, time); if (dt < clock_now) { std::terminate(); } if (dt == clock_now) { return; } if (!isenabled) { advance_to(dt); } else { std::terminate(); } } void sleep(relative time) const { auto dt = add(clock_now, time); if (dt < clock_now) { std::terminate(); } clock_now = dt; } }; } template struct virtual_time : public detail::virtual_time_base { typedef detail::virtual_time_base base; typedef typename base::item_type item_type; typedef detail::schedulable_queue< typename item_type::time_point_type> queue_item_time; mutable queue_item_time q; public: virtual ~virtual_time() { } protected: virtual_time() { } explicit virtual_time(typename base::absolute initialClock) : base(initialClock) { } virtual item_type top() const { return q.top(); } virtual void pop() const { q.pop(); } virtual bool empty() const { return q.empty(); } using base::schedule_absolute; using base::schedule_relative; virtual void schedule_absolute(typename base::absolute when, const schedulable& a) const { // use a separate subscription here so that a's subscription is not affected auto run = make_schedulable( a.get_worker(), composite_subscription(), [a](const schedulable& scbl) { rxsc::recursion r; r.reset(false); if (scbl.is_subscribed()) { scbl.unsubscribe(); // unsubscribe() run, not a; a(r.get_recurse()); } }); q.push(item_type(when, run)); } }; } } #endif