282 lines
7.8 KiB
C++
282 lines
7.8 KiB
C++
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
|
|
|
|
#pragma once
|
|
|
|
#if !defined(RXCPP_RX_NOTIFICATION_HPP)
|
|
#define RXCPP_RX_NOTIFICATION_HPP
|
|
|
|
#include "rx-includes.hpp"
|
|
|
|
namespace rxcpp {
|
|
|
|
namespace notifications {
|
|
|
|
// Immutable pair of long values: the point at which a subscription was made
// and the point at which it was released.
// NOTE(review): presumably these are test-scheduler clock ticks - confirm
// against the callers that build subscriptions.
class subscription
{
    long subscribed_at;
    long unsubscribed_at;

public:
    // Builds a subscription with only a subscribe value; the unsubscribe
    // value is set to std::numeric_limits<long>::max().
    explicit inline subscription(long s)
        : subscribed_at(s)
        , unsubscribed_at(std::numeric_limits<long>::max())
    {
    }

    // Builds a subscription with explicit subscribe and unsubscribe values.
    inline subscription(long s, long u)
        : subscribed_at(s)
        , unsubscribed_at(u)
    {
    }

    // Returns the subscribe value given at construction.
    inline long subscribe() const {
        return subscribed_at;
    }

    // Returns the unsubscribe value (numeric_limits<long>::max() when none was given).
    inline long unsubscribe() const {
        return unsubscribed_at;
    }
};
|
|
|
|
// Two subscriptions are equal when both their subscribe() and their
// unsubscribe() values match.
inline bool operator == (subscription lhs, subscription rhs) {
    if (lhs.subscribe() != rhs.subscribe()) {
        return false;
    }
    return lhs.unsubscribe() == rhs.unsubscribe();
}
|
|
|
|
// Writes a subscription as "<subscribe>-<unsubscribe>" and returns the stream.
inline std::ostream& operator<< (std::ostream& out, const subscription& s) {
    out << s.subscribe();
    out << "-";
    out << s.unsubscribe();
    return out;
}
|
|
|
|
namespace detail {
|
|
|
|
template<typename T>
|
|
struct notification_base
|
|
: public std::enable_shared_from_this<notification_base<T>>
|
|
{
|
|
typedef subscriber<T> observer_type;
|
|
typedef std::shared_ptr<notification_base<T>> type;
|
|
|
|
virtual ~notification_base() {}
|
|
|
|
virtual void out(std::ostream& out) const =0;
|
|
virtual bool equals(const type& other) const = 0;
|
|
virtual void accept(const observer_type& o) const =0;
|
|
};
|
|
|
|
// Forward declaration of the vector stream operator that is defined further
// down in this namespace.
// NOTE(review): presumably declared here so the `os << value` probe in
// to_stream can see it - confirm before moving it.
template<class Element>
std::ostream& operator<< (std::ostream& os, const std::vector<Element>& elements);

// Preferred overload: takes part in overload resolution only when
// `os << value` is a valid expression, and forwards to it. The two trailing
// int parameters exist purely to rank this overload above the fallbacks when
// the caller passes (0, 0).
template<class Value>
auto to_stream(std::ostream& os, const Value& value, int, int)
    -> decltype(os << value) {
    return os << value;
}

#if RXCPP_USE_RTTI
// Fallback used when RTTI is enabled and the value cannot be streamed:
// reports the type name instead of the value. Ranked below the overload
// above (int, ...) and above the last-resort overload (...).
template<class Value>
std::ostream& to_stream(std::ostream& os, const Value&, int, ...) {
    os << "< " << typeid(Value).name() << " does not support ostream>";
    return os;
}
#endif

// Last-resort fallback: the value cannot be streamed and no type name is
// reported; emits a fixed placeholder.
template<class Value>
std::ostream& to_stream(std::ostream& os, const Value&, ...) {
    os << "<the value does not support ostream>";
    return os;
}
|
|
|
|
// Writes `v` as "[e0, e1, ...]" ("[]" when empty) and returns the stream.
// Each element is written through to_stream(os, element, 0, 0), so elements
// that cannot be streamed are rendered by to_stream's fallback overloads.
template<class T>
inline std::ostream& ostreamvector (std::ostream& os, const std::vector<T>& v) {
    os << "[";
    auto cursor = v.begin();
    const auto finish = v.end();
    if (cursor != finish) {
        // First element carries no separator; every later one is preceded by ", ".
        to_stream(os, *cursor, 0, 0);
        for (++cursor; cursor != finish; ++cursor) {
            os << ", ";
            to_stream(os, *cursor, 0, 0);
        }
    }
    os << "]";
    return os;
}
|
|
|
|
// Stream operator for any std::vector<T>; delegates to ostreamvector and
// returns the stream it hands back.
template<class T>
inline std::ostream& operator<< (std::ostream& os, const std::vector<T>& v) {
    std::ostream& written = ostreamvector(os, v);
    return written;
}
|
|
|
|
// Preferred equality overload: takes part in overload resolution only when
// `lhs == rhs` is valid and explicitly convertible to bool. The trailing int
// parameter ranks it above the (...) fallback when the caller passes 0.
//
// The body converts with the same explicit conversion the return-type probe
// uses; otherwise a comparison result that is only explicitly convertible to
// bool would select this overload and then fail to compile.
template<class T>
auto equals(const T& lhs, const T& rhs, int)
    -> decltype(bool(lhs == rhs)) {
    return static_cast<bool>(lhs == rhs);
}
|
|
|
|
template<class T>
|
|
bool equals(const T&, const T&, ...) {
|
|
rxu::throw_exception(std::runtime_error("value does not support equality tests"));
|
|
return false;
|
|
}
|
|
|
|
}
|
|
|
|
template<typename T>
|
|
struct notification
|
|
{
|
|
typedef typename detail::notification_base<T>::type type;
|
|
typedef typename detail::notification_base<T>::observer_type observer_type;
|
|
|
|
private:
|
|
typedef detail::notification_base<T> base;
|
|
|
|
struct on_next_notification : public base {
|
|
on_next_notification(T value) : value(std::move(value)) {
|
|
}
|
|
on_next_notification(const on_next_notification& o) : value(o.value) {}
|
|
on_next_notification(const on_next_notification&& o) : value(std::move(o.value)) {}
|
|
on_next_notification& operator=(on_next_notification o) { value = std::move(o.value); return *this; }
|
|
virtual void out(std::ostream& os) const {
|
|
os << "on_next( ";
|
|
detail::to_stream(os, value, 0, 0);
|
|
os << ")";
|
|
}
|
|
virtual bool equals(const typename base::type& other) const {
|
|
bool result = false;
|
|
other->accept(make_subscriber<T>(make_observer_dynamic<T>([this, &result](T v) {
|
|
result = detail::equals(this->value, v, 0);
|
|
})));
|
|
return result;
|
|
}
|
|
virtual void accept(const typename base::observer_type& o) const {
|
|
o.on_next(value);
|
|
}
|
|
const T value;
|
|
};
|
|
|
|
struct on_error_notification : public base {
|
|
on_error_notification(rxu::error_ptr ep) : ep(ep) {
|
|
}
|
|
on_error_notification(const on_error_notification& o) : ep(o.ep) {}
|
|
on_error_notification(const on_error_notification&& o) : ep(std::move(o.ep)) {}
|
|
on_error_notification& operator=(on_error_notification o) { ep = std::move(o.ep); return *this; }
|
|
virtual void out(std::ostream& os) const {
|
|
os << "on_error(";
|
|
os << rxu::what(ep);
|
|
os << ")";
|
|
}
|
|
virtual bool equals(const typename base::type& other) const {
|
|
bool result = false;
|
|
// not trying to compare exceptions
|
|
other->accept(make_subscriber<T>(make_observer_dynamic<T>([](T){}, [&result](rxu::error_ptr){
|
|
result = true;
|
|
})));
|
|
return result;
|
|
}
|
|
virtual void accept(const typename base::observer_type& o) const {
|
|
o.on_error(ep);
|
|
}
|
|
const rxu::error_ptr ep;
|
|
};
|
|
|
|
struct on_completed_notification : public base {
|
|
on_completed_notification() {
|
|
}
|
|
virtual void out(std::ostream& os) const {
|
|
os << "on_completed()";
|
|
}
|
|
virtual bool equals(const typename base::type& other) const {
|
|
bool result = false;
|
|
other->accept(make_subscriber<T>(make_observer_dynamic<T>([](T){}, [&result](){
|
|
result = true;
|
|
})));
|
|
return result;
|
|
}
|
|
virtual void accept(const typename base::observer_type& o) const {
|
|
o.on_completed();
|
|
}
|
|
};
|
|
|
|
struct exception_tag {};
|
|
|
|
template<typename Exception>
|
|
static
|
|
type make_on_error(exception_tag&&, Exception&& e) {
|
|
rxu::error_ptr ep = rxu::make_error_ptr(std::forward<Exception>(e));
|
|
return std::make_shared<on_error_notification>(ep);
|
|
}
|
|
|
|
struct exception_ptr_tag {};
|
|
|
|
static
|
|
type make_on_error(exception_ptr_tag&&, rxu::error_ptr ep) {
|
|
return std::make_shared<on_error_notification>(ep);
|
|
}
|
|
|
|
public:
|
|
template<typename U>
|
|
static type on_next(U value) {
|
|
return std::make_shared<on_next_notification>(std::move(value));
|
|
}
|
|
|
|
static type on_completed() {
|
|
return std::make_shared<on_completed_notification>();
|
|
}
|
|
|
|
template<typename Exception>
|
|
static type on_error(Exception&& e) {
|
|
return make_on_error(typename std::conditional<
|
|
std::is_same<rxu::decay_t<Exception>, rxu::error_ptr>::value,
|
|
exception_ptr_tag, exception_tag>::type(),
|
|
std::forward<Exception>(e));
|
|
}
|
|
};
|
|
|
|
template<class T>
|
|
bool operator == (const std::shared_ptr<detail::notification_base<T>>& lhs, const std::shared_ptr<detail::notification_base<T>>& rhs) {
|
|
if (!lhs && !rhs) {return true;}
|
|
if (!lhs || !rhs) {return false;}
|
|
return lhs->equals(rhs);
|
|
}
|
|
|
|
template<class T>
|
|
std::ostream& operator<< (std::ostream& os, const std::shared_ptr<detail::notification_base<T>>& n) {
|
|
n->out(os);
|
|
return os;
|
|
}
|
|
|
|
|
|
// Pairs a value of type T with the long time stamp it was recorded at.
template<class T>
class recorded
{
    long t;
    T v;

public:
    // Stores `t` and takes ownership of `v`. The by-value parameter is moved
    // into the member, so passing a temporary costs no copy.
    recorded(long t, T v)
        : t(t), v(std::move(v)) {
    }

