Yan*_*rin 3 c# parallel.foreach .net-core iasyncenumerable parallel.foreachasync
在 .NET 6 项目中,我必须调用一个偏移分页(页/每页)的 Web API,并且我希望尽可能使 n 个调用并行。
这是使用给定页码调用 API 一次的方法:
private Task<ApiResponse> CallApiAsync(int page,
CancellationToken cancellationToken = default)
{
return GetFromJsonAsync<ApiResponse>($"...&page={page}", cancellationToken)
.ConfigureAwait(false);
}
Run Code Online (Sandbox Code Playgroud)
我实际上需要的是从第 1 页到第 n 页的所有 API 调用的仅前向流式迭代器,因此考虑到这一要求,我认为这IAsyncEnumerable
是正确的 API,这样我就可以并行触发 API 调用并访问每个 API 响应一旦准备好,就可以完成,而不需要全部完成。
所以我想出了以下代码:
public async IAsyncEnumerable<ApiResponse> CallApiEnumerableAsync(int perPage,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
int numProducts = GetNumberOfProducts(perPage);
int numCalls = MathExtensions.CeilDiv(numProducts, perPage);
var pages = Enumerable.Range(1, numCalls);
Parallel.ForEach(pages, async page => {
yield return await CallApiAsync(page, cancellationToken).ConfigureAwait(false);
});
yield break;
}
Run Code Online (Sandbox Code Playgroud)
但我收到以下错误yield
:CS1621 - 该yield 语句不能在匿名方法或lambda 表达式中使用。
有没有办法达到我想要的结果?
如果我不够清楚,请随时提问!
为此目的,最容易使用的工具是TransformBlock<TInput,TOutput>
来自 TPL Dataflow 库的工具。该组件在 .NET Core 及更高版本中原生可用,它本质上是一个具有两个队列(输入和输出)的处理器/投影仪/转换器。您指定处理函数,然后根据需要配置选项,然后向其提供数据,最后检索处理后的输出:
public async IAsyncEnumerable<ApiResponse> CallApiEnumerableAsync(int perPage,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
int numProducts = GetNumberOfProducts(perPage);
int numCalls = MathExtensions.CeilDiv(numProducts, perPage);
var pages = Enumerable.Range(1, numCalls);
TransformBlock<int, ApiResponse> block = new(async page =>
{
return await CallApiAsync(page, cancellationToken);
}, new ExecutionDataflowBlockOptions()
{
CancellationToken = cancellationToken,
MaxDegreeOfParallelism = 10, // Configurable, the default is 1
EnsureOrdered = true, // This is the default
});
// Feed the block with input data
foreach (var page in pages) block.Post(page);
block.Complete();
// Emit the output data as they become available
while (await block.OutputAvailableAsync())
while (block.TryReceive(out var item))
yield return item;
// Propagate possible exception (including cancellation)
await block.Completion;
}
Run Code Online (Sandbox Code Playgroud)
这个简单的实现会在枚举TransformBlock
结果时启动 ,并且直到所有处理完成或被取消IAsyncEnumerable<ApiResponse>
时才会停止。cancellationToken
该处理不是由结果序列的枚举驱动的。break
如果客户端代码只是通过循环放弃枚举,它甚至不会停止await foreach
。如果您想包含此功能(优雅终止),则必须添加一个try
-finally
块和一个内部链接,CancellationTokenSource
如下所示。产生循环应放置在 内部try
,链接的取消CancellationTokenSource
应放置在 内部finally
。