278 lines
		
	
	
		
			7.7 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			278 lines
		
	
	
		
			7.7 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
 | 
						|
 | 
						|
#pragma once
 | 
						|
 | 
						|
#if !defined(RXCPP_RX_SUBJECT_HPP)
 | 
						|
#define RXCPP_RX_SUBJECT_HPP
 | 
						|
 | 
						|
#include "../rx-includes.hpp"
 | 
						|
 | 
						|
namespace rxcpp {
 | 
						|
 | 
						|
namespace subjects {
 | 
						|
 | 
						|
namespace detail {
 | 
						|
 | 
						|
template<class T>
 | 
						|
class multicast_observer
 | 
						|
{
 | 
						|
    typedef subscriber<T> observer_type;
 | 
						|
    typedef std::vector<observer_type> list_type;
 | 
						|
 | 
						|
    struct mode
 | 
						|
    {
 | 
						|
        enum type {
 | 
						|
            Invalid = 0,
 | 
						|
            Casting,
 | 
						|
            Disposed,
 | 
						|
            Completed,
 | 
						|
            Errored
 | 
						|
        };
 | 
						|
    };
 | 
						|
 | 
						|
    struct state_type
 | 
						|
        : public std::enable_shared_from_this<state_type>
 | 
						|
    {
 | 
						|
        explicit state_type(composite_subscription cs)
 | 
						|
            : current(mode::Casting)
 | 
						|
            , lifetime(cs)
 | 
						|
        {
 | 
						|
        }
 | 
						|
        std::mutex lock;
 | 
						|
        typename mode::type current;
 | 
						|
        rxu::error_ptr error;
 | 
						|
        composite_subscription lifetime;
 | 
						|
    };
 | 
						|
 | 
						|
    struct completer_type
 | 
						|
        : public std::enable_shared_from_this<completer_type>
 | 
						|
    {
 | 
						|
        ~completer_type()
 | 
						|
        {
 | 
						|
        }
 | 
						|
        completer_type(std::shared_ptr<state_type> s, const std::shared_ptr<completer_type>& old, observer_type o)
 | 
						|
            : state(s)
 | 
						|
        {
 | 
						|
            retain(old);
 | 
						|
            observers.push_back(o);
 | 
						|
        }
 | 
						|
        completer_type(std::shared_ptr<state_type> s, const std::shared_ptr<completer_type>& old)
 | 
						|
            : state(s)
 | 
						|
        {
 | 
						|
            retain(old);
 | 
						|
        }
 | 
						|
        void retain(const std::shared_ptr<completer_type>& old) {
 | 
						|
            if (old) {
 | 
						|
                observers.reserve(old->observers.size() + 1);
 | 
						|
                std::copy_if(
 | 
						|
                    old->observers.begin(), old->observers.end(),
 | 
						|
                    std::inserter(observers, observers.end()),
 | 
						|
                    [](const observer_type& o){
 | 
						|
                        return o.is_subscribed();
 | 
						|
                    });
 | 
						|
            }
 | 
						|
        }
 | 
						|
        std::shared_ptr<state_type> state;
 | 
						|
        list_type observers;
 | 
						|
    };
 | 
						|
 | 
						|
    // this type prevents a circular ref between state and completer
 | 
						|
    struct binder_type
 | 
						|
        : public std::enable_shared_from_this<binder_type>
 | 
						|
    {
 | 
						|
        explicit binder_type(composite_subscription cs)
 | 
						|
            : state(std::make_shared<state_type>(cs))
 | 
						|
            , id(trace_id::make_next_id_subscriber())
 | 
						|
        {
 | 
						|
        }
 | 
						|
 | 
						|
        std::shared_ptr<state_type> state;
 | 
						|
 | 
						|
        trace_id id;
 | 
						|
 | 
						|
        // used to avoid taking lock in on_next
 | 
						|
        mutable std::weak_ptr<completer_type> current_completer;
 | 
						|
 | 
						|
        // must only be accessed under state->lock
 | 
						|
        mutable std::shared_ptr<completer_type> completer;
 | 
						|
    };
 | 
						|
 | 
						|
    std::shared_ptr<binder_type> b;
 | 
						|
 | 
						|
public:
 | 
						|
    typedef subscriber<T, observer<T, detail::multicast_observer<T>>> input_subscriber_type;
 | 
						|
 | 
						|
    explicit multicast_observer(composite_subscription cs)
 | 
						|
        : b(std::make_shared<binder_type>(cs))
 | 
						|
    {
 | 
						|
        std::weak_ptr<binder_type> binder = b;
 | 
						|
        b->state->lifetime.add([binder](){
 | 
						|
            auto b = binder.lock();
 | 
						|
            if (b && b->state->current == mode::Casting){
 | 
						|
                b->state->current = mode::Disposed;
 | 
						|
                b->current_completer.reset();
 | 
						|
                b->completer.reset();
 | 
						|
            }
 | 
						|
        });
 | 
						|
    }
 | 
						|
    trace_id get_id() const {
 | 
						|
        return b->id;
 | 
						|
    }
 | 
						|
    composite_subscription get_subscription() const {
 | 
						|
        return b->state->lifetime;
 | 
						|
    }
 | 
						|
    input_subscriber_type get_subscriber() const {
 | 
						|
        return make_subscriber<T>(get_id(), get_subscription(), observer<T, detail::multicast_observer<T>>(*this));
 | 
						|
    }
 | 
						|
    bool has_observers() const {
 | 
						|
        std::unique_lock<std::mutex> guard(b->state->lock);
 | 
						|
        return b->completer && !b->completer->observers.empty();
 | 
						|
    }
 | 
						|
    template<class SubscriberFrom>
 | 
						|
    void add(const SubscriberFrom& sf, observer_type o) const {
 | 
						|
        trace_activity().connect(sf, o);
 | 
						|
        std::unique_lock<std::mutex> guard(b->state->lock);
 | 
						|
        switch (b->state->current) {
 | 
						|
        case mode::Casting:
 | 
						|
            {
 | 
						|
                if (o.is_subscribed()) {
 | 
						|
                    std::weak_ptr<binder_type> binder = b;
 | 
						|
                    o.add([=](){
 | 
						|
                        auto b = binder.lock();
 | 
						|
                        if (b) {
 | 
						|
                            std::unique_lock<std::mutex> guard(b->state->lock);
 | 
						|
                            b->completer = std::make_shared<completer_type>(b->state, b->completer);
 | 
						|
                        }
 | 
						|
                    });
 | 
						|
                    b->completer = std::make_shared<completer_type>(b->state, b->completer, o);
 | 
						|
                }
 | 
						|
            }
 | 
						|
            break;
 | 
						|
        case mode::Completed:
 | 
						|
            {
 | 
						|
                guard.unlock();
 | 
						|
                o.on_completed();
 | 
						|
                return;
 | 
						|
            }
 | 
						|
            break;
 | 
						|
        case mode::Errored:
 | 
						|
            {
 | 
						|
                auto e = b->state->error;
 | 
						|
                guard.unlock();
 | 
						|
                o.on_error(e);
 | 
						|
                return;
 | 
						|
            }
 | 
						|
            break;
 | 
						|
        case mode::Disposed:
 | 
						|
            {
 | 
						|
                guard.unlock();
 | 
						|
                o.unsubscribe();
 | 
						|
                return;
 | 
						|
            }
 | 
						|
            break;
 | 
						|
        default:
 | 
						|
            std::terminate();
 | 
						|
        }
 | 
						|
    }
 | 
						|
    template<class V>
 | 
						|
    void on_next(V v) const {
 | 
						|
        auto current_completer = b->current_completer.lock();
 | 
						|
        if (!current_completer) {
 | 
						|
            std::unique_lock<std::mutex> guard(b->state->lock);
 | 
						|
            b->current_completer = b->completer;
 | 
						|
            current_completer = b->current_completer.lock();
 | 
						|
        }
 | 
						|
        if (!current_completer || current_completer->observers.empty()) {
 | 
						|
            return;
 | 
						|
        }
 | 
						|
        for (auto& o : current_completer->observers) {
 | 
						|
            if (o.is_subscribed()) {
 | 
						|
                o.on_next(v);
 | 
						|
            }
 | 
						|
        }
 | 
						|
    }
 | 
						|
    void on_error(rxu::error_ptr e) const {
 | 
						|
        std::unique_lock<std::mutex> guard(b->state->lock);
 | 
						|
        if (b->state->current == mode::Casting) {
 | 
						|
            b->state->error = e;
 | 
						|
            b->state->current = mode::Errored;
 | 
						|
            auto s = b->state->lifetime;
 | 
						|
            auto c = std::move(b->completer);
 | 
						|
            b->current_completer.reset();
 | 
						|
            guard.unlock();
 | 
						|
            if (c) {
 | 
						|
                for (auto& o : c->observers) {
 | 
						|
                    if (o.is_subscribed()) {
 | 
						|
                        o.on_error(e);
 | 
						|
                    }
 | 
						|
                }
 | 
						|
            }
 | 
						|
            s.unsubscribe();
 | 
						|
        }
 | 
						|
    }
 | 
						|
    void on_completed() const {
 | 
						|
        std::unique_lock<std::mutex> guard(b->state->lock);
 | 
						|
        if (b->state->current == mode::Casting) {
 | 
						|
            b->state->current = mode::Completed;
 | 
						|
            auto s = b->state->lifetime;
 | 
						|
            auto c = std::move(b->completer);
 | 
						|
            b->current_completer.reset();
 | 
						|
            guard.unlock();
 | 
						|
            if (c) {
 | 
						|
                for (auto& o : c->observers) {
 | 
						|
                    if (o.is_subscribed()) {
 | 
						|
                        o.on_completed();
 | 
						|
                    }
 | 
						|
                }
 | 
						|
            }
 | 
						|
            s.unsubscribe();
 | 
						|
        }
 | 
						|
    }
 | 
						|
};
 | 
						|
 | 
						|
 | 
						|
}
 | 
						|
 | 
						|
template<class T>
 | 
						|
class subject
 | 
						|
{
 | 
						|
    detail::multicast_observer<T> s;
 | 
						|
 | 
						|
public:
 | 
						|
    typedef subscriber<T, observer<T, detail::multicast_observer<T>>> subscriber_type;
 | 
						|
    typedef observable<T> observable_type;
 | 
						|
    subject()
 | 
						|
        : s(composite_subscription())
 | 
						|
    {
 | 
						|
    }
 | 
						|
    explicit subject(composite_subscription cs)
 | 
						|
        : s(cs)
 | 
						|
    {
 | 
						|
    }
 | 
						|
 | 
						|
    bool has_observers() const {
 | 
						|
        return s.has_observers();
 | 
						|
    }
 | 
						|
 | 
						|
    composite_subscription get_subscription() const {
 | 
						|
        return s.get_subscription();
 | 
						|
    }
 | 
						|
 | 
						|
    subscriber_type get_subscriber() const {
 | 
						|
        return s.get_subscriber();
 | 
						|
    }
 | 
						|
 | 
						|
    observable<T> get_observable() const {
 | 
						|
        auto keepAlive = s;
 | 
						|
        return make_observable_dynamic<T>([=](subscriber<T> o){
 | 
						|
            keepAlive.add(keepAlive.get_subscriber(), std::move(o));
 | 
						|
        });
 | 
						|
    }
 | 
						|
};
 | 
						|
 | 
						|
}
 | 
						|
 | 
						|
}
 | 
						|
 | 
						|
#endif
 |