    // Returns the time stamp given at construction.
    long time() const {
        return t;
    }

    // Returns the recorded value by const reference.
    const T& value() const {
        return v;
    }
};
|
|
|
|
template<class T>
|
|
bool operator == (recorded<T> lhs, recorded<T> rhs) {
|
|
return lhs.time() == rhs.time() && lhs.value() == rhs.value();
|
|
}
|
|
|
|
template<class T>
|
|
std::ostream& operator<< (std::ostream& out, const recorded<T>& r) {
|
|
out << "@" << r.time() << "-" << r.value();
|
|
return out;
|
|
}
|
|
|
|
}
|
|
// Short alias: rxcpp::rxn refers to rxcpp::notifications.
namespace rxn=notifications;
|
|
|
|
// Stream operator for a vector of subscriptions; formatting is done by
// rxcpp::notifications::detail::ostreamvector.
inline std::ostream& operator<< (std::ostream& out, const std::vector<rxcpp::notifications::subscription>& vs) {
    namespace formatting = rxcpp::notifications::detail;
    return formatting::ostreamvector(out, vs);
}
|
|
template<class T>
|
|
inline std::ostream& operator<< (std::ostream& out, const std::vector<rxcpp::notifications::recorded<T>>& vr) {
|
|
return rxcpp::notifications::detail::ostreamvector(out, vr);
|
|
}
|
|
|
|
}
|
|
|
|
#endif
|