Stream arithmetic with Reactive Extensions

Mat*_*lay 5 c# system.reactive

I'm currently working on an application that combines multiple data streams through equations. What I'd like to be able to write is:

var result = (stream1 + stream2) / stream3 + stream4 * 2;

Here result should update whenever any of the streams updates. At the moment, the only way I can see to express this in Rx is:

var result = stream1.CombineLatest(stream2, (x,y) => x+y)
  .CombineLatest(stream3, (x,y) => x / y)
  .CombineLatest(stream4, (x,y) => x + y*2);

That is nowhere near as clear.

My current idea is as follows:

public class ArithmeticStream : IObservable<double>
{
    public static ArithmeticStream operator +(ArithmeticStream xx, ArithmeticStream yy)
    {
        // Does not compile: CombineLatest returns IObservable<double>, not ArithmeticStream.
        return Observable.CombineLatest(xx,yy, (x,y) => x + y);
    }
    ...
}

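The problem is that CombineLatest returns an IObservable<double> rather than an ArithmeticStream.

Two possible questions:

How can I transparently convert the IObservable<double> into an ArithmeticStream?

Is there an alternative route that gets me the result I want?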

Jer*_*all 2

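Adding this as a new answer, since it's completely different...

So, if you're committed to the operator-overloading route, this is what you'd need to do (well, it's at least one way). To be honest, I don't love this approach: it does make queries more concise to write, but the DSL approach offers similar "terseness" and is cleaner in that it doesn't rely on operator overloading.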

using System;
using System.Reactive.Linq;

public static class ArithmeticStreamExt
{
    public static ArithmeticStream Wrap(this IObservable<double> src)
    {
        return new ArithmeticStream(src);
    }
    public static ArithmeticStream Const(this double constValue)
    {
        return new ArithmeticStream(Observable.Return(constValue));
    }
}
public class ArithmeticStream
{
    private readonly IObservable<double> _inner;

    public ArithmeticStream(IObservable<double> source)
    {
        _inner = source;
    }

    public IObservable<double> Source {get { return _inner; }}

    public static ArithmeticStream operator +(
       ArithmeticStream left, 
       ArithmeticStream right)
    {
        return new ArithmeticStream(
            left._inner.CombineLatest(right._inner, (l, r) => l + r));
    }
    public static ArithmeticStream operator -(
       ArithmeticStream left, 
       ArithmeticStream right)
    {
        return new ArithmeticStream(
            left._inner.CombineLatest(right._inner, (l, r) => l - r));
    }
    public static ArithmeticStream operator *(
       ArithmeticStream left, 
       ArithmeticStream right)
    {
        return new ArithmeticStream(
            left._inner.CombineLatest(right._inner, (l, r) => l * r));
    }
    public static ArithmeticStream operator /(
       ArithmeticStream left, 
       ArithmeticStream right)
    {
        return new ArithmeticStream(
            left._inner.CombineLatest(right._inner, (l, r) => l / r));
    }

    public static ArithmeticStream operator +(
        ArithmeticStream left, 
        IObservable<double> right)
    {
        return new ArithmeticStream(
            left._inner.CombineLatest(right, (l, r) => l + r));
    }
    public static ArithmeticStream operator -(
        ArithmeticStream left, 
        IObservable<double> right)
    {
        return new ArithmeticStream(
            left._inner.CombineLatest(right, (l, r) => l - r));
    }
    public static ArithmeticStream operator *(
        ArithmeticStream left, 
        IObservable<double> right)
    {
        return new ArithmeticStream(
            left._inner.CombineLatest(right, (l, r) => l * r));
    }
    public static ArithmeticStream operator /(
        ArithmeticStream left, 
        IObservable<double> right)
    {
        return new ArithmeticStream(
            left._inner.CombineLatest(right, (l, r) => l / r));
    }
}

And a test harness:

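// LINQPad-style entry point; needs the System.Reactive.Subjects namespace for Subject<T>.
void Main()
{
    var s1 = new Subject<double>();
    var s2 = new Subject<double>();
    var s3 = new Subject<double>();
    var s4 = new Subject<double>();

    var result = (s1.Wrap() + s2) / s3 + (s4.Wrap() * 2.0.Const());
    using(result.Source.Subscribe(Console.WriteLine))
    {
        // CombineLatest stays silent until every input has produced a value.
        s1.OnNext(1.0);
        s2.OnNext(2.0);
        s3.OnNext(3.0);
        s4.OnNext(4.0); // first emission: (1 + 2) / 3 + 4 * 2 = 9
    }
}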
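On the "transparent conversion" part of the question: as far as I know, C# rejects user-defined conversions to or from interface types (compiler error CS0552), so an implicit operator from IObservable<double> is not an option, and something like Wrap() is still needed for the first operand. Constants are a different story, because double is not an interface. Here is a minimal sketch, assuming the System.Reactive package; the class name ImplicitArithmeticStream and the Demo helper are hypothetical, and only two operators are shown for brevity.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical trimmed-down variant of ArithmeticStream (not the class from the answer).
public sealed class ImplicitArithmeticStream
{
    public IObservable<double> Source { get; }

    public ImplicitArithmeticStream(IObservable<double> source) => Source = source;

    // Assumption: a constant is modelled as a one-element stream, the same way Const() does it.
    // This conversion is legal because double is not an interface type.
    public static implicit operator ImplicitArithmeticStream(double value) =>
        new ImplicitArithmeticStream(Observable.Return(value));

    public static ImplicitArithmeticStream operator +(
        ImplicitArithmeticStream left, ImplicitArithmeticStream right) =>
        new ImplicitArithmeticStream(
            left.Source.CombineLatest(right.Source, (l, r) => l + r));

    public static ImplicitArithmeticStream operator *(
        ImplicitArithmeticStream left, ImplicitArithmeticStream right) =>
        new ImplicitArithmeticStream(
            left.Source.CombineLatest(right.Source, (l, r) => l * r));
}

public static class Demo
{
    // Collects every value the combined stream emits for a fixed input sequence.
    public static List<double> Run()
    {
        var s1 = new Subject<double>();
        var s4 = new Subject<double>();
        var seen = new List<double>();

        // The literal 2.0 is converted implicitly, so no Const() call is needed.
        var result = new ImplicitArithmeticStream(s1)
                   + new ImplicitArithmeticStream(s4) * 2.0;

        using (result.Source.Subscribe(seen.Add))
        {
            s1.OnNext(1.0); // nothing yet: s4 has not produced a value
            s4.OnNext(4.0); // emits 1 + 4 * 2 = 9
        }
        return seen;
    }
}
```

With this in place, the harness expression could read s4.Wrap() * 2.0 instead of s4.Wrap() * 2.0.Const(). Whether that extra implicitness is worth it is a judgment call; it does nothing for the IObservable<double> operands themselves.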