从 IEnumerable<Task<T>> 到 IAsyncEnumerable<T> 通过在 Parallel.ForEach/Parallel.ForEachAsync 内返回的yield 给出错误 CS1621

Yan*_*rin 3 c# parallel.foreach .net-core iasyncenumerable parallel.foreachasync

在 .NET 6 项目中,我必须调用一个偏移分页(页/每页)的 Web API,并且我希望尽可能使 n 个调用并行。

这是使用给定页码调用 API 一次的方法:

private Task<ApiResponse> CallApiAsync(int page,
    CancellationToken cancellationToken = default)
{
    return GetFromJsonAsync<ApiResponse>($"...&page={page}", cancellationToken)
        .ConfigureAwait(false);
}
Run Code Online (Sandbox Code Playgroud)

我实际上需要的是从第 1 页到第 n 页的所有 API 调用的仅前向流式迭代器,因此考虑到这一要求,我认为这IAsyncEnumerable是正确的 API,这样我就可以并行触发 API 调用并访问每个 API 响应一旦准备好,就可以完成,而不需要全部完成。

所以我想出了以下代码:

public async IAsyncEnumerable<ApiResponse> CallApiEnumerableAsync(int perPage,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    int numProducts = GetNumberOfProducts(perPage);

    int numCalls = MathExtensions.CeilDiv(numProducts, perPage);

    var pages = Enumerable.Range(1, numCalls);

    Parallel.ForEach(pages, async page => {
        yield return await CallApiAsync(page, cancellationToken).ConfigureAwait(false);
    });

    yield break;
}
Run Code Online (Sandbox Code Playgroud)

但我收到以下错误yield:CS1621 - 该yield 语句不能在匿名方法或lambda 表达式中使用。
有没有办法达到我想要的结果?
如果我不够清楚,请随时提问!

The*_*ias 5

为此目的,最容易使用的工具是TransformBlock<TInput,TOutput>来自 TPL Dataflow 库的工具。该组件在 .NET Core 及更高版本中原生可用,它本质上是一个具有两个队列(输入和输出)的处理器/投影仪/转换器。您指定处理函数,然后根据需要配置选项,然后向其提供数据,最后检索处理后的输出:

public async IAsyncEnumerable<ApiResponse> CallApiEnumerableAsync(int perPage,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    int numProducts = GetNumberOfProducts(perPage);
    int numCalls = MathExtensions.CeilDiv(numProducts, perPage);
    var pages = Enumerable.Range(1, numCalls);

    TransformBlock<int, ApiResponse> block = new(async page =>
    {
        return await CallApiAsync(page, cancellationToken);
    }, new ExecutionDataflowBlockOptions()
    {
        CancellationToken = cancellationToken,
        MaxDegreeOfParallelism = 10, // Configurable, the default is 1
        EnsureOrdered = true, // This is the default
    });

    // Feed the block with input data
    foreach (var page in pages) block.Post(page);
    block.Complete();

    // Emit the output data as they become available
    while (await block.OutputAvailableAsync())
        while (block.TryReceive(out var item))
            yield return item;

    // Propagate possible exception (including cancellation)
    await block.Completion;
}
Run Code Online (Sandbox Code Playgroud)

这个简单的实现会在枚举TransformBlock结果时启动 ,并且直到所有处理完成或被取消IAsyncEnumerable<ApiResponse>时才会停止。cancellationToken该处理不是由结果序列的枚举驱动的。break如果客户端代码只是通过循环放弃枚举,它甚至不会停止await foreach。如果您想包含此功能(优雅终止),则必须添加一个try-finally块和一个内部链接,CancellationTokenSource如下所示。产生循环应放置在 内部try,链接的取消CancellationTokenSource应放置在 内部finally