标签: rxcpp

Rxcpp中的调度程序

我试图找出Cx 版本的Rx中的调度模型.

了解C#版本,其中有一个带有一个Schedule方法的简单接口; C++版本似乎相当复杂,包括调度程序,工作程序和协调等.

对我来说,一个主要缺失的部分是线程池调度程序的实现,是否存在其他名称?我将如何实现自己?我应该把它写在PPL(Windows)上面吗?如果我需要一个序列化(类似于Actor)的观察者,我应该使用什么?偷看这里这里可以表明这不是一项微不足道的任务.

这将有助于获得关于该主题的某种概述,因为官方文档是自动生成的并且仍然非常稀疏.

c++ threadpool system.reactive ppl rxcpp

10
推荐指数
1
解决办法
2059
查看次数

llvm错误:搬迁尚未实施!在orcjit或lli中运行RxCpp时

我想在llvm的IR解释器中运行RxCpp示例lli.

不幸的是,在lli中运行任何RxCpp示例都失败了:

git clone https://github.com/Reactive-Extensions/RxCpp.git --depth 1
cd RxCpp/Rx/v2/examples/pythogerian
clang++ -S -emit-llvm -fno-use-cxa-exit -I../../src main.cpp 
lli main.ll
Run Code Online (Sandbox Code Playgroud)

错误信息:

Relocation type not implemented yet!
UNREACHABLE executed at llvm/lib/ExecutionEngine/RuntimeDyld/RuntimeDyldELF.cpp:232!
Run Code Online (Sandbox Code Playgroud)

问题:

这个错误究竟意味着什么? - llvm的orc-jit中哪些假设不满足?

有解决方法吗? - 我可以应用任何LLVM-IR转换来实现这项工作(例如通过编译器标志)吗?

RxCpp使用哪些特殊功能会导致llvm的orcjit出现此问题?

测试:

clang version 5.0.0 (https://github.com/llvm-mirror/clang.git 6c9e299494de2a5b0425e46bc937f29a05128252) 
clang version 4.0.0-+rc1-1 (tags/RELEASE_400/rc1)
clang version 3.9.0-1 (tags/RELEASE_390/final)
clang version 3.8.1-12 (tags/RELEASE_381/final)
Target: x86_64-pc-linux-gnu
Thread model: posix
InstalledDir: /usr/bin
Run Code Online (Sandbox Code Playgroud)

c++ llvm clang rxcpp lli

7
推荐指数
1
解决办法
278
查看次数

RxCpp:观察者的生命周期,如果使用observe_on(rxcpp::observe_on_new_thread())

如果观察者使用observe_on(rxcpp::observe_on_new_thread()),等待所有观察者on_completed被调用的正确方法是什么:

例如:

{
    Foo foo;
    auto generator = [&](rxcpp::subscriber<int> s)
    {
        s.on_next(1);
        // ...
        s.on_completed();
    };
    auto values = rxcpp::observable<>::create<int>(generator).publish();
    auto s1 = values.observe_on(rxcpp::observe_on_new_thread())
                    .subscribe([&](int) { slow_function(foo); }));

    auto lifetime = rxcpp::composite_subscription();
    lifetime.add([&](){ wrapper.log("unsubscribe");  });
    auto s2 = values.ref_count().as_blocking().subscribe(lifetime);

    // hope to call something here to wait for the completion of
    // s1's on_completed function
}

// the program usually crashes here when foo goes out of scope because 
// the slow_function(foo) is still working on foo.  I also noticed …
Run Code Online (Sandbox Code Playgroud)

c++ reactive-programming rxcpp

5
推荐指数
1
解决办法
1749
查看次数

rxcpp - 当可观察对象发出值时,为什么不调用所有观察者的 on_next 函数

我试图了解如何使用 rxcpp,我的印象是,当可观察者发出一个值时,所有订阅的观察者都会通过调用其 on_next() 方法来获得通知,并向他们传递发出的值。

以下示例的情况并非如此:

auto eventloop = rxcpp::observe_on_event_loop();

printf("Start task\n");

auto values = rxcpp::observable<>::interval(std::chrono::seconds(2)).map(
        [](int i){
            printf("Observable sending: %d\n", i);
            return i;
        }
);

values.
    subscribe_on(eventloop).
    take(2).
    as_blocking().
    subscribe(
        [](int v){printf("#1 onNext: %d\n", v);},
        [](){printf("#1 onCompleted\n");});

values.
    subscribe_on(eventloop).
    take(2).
    as_blocking().
    subscribe(
        [](int v){printf("#2 onNext: %d\n", v);},
        [](){printf("#2 onCompleted\n");});

printf("Finish task\n");
Run Code Online (Sandbox Code Playgroud)

我期望输出是这样的:

Start task
Observable sending: 1
#1 onNext: 1
#2 onNext: 1
Observable sending: 2
#1 onNext: 2
#1 onCompleted
#2 onNext: 2
#2 onCompleted
Finish task
Run Code Online (Sandbox Code Playgroud)

即当新值到来时,所有订阅的观察者都会调用 on_next …

c++ reactive-programming reactivex rxcpp

5
推荐指数
1
解决办法
794
查看次数

RXCPP:阻塞功能超时

考虑一个阻塞函数: this_thread::sleep_for(milliseconds(3000));

我试图获得以下行为:

Trigger Blocking Function               

|---------------------------------------------X
Run Code Online (Sandbox Code Playgroud)

我想触发阻塞功能,如果时间太长(超过两秒),它应该超时。

我做了以下事情:

