// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. #pragma once #if !defined(RXCPP_RX_SUBJECT_HPP) #define RXCPP_RX_SUBJECT_HPP #include "../rx-includes.hpp" namespace rxcpp { namespace subjects { namespace detail { template class multicast_observer { typedef subscriber observer_type; typedef std::vector list_type; struct mode { enum type { Invalid = 0, Casting, Disposed, Completed, Errored }; }; struct state_type : public std::enable_shared_from_this { explicit state_type(composite_subscription cs) : current(mode::Casting) , lifetime(cs) { } std::mutex lock; typename mode::type current; rxu::error_ptr error; composite_subscription lifetime; }; struct completer_type : public std::enable_shared_from_this { ~completer_type() { } completer_type(std::shared_ptr s, const std::shared_ptr& old, observer_type o) : state(s) { retain(old); observers.push_back(o); } completer_type(std::shared_ptr s, const std::shared_ptr& old) : state(s) { retain(old); } void retain(const std::shared_ptr& old) { if (old) { observers.reserve(old->observers.size() + 1); std::copy_if( old->observers.begin(), old->observers.end(), std::inserter(observers, observers.end()), [](const observer_type& o){ return o.is_subscribed(); }); } } std::shared_ptr state; list_type observers; }; // this type prevents a circular ref between state and completer struct binder_type : public std::enable_shared_from_this { explicit binder_type(composite_subscription cs) : state(std::make_shared(cs)) , id(trace_id::make_next_id_subscriber()) { } std::shared_ptr state; trace_id id; // used to avoid taking lock in on_next mutable std::weak_ptr current_completer; // must only be accessed under state->lock mutable std::shared_ptr completer; }; std::shared_ptr b; public: typedef subscriber>> input_subscriber_type; explicit multicast_observer(composite_subscription cs) : b(std::make_shared(cs)) { std::weak_ptr binder = b; b->state->lifetime.add([binder](){ auto b = binder.lock(); if (b && b->state->current == mode::Casting){ b->state->current = mode::Disposed; b->current_completer.reset(); b->completer.reset(); } }); } trace_id get_id() const { return b->id; } composite_subscription get_subscription() const { return b->state->lifetime; } input_subscriber_type get_subscriber() const { return make_subscriber(get_id(), get_subscription(), observer>(*this)); } bool has_observers() const { std::unique_lock guard(b->state->lock); return b->completer && !b->completer->observers.empty(); } template void add(const SubscriberFrom& sf, observer_type o) const { trace_activity().connect(sf, o); std::unique_lock guard(b->state->lock); switch (b->state->current) { case mode::Casting: { if (o.is_subscribed()) { std::weak_ptr binder = b; o.add([=](){ auto b = binder.lock(); if (b) { std::unique_lock guard(b->state->lock); b->completer = std::make_shared(b->state, b->completer); } }); b->completer = std::make_shared(b->state, b->completer, o); } } break; case mode::Completed: { guard.unlock(); o.on_completed(); return; } break; case mode::Errored: { auto e = b->state->error; guard.unlock(); o.on_error(e); return; } break; case mode::Disposed: { guard.unlock(); o.unsubscribe(); return; } break; default: std::terminate(); } } template void on_next(V v) const { auto current_completer = b->current_completer.lock(); if (!current_completer) { std::unique_lock guard(b->state->lock); b->current_completer = b->completer; current_completer = b->current_completer.lock(); } if (!current_completer || current_completer->observers.empty()) { return; } for (auto& o : current_completer->observers) { if (o.is_subscribed()) { o.on_next(v); } } } void on_error(rxu::error_ptr e) const { std::unique_lock guard(b->state->lock); if (b->state->current == mode::Casting) { b->state->error = e; b->state->current = mode::Errored; auto s = b->state->lifetime; auto c = std::move(b->completer); b->current_completer.reset(); guard.unlock(); if (c) { for (auto& o : c->observers) { if (o.is_subscribed()) { o.on_error(e); } } } s.unsubscribe(); } } void on_completed() const { std::unique_lock guard(b->state->lock); if (b->state->current == mode::Casting) { b->state->current = mode::Completed; auto s = b->state->lifetime; auto c = std::move(b->completer); b->current_completer.reset(); guard.unlock(); if (c) { for (auto& o : c->observers) { if (o.is_subscribed()) { o.on_completed(); } } } s.unsubscribe(); } } }; } template class subject { detail::multicast_observer s; public: typedef subscriber>> subscriber_type; typedef observable observable_type; subject() : s(composite_subscription()) { } explicit subject(composite_subscription cs) : s(cs) { } bool has_observers() const { return s.has_observers(); } composite_subscription get_subscription() const { return s.get_subscription(); } subscriber_type get_subscriber() const { return s.get_subscriber(); } observable get_observable() const { auto keepAlive = s; return make_observable_dynamic([=](subscriber o){ keepAlive.add(keepAlive.get_subscriber(), std::move(o)); }); } }; } } #endif