考虑一个阻塞函数: this_thread::sleep_for(milliseconds(3000));
我试图获得以下行为:
Trigger Blocking Function
|---------------------------------------------X
Run Code Online (Sandbox Code Playgroud)
我想触发阻塞功能,如果时间太长(超过两秒),它应该超时。
我做了以下事情:
my_connection = observable<>::create<int>([](subscriber<int> s) {
auto s2 = observable<>::just(1, observe_on_new_thread()) |
subscribe<int>([&](auto x) {
this_thread::sleep_for(milliseconds(3000));
s.on_next(1);
});
}) |
timeout(seconds(2), observe_on_new_thread());
Run Code Online (Sandbox Code Playgroud)
我无法让它发挥作用。对于初学者来说,我认为 s 不能从不同的线程 on_next 。
所以我的问题是,正确的反应方式是什么?如何在 rxcpp 中包装阻塞函数并为其添加超时?
随后,我想要获得一个行为如下的 RX 流:
Trigger Cleanup
|------------------------X
(Delay) Trigger Cleanup
|-----------------X
Run Code Online (Sandbox Code Playgroud)
好问题!上面的内容已经很接近了。
下面是如何使阻塞操作适应 rxcpp 的示例。它通过libcurl 轮询来发出 http 请求。
以下内容应该符合您的预期。
auto sharedThreads = observe_on_event_loop();
auto my_connection = observable<>::create<int>([](subscriber<int> s) {
this_thread::sleep_for(milliseconds(3000));
s.on_next(1);
s.on_completed();
}) |
subscribe_on(observe_on_new_thread()) |
//start_with(0) | // workaround bug in timeout
timeout(seconds(2), sharedThreads);
//skip(1); // workaround bug in timeout
my_connection.as_blocking().subscribe(
[](int){},
[](exception_ptr ep){cout << "timed out" << endl;}
);
Run Code Online (Sandbox Code Playgroud)
subscribe_on将在专用线程上运行create,因此create允许阻塞该线程。timeout将在可以与其他线程共享的不同线程上运行计时器,并将所有on_next//调用转移到同一线程on_error。on_completedas_blocking将确保subscribe在完成之前不会返回。这仅用于防止main()退出 - 最常见于测试或示例程序。编辑:添加了错误的解决方法timeout。目前,在第一个值到达之前,它不会安排第一次超时。
EDIT-2:timeout错误已修复,不再需要解决方法。
| 归档时间: |
|
| 查看次数: |
886 次 |
| 最近记录: |