我想在llvm的IR解释器中运行RxCpp示例lli.
不幸的是,在lli中运行任何RxCpp示例都失败了:
git clone https://github.com/Reactive-Extensions/RxCpp.git --depth 1
cd RxCpp/Rx/v2/examples/pythogerian
clang++ -S -emit-llvm -fno-use-cxa-exit -I../../src main.cpp
lli main.ll
Run Code Online (Sandbox Code Playgroud)
错误信息:
Relocation type not implemented yet!
UNREACHABLE executed at llvm/lib/ExecutionEngine/RuntimeDyld/RuntimeDyldELF.cpp:232!
Run Code Online (Sandbox Code Playgroud)
问题:
这个错误究竟意味着什么? - llvm的orc-jit中哪些假设不满足?
有解决方法吗? - 我可以应用任何LLVM-IR转换来实现这项工作(例如通过编译器标志)吗?
RxCpp使用哪些特殊功能会导致llvm的orcjit出现此问题?
测试:
clang version 5.0.0 (https://github.com/llvm-mirror/clang.git 6c9e299494de2a5b0425e46bc937f29a05128252)
clang version 4.0.0-+rc1-1 (tags/RELEASE_400/rc1)
clang version 3.9.0-1 (tags/RELEASE_390/final)
clang version 3.8.1-12 (tags/RELEASE_381/final)
Target: x86_64-pc-linux-gnu
Thread model: posix
InstalledDir: /usr/bin
Run Code Online (Sandbox Code Playgroud) 如果观察者使用observe_on(rxcpp::observe_on_new_thread()),等待所有观察者on_completed被调用的正确方法是什么:
例如:
{
Foo foo;
auto generator = [&](rxcpp::subscriber<int> s)
{
s.on_next(1);
// ...
s.on_completed();
};
auto values = rxcpp::observable<>::create<int>(generator).publish();
auto s1 = values.observe_on(rxcpp::observe_on_new_thread())
.subscribe([&](int) { slow_function(foo); }));
auto lifetime = rxcpp::composite_subscription();
lifetime.add([&](){ wrapper.log("unsubscribe"); });
auto s2 = values.ref_count().as_blocking().subscribe(lifetime);
// hope to call something here to wait for the completion of
// s1's on_completed function
}
// the program usually crashes here when foo goes out of scope because
// the slow_function(foo) is still working on foo. I also noticed …Run Code Online (Sandbox Code Playgroud) 我试图了解如何使用 rxcpp,我的印象是,当可观察者发出一个值时,所有订阅的观察者都会通过调用其 on_next() 方法来获得通知,并向他们传递发出的值。
以下示例的情况并非如此:
auto eventloop = rxcpp::observe_on_event_loop();
printf("Start task\n");
auto values = rxcpp::observable<>::interval(std::chrono::seconds(2)).map(
[](int i){
printf("Observable sending: %d\n", i);
return i;
}
);
values.
subscribe_on(eventloop).
take(2).
as_blocking().
subscribe(
[](int v){printf("#1 onNext: %d\n", v);},
[](){printf("#1 onCompleted\n");});
values.
subscribe_on(eventloop).
take(2).
as_blocking().
subscribe(
[](int v){printf("#2 onNext: %d\n", v);},
[](){printf("#2 onCompleted\n");});
printf("Finish task\n");
Run Code Online (Sandbox Code Playgroud)
我期望输出是这样的:
Start task
Observable sending: 1
#1 onNext: 1
#2 onNext: 1
Observable sending: 2
#1 onNext: 2
#1 onCompleted
#2 onNext: 2
#2 onCompleted
Finish task
Run Code Online (Sandbox Code Playgroud)
即当新值到来时,所有订阅的观察者都会调用 on_next …
考虑一个阻塞函数: this_thread::sleep_for(milliseconds(3000));
我试图获得以下行为:
Trigger Blocking Function
|---------------------------------------------X
Run Code Online (Sandbox Code Playgroud)
我想触发阻塞功能,如果时间太长(超过两秒),它应该超时。
我做了以下事情:
my_connection = observable<>::create<int>([](subscriber<int> s) {
auto s2 = observable<>::just(1, observe_on_new_thread()) |
subscribe<int>([&](auto x) {
this_thread::sleep_for(milliseconds(3000));
s.on_next(1);
});
}) |
timeout(seconds(2), observe_on_new_thread());
Run Code Online (Sandbox Code Playgroud)
我无法让它发挥作用。对于初学者来说,我认为 s 不能从不同的线程 on_next 。
所以我的问题是,正确的反应方式是什么?如何在 rxcpp 中包装阻塞函数并为其添加超时?
随后,我想要获得一个行为如下的 RX 流:
Trigger Cleanup
|------------------------X
(Delay) Trigger Cleanup
|-----------------X
Run Code Online (Sandbox Code Playgroud) 我在模型视图设置中使用 RxCpp。视图更新方法订阅可观察对象(通过 lambda 捕获this)。如果订阅的寿命比视图实例的寿命长,就会发生未定义的内存访问。我不希望订阅使视图保持活动状态。因此,我需要订阅在视图的析构函数上确定地结束。这听起来像是 RAII 的一个例子。
这有危险吗?这是否是对 rx 的滥用?我读过的书更喜欢take_until类似的环境。为什么这样会更好,以及如何在这里使用它?
谢谢你!
#include "rxcpp/rx.hpp"
class MyView : public View
{
public:
MyView(rxcpp::observable<int> obs) : obs (obs)
{
sub = obs.subscribe ([this] (int i) { update(i); });
}
~MyView()
{
sub.unsubscribe();
}
void update(int i)
{
number = i;
repaint();
}
private:
rxcpp::observable<int> obs;
rxcpp::subscription sub;
int number;
};
Run Code Online (Sandbox Code Playgroud) 在探索 RxCpp 库时,我遇到了以下我无法解释的示例。
auto ints = rxcpp::observable<>::create(
[](rxcpp::subscriber<int> s){
s.on_next(1);
s.on_next(2);
s.on_completed();
});
Run Code Online (Sandbox Code Playgroud)
库中有两个observable类的声明:
template<class T, class SourceOperator>
class observable
: public observable_base<T>
{
// ...
};
template<>
class observable<void, void>
{
// ...
};
Run Code Online (Sandbox Code Playgroud)
我无法理解的是编译器如何设法接受rxcpp::observable<>.一块。有可能是许多明确的专业化observable针对不同类型,比其他void,void。
问题是编译器如何解释这段代码中的空尖括号: rxcpp::observable<>.
我在observable类中没有看到默认模板参数,也没有可以解释这一点的可变参数模板参数。
然后我认为它与显式模板专业化有某种关系,并试图在一个孤立的程序中重现它,例如像这样
namespace isolated {
template<class T>
class Test {
public:
static void say() {
cout << "I am generic" << endl;
}
};
template<>
class Test<int> {
public:
static void say() …Run Code Online (Sandbox Code Playgroud) 我正在从 C# 移植一些严重依赖 Rx 的代码,而且我很难找到一些最常用的 C# 方法的 C++ 等价物。
特别是,我想从订阅/取消订阅逻辑创建一个 observable。在 C# 中,我使用Observable.Create<TSource> Method (Func<IObserver<TSource>, Action>)覆盖来创建一个 observable。例如
var observable = Observable.Create<int>(observer =>
{
observers.Add(observer);
return () =>
{
observers.Remove(observer)
};
});
Run Code Online (Sandbox Code Playgroud)
是否可以用RxCpp做同样的事情?我认为答案在于rx::observable<>::create(OnSubscribe os)方法,但我不知道如何使用它来“注册”取消订阅的 lambda。
我有一个设备可以传输一些事件.我想使用反应式扩展来模拟以下行为:
我知道如何等待加密狗连接(1.):
auto waitForDongle = events.take_while([](auto const & event) {
return event == DongleConnected
}).subscribe([](auto) {});
Run Code Online (Sandbox Code Playgroud)
我知道如何捕获流(2.):
auto streamMotionData = events.take_while([](auto const &) { return !keyboardPressed(); })
.map([](auto const & evt) -> boost::optional<std::vector<double>> {
...
return data;
}).subscribe([](vector<double> const &) { ...});
Run Code Online (Sandbox Code Playgroud)
我的问题是我不知道如何组合流以便回到1.以及之后2.我只知道如何一次又一次地执行此操作.但我想要上述行为.