Mat*_*lay 5 c# system.reactive
我目前正在研究一种通过方程组合多个数据流的应用程序.我希望能做的是:
var result = (stream1 + stream2) / stream3 + stream4 * 2;
Run Code Online (Sandbox Code Playgroud)
当result更新,只要任何流的更新.目前,我在Rx中表达这一点的唯一方法是:
var result = stream1.CombineLatest(stream2, (x,y) => x+y)
.CombineLatest(stream3, (x,y) => x / y)
.CombineLatest(stream4, (x,y) => x + y*2);
Run Code Online (Sandbox Code Playgroud)
这几乎不是那么清楚.
我目前的想法如下:
Public class ArithmeticStream : IObservable<double>
{
public static ArithmeticStream operator +(ArithmeticStream xx, ArithmeticStream yy)
{
return Observable.CombineLatest(xx,yy, (x,y) => x + y);
}
...
}
Run Code Online (Sandbox Code Playgroud)
问题是CombineLatest返回一个IObservable<double>而不是ArithmeticStream.
两个可能的问题:
如何透明地将其IObservable<double>转换为ArithmeticStream?
有没有一条替代路线可以让我得到我想要的结果?
将其添加为新答案,因为它完全不同......
因此,如果您致力于执行操作员重载路线,那么您需要这样做(嗯,至少是一种方式)...说实话,我不喜欢这种方法 -虽然它确实使查询编写更加简洁,但 DSL 方法具有类似的“简洁性”,但在不依赖运算符重载的意义上更清晰。
public static class ArithmeticStreamExt
{
public static ArithmeticStream Wrap(this IObservable<double> src)
{
return new ArithmeticStream(src);
}
public static ArithmeticStream Const(this double constValue)
{
return new ArithmeticStream(Observable.Return(constValue));
}
}
public class ArithmeticStream
{
private IObservable<double> _inner;
public ArithmeticStream(IObservable<double> source)
{
_inner = source;
}
public IObservable<double> Source {get { return _inner; }}
public static ArithmeticStream operator +(
ArithmeticStream left,
ArithmeticStream right)
{
return new ArithmeticStream(
left._inner.CombineLatest(right._inner, (l, r) => l + r));
}
public static ArithmeticStream operator -(
ArithmeticStream left,
ArithmeticStream right)
{
return new ArithmeticStream(
left._inner.CombineLatest(right._inner, (l, r) => l - r));
}
public static ArithmeticStream operator *(
ArithmeticStream left,
ArithmeticStream right)
{
return new ArithmeticStream(
left._inner.CombineLatest(right._inner, (l, r) => l * r));
}
public static ArithmeticStream operator /(
ArithmeticStream left,
ArithmeticStream right)
{
return new ArithmeticStream(
left._inner.CombineLatest(right._inner, (l, r) => l / r));
}
public static ArithmeticStream operator +(
ArithmeticStream left,
IObservable<double> right)
{
return new ArithmeticStream(
left._inner.CombineLatest(right, (l, r) => l + r));
}
public static ArithmeticStream operator -(
ArithmeticStream left,
IObservable<double> right)
{
return new ArithmeticStream(
left._inner.CombineLatest(right, (l, r) => l - r));
}
public static ArithmeticStream operator *(
ArithmeticStream left,
IObservable<double> right)
{
return new ArithmeticStream(
left._inner.CombineLatest(right, (l, r) => l * r));
}
public static ArithmeticStream operator /(
ArithmeticStream left,
IObservable<double> right)
{
return new ArithmeticStream(
left._inner.CombineLatest(right, (l, r) => l / r));
}
}
Run Code Online (Sandbox Code Playgroud)
还有一个测试装置:
void Main()
{
var s1 = new Subject<double>();
var s2 = new Subject<double>();
var s3 = new Subject<double>();
var s4 = new Subject<double>();
var result = (s1.Wrap() + s2) / s3 + (s4.Wrap() * 2.0.Const());
using(result.Source.Subscribe(Console.WriteLine))
{
s1.OnNext(1.0);
s2.OnNext(2.0);
s3.OnNext(3.0);
s4.OnNext(4.0);
}
}
Run Code Online (Sandbox Code Playgroud)