Joh*_*ohn 27 c# async-await c#-5.0
我的解决方案中有两个项目:WPF项目和类库.
在我的类库中:
我有一个符号列表:
class Symbol
{
Identifier Identifier {get;set;}
List<Quote> HistoricalQuotes {get;set;}
List<Financial> HistoricalFinancials {get;set;}
}
Run Code Online (Sandbox Code Playgroud)
对于每个符号,我查询金融服务以使用webrequest检索每个符号的历史财务数据.(webClient.DownloadStringTaskAsync(URI))
所以这是我的方法:
public async Task<IEnumerable<Symbol>> GetSymbolsAsync()
{
var historicalFinancialTask = new List<Task<HistoricalFinancialResult>>();
foreach (var symbol in await _listSymbols)
{
historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol));
}
while (historicalFinancialTask.Count > 0)
{
var historicalFinancial = await Task.WhenAny(historicalFinancialTask);
historicalFinancialTask.Remove(historicalFinancial);
// the line below doesn't compile, which is understandable because method's return type is a Task of something
yield return new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data);
}
}
private async Task<HistoricalFinancialResult> GetFinancialsQueryAsync(Symbol symbol)
{
var result = new HistoricalFinancialResult();
result.Symbol = symbol;
result.Data = await _financialsQuery.GetFinancialsQuery(symbol.Identifier); // contains some logic like parsing and use WebClient to query asynchronously
return result;
}
private class HistoricalFinancialResult
{
public Symbol Symbol { get; set; }
public IEnumerable<Financial> Data { get; set; }
// equality members
}
Run Code Online (Sandbox Code Playgroud)
正如您所看到的,我希望每次下载每个符号的财务历史数据时,都会产生结果,而不是等待我对金融服务的所有调用完成.
在我的WPF中,这就是我想要做的:
foreach(var symbol in await _service.GetSymbolsAsync())
{
SymbolsObservableCollection.Add(symbol);
}
Run Code Online (Sandbox Code Playgroud)
看来我们不能在异步方法中产生回报,那么我可以使用什么解决方案?除了将我的GetSymbols方法移动到我的WPF项目中.
Ian*_*ths 41
虽然我喜欢TPL Dataflow组件(svick建议您使用),但转移到该系统确实需要大量的承诺 - 这不是您可以添加到现有设计中的东西.如果您正在执行大量CPU密集型数据处理并希望利用许多CPU核心,它可以提供相当大的好处.但是,充分利用它并非易事.
他使用Rx的其他建议可能更容易与现有解决方案集成.(请参阅原始文档,但是对于最新的代码,请使用Rx-Main nuget包.或者,如果您想查看源代码,请参阅Rx CodePlex站点)甚至可以继续调用代码IEnumerable<Symbol>如果你想使用一个- 你可以使用Rx纯粹作为一个实现细节,[ 编辑2013/11/09添加: ]虽然正如svick指出的那样,鉴于你的最终目标,这可能不是一个好主意.
在我向您展示一个例子之前,我想清楚一下我们究竟在做什么.您的示例有一个带有此签名的方法:
public async Task<IEnumerable<Symbol>> GetSymbolsAsync()
Run Code Online (Sandbox Code Playgroud)
返回类型,Task<IEnumerable<Symbol>>实质上是说"这是一种产生单一结果类型的方法IEnumerable<Symbol>,它可能不会立即产生该结果."
它是一个结果位,我认为是造成你的悲伤,因为这不是你真正想要的东西.A Task<T>(无论T可能是什么)表示单个异步操作.它可能有很多步骤(await如果你把它作为一个C#async方法实现它的许多用途)但最终它会产生一件事.你想在不同的时间生产多种东西,所以Task<T>不太适合.
如果你真的要做你的方法签名所承诺的 - 最终产生一个结果 - 你可以做的一种方法是让你的异步方法构建一个列表,然后在它好的时候产生结果:
// Note: this first example is *not* what you want.
// However, it is what your method's signature promises to do.
public async Task<IEnumerable<Symbol>> GetSymbolsAsync()
{
var historicalFinancialTask = new List<Task<HistoricalFinancialResult>>();
foreach (var symbol in await _listSymbols)
{
historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol));
}
var results = new List<Symbol>();
while (historicalFinancialTask.Count > 0)
{
var historicalFinancial = await Task.WhenAny(historicalFinancialTask);
historicalFinancialTask.Remove(historicalFinancial);
results.Add(new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data));
}
return results;
}
Run Code Online (Sandbox Code Playgroud)
此方法执行其签名所示:它异步生成一系列符号.
但是大概你想创造一个IEnumerable<Symbol>能够产生物品的物品,而不是等到物品全部可用.(否则,您可能只是使用WhenAll.)您可以这样做,但yield return不是这样.
简而言之,我认为你想要做的是产生一个异步列表.有一种类型:IObservable<T>表达我认为你希望用你表达的东西Task<IEnumerable<Symbol>>:它是一系列项目(就像IEnumerable<T>)但异步.
通过类比可能有助于理解它:
public Symbol GetSymbol() ...
Run Code Online (Sandbox Code Playgroud)
是的
public Task<Symbol> GetSymbolAsync() ...
Run Code Online (Sandbox Code Playgroud)
如
public IEnumerable<Symbol> GetSymbols() ...
Run Code Online (Sandbox Code Playgroud)
是:
public IObservable<Symbol> GetSymbolsObservable() ...
Run Code Online (Sandbox Code Playgroud)
(不幸的是,不像Task<T>没有什么叫异步面向序列方法的通用命名约定.我已经添加了"可观察"在结束了,但是这不是普遍的做法.我肯定不会把它GetSymbolsAsync因为人们会期望返回一个Task.)
换句话说,Task<IEnumerable<T>>"当我很好并准备好时,我会制作这个系列",然后IObservable<T>说:"这是一个系列.当我很好并准备好时,我会生产每件产品."
因此,您需要一个返回一系列Symbol对象的方法,这些对象是异步生成的.这告诉我们你应该真的回来了IObservable<Symbol>.这是一个实现:
// Unlike this first example, this *is* what you want.
public IObservable<Symbol> GetSymbolsRx()
{
return Observable.Create<Symbol>(async obs =>
{
var historicalFinancialTask = new List<Task<HistoricalFinancialResult>>();
foreach (var symbol in await _listSymbols)
{
historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol));
}
while (historicalFinancialTask.Count > 0)
{
var historicalFinancial = await Task.WhenAny(historicalFinancialTask);
historicalFinancialTask.Remove(historicalFinancial);
obs.OnNext(new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data));
}
});
}
Run Code Online (Sandbox Code Playgroud)
正如您所看到的,这可以让您写出您希望编写的内容 - 此代码的主体几乎与您的相同.唯一的区别是你使用的地方yield return(没有编译),这会调用OnNextRx提供的对象上的方法.
写完之后,你可以轻松地将其包装成一个IEnumerable<Symbol>([ 编辑2013/11/29添加: ]虽然你可能实际上并不想这样做 - 请参阅答案末尾的补充):
public IEnumerable<Symbol> GetSymbols()
{
return GetSymbolsRx().ToEnumerable();
}
Run Code Online (Sandbox Code Playgroud)
这可能看起来不是异步的,但它实际上允许底层代码异步操作.当您调用此方法时,它不会阻止 - 即使执行获取财务信息工作的基础代码不能立即生成结果,此方法仍将立即返回IEnumerable<Symbol>.当然,如果数据尚不可用,任何试图遍历该集合的代码都将最终阻塞.但重要的是,我认为你最初想要实现的目标是:
async完成工作的方法(我的例子中的委托,作为参数传递,Observable.Create<T>但async如果你愿意,你可以写一个独立的方法)IEnumerable<Symbol>将在每个单独的项目可用时立即生成这是有效的,因为Rx的ToEnumerable方法中有一些聪明的代码,它弥合了同步世界视图IEnumerable<T>和异步生成结果之间的差距.(换句话说,这确实让你失望的是发现C#无法为你做的.)
如果你很好奇,你可以查看来源.代码ToEnumerable可以在https://rx.codeplex.com/SourceControl/latest#Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GetEnumerator.cs找到.
[ 编辑2013/11/29添加: ]
svick在评论中指出我错过了一些内容:你的最终目标是将内容放入ObservableCollection<Symbol>.不知怎的,我没有看到那一点.这意味着IEnumerable<T>错误的方法 - 您希望在项目可用时填充集合,而不是通过foreach循环完成.所以你只需这样做:
GetSymbolsRx().Subscribe(symbol => SymbolsObservableCollection.Add(symbol));
Run Code Online (Sandbox Code Playgroud)
或类似的规定.这将在集合可用时向集合中添加项目.
这取决于顺便在UI线程上启动的整个事情.只要是这样,您的异步代码最终应该在UI线程上运行,这意味着当项目添加到集合时,也会在UI线程上发生.但是,如果由于某种原因你最终从工作线程启动东西(或者如果你ConfigureAwait在任何等待中使用,从而打破与UI线程的连接),你需要安排处理来自Rx流的项目在正确的线程上:
GetSymbolsRx()
.ObserveOnDispatcher()
.Subscribe(symbol => SymbolsObservableCollection.Add(symbol));
Run Code Online (Sandbox Code Playgroud)
如果您在执行此操作时处于UI线程中,它将选择当前的调度程序,并确保所有通知都通过它进行.如果您在订阅时已经使用了错误的线程,则可以使用ObserveOn带有调度程序的重载.(这些要求你有一个引用System.Reactive.Windows.Threading.这些是扩展方法,所以你需要一个using包含它们的命名空间,也称为System.Reactive.Windows.Threading)
您要求的内容没有多大意义,因为它IEnumerable<T>是一个同步接口.换句话说,如果一个项目尚未可用,该MoveNext()方法必须阻止,它没有其他选择.
你需要的是某种异步版本IEnumerable<T>.为此,您可以使用IObservable<T>来自TPL数据流的 Rx或(我最喜欢的)块.有了它,你的代码可能看起来像这样(我也改变了一些变量到更好的名称):
public IReceivableSourceBlock<Symbol> GetSymbolsAsync()
{
var block = new BufferBlock<Symbol>();
GetSymbolsAsyncCore(block).ContinueWith(
task => ((IDataflowBlock)block).Fault(task.Exception),
TaskContinuationOptions.NotOnRanToCompletion);
return block;
}
private async Task GetSymbolsAsyncCore(ITargetBlock<Symbol> block)
{
// snip
while (historicalFinancialTasks.Count > 0)
{
var historicalFinancialTask =
await Task.WhenAny(historicalFinancialTasks);
historicalFinancialTasks.Remove(historicalFinancialTask);
var historicalFinancial = historicalFinancialTask.Result;
var symbol = new Symbol(
historicalFinancial.Symbol.Identifier,
historicalFinancial.Symbol.HistoricalQuotes,
historicalFinancial.Data);
await block.SendAsync(symbol);
}
}
Run Code Online (Sandbox Code Playgroud)
用法可能是:
var symbols = _service.GetSymbolsAsync();
while (await symbols.OutputAvailableAsync())
{
Symbol symbol;
while (symbols.TryReceive(out symbol))
SymbolsObservableCollection.Add(symbol);
}
Run Code Online (Sandbox Code Playgroud)
要么:
var symbols = _service.GetSymbolsAsync();
var addToCollectionBlock = new ActionBlock<Symbol>(
symbol => SymbolsObservableCollection.Add(symbol));
symbols.LinkTo(
addToCollectionBlock, new DataflowLinkOptions { PropagateCompletion = true });
await symbols.Completion;
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
7771 次 |
| 最近记录: |