my_connection = observable<>::create<int>([](subscriber<int> s) {
    auto s2 = observable<>::just(1, observe_on_new_thread()) |
    subscribe<int>([&](auto x) {
        this_thread::sleep_for(milliseconds(3000));
        s.on_next(1);
    });
}) |
timeout(seconds(2), observe_on_new_thread());
Run Code Online (Sandbox Code Playgroud)

我无法让它发挥作用。对于初学者来说,我认为 s 不能从不同的线程 on_next 。

所以我的问题是,正确的反应方式是什么?如何在 rxcpp 中包装阻塞函数并为其添加超时?

随后,我想要获得一个行为如下的 RX 流:

Trigger                Cleanup

|------------------------X
                           (Delay)   Trigger           Cleanup
                                       |-----------------X
Run Code Online (Sandbox Code Playgroud)

c++11 rxcpp

5
推荐指数
1
解决办法
886
查看次数

RxCpp RAII 可观察订阅

我在模型视图设置中使用 RxCpp。视图更新方法订阅可观察对象(通过 lambda 捕获this)。如果订阅的寿命比视图实例的寿命长,就会发生未定义的内存访问。我不希望订阅使视图保持活动状态。因此,我需要订阅在视图的析构函数上确定地结束。这听起来像是 RAII 的一个例子。

这有危险吗?这是否是对 rx 的滥用?我读过的书更喜欢take_until类似的环境。为什么这样会更好,以及如何在这里使用它?

谢谢你!

#include "rxcpp/rx.hpp"

class MyView : public View
{
public:
    MyView(rxcpp::observable<int> obs) : obs (obs)
    {
        sub = obs.subscribe ([this] (int i) { update(i); });
    }
    ~MyView()
    {
        sub.unsubscribe();
    }
    void update(int i)
    {
        number = i;
        repaint();
    }
private:
    rxcpp::observable<int> obs;
    rxcpp::subscription sub;
    int number;
};
Run Code Online (Sandbox Code Playgroud)

c++ raii model-view rxcpp

5
推荐指数
1
解决办法
480
查看次数

C++ 中的空尖括号

在探索 RxCpp 库时,我遇到了以下我无法解释的示例。

    auto ints = rxcpp::observable<>::create(
        [](rxcpp::subscriber<int> s){
            s.on_next(1);
            s.on_next(2);
            s.on_completed();
    });
Run Code Online (Sandbox Code Playgroud)

库中有两个observable类的声明:

template<class T, class SourceOperator>
class observable
    : public observable_base<T>
{
// ...
};

template<>
class observable<void, void>
{
// ...
};

Run Code Online (Sandbox Code Playgroud)

我无法理解的是编译器如何设法接受rxcpp::observable<>.一块。有可能是许多明确的专业化observable针对不同类型,比其他void,void

问题是编译器如何解释这段代码中的空尖括号: rxcpp::observable<>.

我在observable类中没有看到默认模板参数,也没有可以解释这一点的可变参数模板参数。

然后我认为它与显式模板专业化有某种关系,并试图在一个孤立的程序中重现它,例如像这样

namespace isolated {
  template<class T>
  class Test {
  public:
    static void say() {
      cout << "I am generic" << endl;
    }
  };

  template<>
  class Test<int> {
  public:
    static void say() …
Run Code Online (Sandbox Code Playgroud)

c++ rxcpp

4
推荐指数
1
解决办法
108
查看次数

创建一个可以在 RxCpp 中取消订阅的 Observable

我正在从 C# 移植一些严重依赖 Rx 的代码,而且我很难找到一些最常用的 C# 方法的 C++ 等价物。

特别是,我想从订阅/取消订阅逻辑创建一个 observable。在 C# 中,我使用Observable.Create<TSource> Method (Func<IObserver<TSource>, Action>)覆盖来创建一个 observable。例如

var observable = Observable.Create<int>(observer =>
{
    observers.Add(observer);
    return () =>
    {
        observers.Remove(observer)
    };
});
Run Code Online (Sandbox Code Playgroud)

是否可以用RxCpp做同样的事情?我认为答案在于rx::observable<>::create(OnSubscribe os)方法,但我不知道如何使用它来“注册”取消订阅的 lambda。

c# c++ system.reactive rxcpp

3
推荐指数
1
解决办法
2079
查看次数

rxcpp:嵌套while循环或类似程序的"经典"命令式结构

我有一个设备可以传输一些事件.我想使用反应式扩展来模拟以下行为:

  1. 检测用户何时连接加密狗(我的程序检查加密狗连接的事件).
  2. 连接加密狗,开始从加密狗捕获数据流.
  3. 能够检测到加密狗何时断开连接并返回1.,如果用户再次连接加密狗,我想转到2.如果键盘被击中我流数据的状态,则程序结束.

我知道如何等待加密狗连接(1.):

auto waitForDongle = events.take_while([](auto const & event) {
      return event == DongleConnected
      }).subscribe([](auto) {});
Run Code Online (Sandbox Code Playgroud)

我知道如何捕获流(2.):

auto streamMotionData = events.take_while([](auto const &) { return !keyboardPressed(); })
    .map([](auto const & evt) -> boost::optional<std::vector<double>> {
            ...
            return data;
        }).subscribe([](vector<double> const &) { ...});
Run Code Online (Sandbox Code Playgroud)

我的问题是我不知道如何组合流以便回到1.以及之后2.我只知道如何一次又一次地执行此操作.但我想要上述行为.

c++ reactive-programming rxcpp

3
推荐指数
1
解决办法
202
查看次数