Tam*_*red 5 .net c# linq system.reactive observable
在我写一个股市交易员的过程中,IObserver我遇到了三个错误,这些错误主要来自Reactive Extensions图书馆内部.
我有以下CompanyInfo课程:
public class CompanyInfo
{
public string Name { get; set; }
public double Value { get; set; }
}
Run Code Online (Sandbox Code Playgroud)
还有一个IObservable<CompanyInfo>叫StockMarket:
public class StockMarket : IObservable<CompanyInfo>
Run Code Online (Sandbox Code Playgroud)
我Observer看起来如下:
public class StockTrader : IObserver<CompanyInfo>
{
public void OnCompleted()
{
Console.WriteLine("Market Closed");
}
public void OnError(Exception error)
{
Console.WriteLine(error);
}
public void OnNext(CompanyInfo value)
{
WriteStock(value);
}
private void WriteStock(CompanyInfo value) { ... }
}
Run Code Online (Sandbox Code Playgroud)
我运行以下代码:
StockMarket market = GetStockMarket();
StockTrader trader = new StockTrader();
IObservable<CompanyInfo> differential = market //[F, 1], [S, 5], [S, 4], [F, 2]
.GroupBy(x => x.Name) //[F, 1], [F, 2]; [S, 5], [S, 4]
.SelectMany(x => x //4, 8, 2, 3
.Buffer(2, 1) //(4, 8), (8, 2), (2, 3), (3)
.SkipLast(1) //(4, 8), (8, 2), (2, 3)
.Select(y => new CompanyInfo //(+100%), (-75%), (+50%)
{
Name = x.Key,
Value = (y[1].Value - y[0].Value) / y[0].Value
}) //[F, +100%]; [S, -20%]
);
using (IDisposable subscription = differential.Subscribe(trader))
{
Observable.Wait(market);
}
Run Code Online (Sandbox Code Playgroud)
发生以下三个错误之一:
下面ArgumentException是抛出从内Reactive Extensions:
System.ArgumentException:已添加具有相同键的项.在System.Tolrow.ThrowArgumentException(ExceptionResource资源)处System.Collections.Generic.Dictionary`2.Insert(TKey键,TValue值,布尔加法)在System.Reactive.Linq.Observable.GroupBy'3 ._.OnNext(TSource)处于System.ThrowHelper.ThrowArgumentException(ExceptionResource资源)值)
以下内容IndexOutOfRangeException:
参数名称:System.ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument参数,ExceptionResource资源)的索引,位于StockMarketTests的System.Collections.Generic.List'1.get_Item(Int32索引).<> c__DisplayClass0_0.b__2(IList'1 y). Reactive.Linq.Observable.Select'2 ._.OnNext(TSource值)
Console零星的调整文本(颜色应该是一致的):
什么可能导致这些古怪的症状?
关于概念的最大好处之一Reactive Extensions是能够订阅"发生"(IObservable)发生在"某处"并在这种"发生"上应用面向对象的概念- 这无需知道'某处'的位置.
这种方式Reactive Extensions简化了event面向对象程序设计和producer-consumer问题很多.
在IObservable不知道观察数据源的情况下订阅的能力迫使订户假定通知是不可预测的.换句话说,在观察时你应该假设通知可以同时传递.IObservable
由于行为合同Reactive Externsions,IObservables应该一次生产一个项目.通常,这就是发生的事情,但有时外部实施不遵循该合同.
让我们来看看这三个问题中的每一个:
GroupBy 不是线程安全的GroupBy通过返回a来工作IObservable<IGroupedObservable<T>>,它的OnNext方法调用外部IObservable的s OnNext,IGroupedObservable<T>其匹配当前通知.它通过为每个键内部保留一个IGroupedObservable<T> (更准确地说是一个Subject<T>)来实现Dictionary这一点 - 这不足为奇 - 不是一个ConcurrentDictionary.这意味着两个邻近的通知可以导致双重插入.
Select 并不孤单Select线程安全性由其提供的代表决定.另外,在上述的情况下,提供给委托Select方法依赖于以下事实:Buffer(2, 1)将提供的列表具有2大小Buffer包含一个Queue,这是不并发,因此从多个线程迭代时- Buffer的Queue能为我们提供了一些意想不到的结果.
另一个Exception可能从同样的理由被抛出时NullReferenceException,如果y将被提供null,或InvalidOperationException为Queue,而它的被迭代可以修改.
最后但并非最不重要的,即使你只做基本的观察,StockTrader的OnNext方法是修改非控制台原子操作引起的怪异的文本布局.
该Synchronize方法的存在使您能够验证您是否订阅了线性 IObservable<T>,这意味着可以同时对OnNext方法进行多次调用.
因为即使GroupBy扩展方法不是线程安全的,也Synchronize需要在链的开头调用该方法:
IObservable<CompanyInfo> differential = market //[F, 1], [S, 5], [S, 4], [F, 2]
.Synchronize()
.GroupBy(x => x.Name) //[F, 1], [F, 2]; [S, 5], [S, 4]
.SelectMany(x => x //4, 8, 2, 3
.Buffer(2, 1) //(4, 8), (8, 2), (2, 3), (3)
.SkipLast(1) //(4, 8), (8, 2), (2, 3)
.Select(y => new CompanyInfo //(+100%), (-75%), (+50%)
{
Name = x.Key,
Value = (y[1].Value - y[0].Value) / y[0].Value
})
); //[F, +100%]; [S, -20%]
Run Code Online (Sandbox Code Playgroud)
请注意,为查询Synchronize添加了另一个代理Observable,因此它会使查询稍慢,因此在不需要时应避免使用它.
| 归档时间: |
|
| 查看次数: |
511 次 |
| 最近记录: |