创建一个可以在 RxCpp 中取消订阅的 Observable

Fal*_*nwe 3 c# c++ system.reactive rxcpp

我正在从 C# 移植一些严重依赖 Rx 的代码,而且我很难找到一些最常用的 C# 方法的 C++ 等价物。

特别是,我想从订阅/取消订阅逻辑创建一个 observable。在 C# 中,我使用Observable.Create<TSource> Method (Func<IObserver<TSource>, Action>)覆盖来创建一个 observable。例如

var observable = Observable.Create<int>(observer =>
{
    observers.Add(observer);
    return () =>
    {
        observers.Remove(observer)
    };
});
Run Code Online (Sandbox Code Playgroud)

是否可以用RxCpp做同样的事情?我认为答案在于rx::observable<>::create(OnSubscribe os)方法,但我不知道如何使用它来“注册”取消订阅的 lambda。

Kir*_*oop 5

在 RxCpp 和 RxJava 中, .subscribe() 需要一个订阅者。订阅者是绑定在一起的订阅者和观察者。

在 RxCpp 中,您的示例可能如下所示:

std::shared_ptr<std::list<rxcpp::subscriber<int>>> observers(new std::list<rxcpp::subscriber<int>>());

auto observable = rxcpp::observable<>::create<int>([=](rxcpp::subscriber<int> out){
    auto it = observers->insert(observers->end(), out);
    it->add([=](){
        observers->erase(it);
    });
});
Run Code Online (Sandbox Code Playgroud)

注意:rxcpp::subscriber<int>是一个隐藏观察者类型的类型忘记器。这允许它存储在一个集合中,但为 on_next、on_error 和 on_completed 引入了虚函数。