基于java rx中的最后两个当前值进行过滤

Pie*_*vis 5 java filter reactive-programming

我正在尝试使用java反应式扩展构建一个简单的应用程序.我有两个连续发出温度值的流,我想检测并滤除可能出现错误的感应温度峰值,这样做我需要考虑先例值,以便我可以考虑到这样的变化: 过滤对的想法 我仍然无法在文档中找到合适的操作符.有谁知道我该如何完成任务?我应该自定义运营商吗?

这些是我的流:

double min = 50, max = 75, spikeFreq = 0.01;
    Observable<Double> tempStream1 = Observable.create((
            Subscriber<? super Double> subscriber) -> {
        new TempStream(subscriber, min, max, spikeFreq).start();
    });

    Observable<Double> tempStream2 = Observable.create((
            Subscriber<? super Double> subscriber) -> {
        new TempStream(subscriber, min, max, spikeFreq).start();
    });

public class TempStream extends Thread{

private Subscriber<? super Double> subscriber;
private TempSensor sensor;

public TempStream(Subscriber<? super Double> subscriber, double min,
        double max, double spikeFreq) {
    this.subscriber = subscriber;
    sensor = new TempSensor(min, max, spikeFreq);
}

    @Override
    public void run() {
        Random gen = new Random(System.currentTimeMillis());
        while (!subscriber.isUnsubscribed()) {
            try {
                subscriber.onNext(sensor.getCurrentValue());
                Thread.sleep(1000 + gen.nextInt() % 1000);
            } catch (Exception ex) {
                subscriber.onError(ex);
            }
        }
        subscriber.onCompleted();
    }
}
Run Code Online (Sandbox Code Playgroud)

感谢您的任何帮助.

Geo*_*iki 11

在这种情况下,buffer运营商(http://reactivex.io/documentation/operators/buffer.html)可能会有所帮助.你想用buffercount = 2skip = 1.这样你就可以获得流中一个元素的"前瞻".

例如:

stream.buffer(2,1).filter(buf -> buf.size() == 2 && buf.get(0) - buf.get(1) < max);
Run Code Online (Sandbox Code Playgroud)

请注意,此示例还检查是否缓冲了两个值,因为在完成流时可能只发生一个值.

  • 这不会重复峰值检查中的值。例如,`10,20,35,40` 流将发出 `[10,20]` 和 `[35,40]` 而不是 `[10,20]`、`[20,35]` 和`[35,40]` (3认同)
  • 如何与从第一个开始的前一个配对(所以前一个是可选的,第一次发出它是空的)?可能使用扫描? (2认同)