exp*_*ite 3 scala volatility system.reactive rx-java rx-scala
如果问题措辞不当,我会道歉,我会尽我所能.
如果我有一系列值的时间,Observable[(U,T)]其中U是一个值,而T是一个类似时间的类型(或者我认为有任何差异),我怎么能写一个自动重置一触式屏障的运算符,这是沉默的abs(u_n - u_reset) < barrier,但是t_n - t_reset如果触摸屏障会吐出,此时它也会重置u_reset = u_n.
也就是说,此运算符接收的第一个值成为基线,并且它不会发出任何结果.此后,它监视流的值,并且只要其中一个超出基线值(高于或低于),它就会发出经过的时间(通过事件的时间戳测量),并重置基线.然后将处理这些时间以形成波动率的高频估计.
作为参考,我试图写一个在http://www.amazon.com/Volatility-Trading-CD-ROM-Wiley/dp/0470181990中概述的波动率估算器,而不是测量标准偏差(在常规均匀时间的偏差) ),你反复测量一些固定障碍量突破障碍所需的时间.
具体来说,这可以使用现有的运营商编写吗?我有点不知道状态将如何被重置,尽管我可能需要制作两个嵌套的运算符,一个是一次性的,另一个是不断创建一次性的...我知道它可以通过写入来完成一个手工,但后来我需要写自己的出版商等.
谢谢!
我不完全理解示例中的算法和变量,但您可以使用flatMap某些堆状态并返回empty()或just()根据需要:
int[] var1 = { 0 };
source.flatMap(v -> {
var1[0] += v;
if ((var1[0] & 1) == 0) {
return Observable.just(v);
}
return Observable.empty();
});
Run Code Online (Sandbox Code Playgroud)
如果由于多个消费者而需要按序列状态,那么defer整个过程就可以:
Observable.defer(() -> {
int[] var1 = { 0 };
return source.flatMap(v -> {
var1[0] += v;
if ((var1[0] & 1) == 0) {
return Observable.just(v);
}
return Observable.empty();
});
}).subscribe(...);
Run Code Online (Sandbox Code Playgroud)