How to consume a list of IAsyncEnumerables as soon as their items are ready

use*_*506 1 .net c# .net-core iasyncenumerable

public async IAsyncEnumerable<Entity> FindByIds(List<string> ids)
{
    // Split is presumably a custom extension (not shown) that breaks the ids into batches
    List<List<string>> splitIdsList = ids.Split(5);

    var entityList = splitIdsList.Select(x => FindByIdsQuery(x)).ToList();

    // each inner sequence is drained completely before the next one is touched
    foreach (var entities in entityList)
    {
        await foreach (var entity in entities)
        {
            yield return entity;
        }
    }
}

private async IAsyncEnumerable<Entity> FindByIdsQuery(List<string> ids)
{
    var result = await Connection.QueryAsync(query, new {ids});

    foreach (var entity in result)
    {
        yield return entity;
    }
}

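Suppose I send 25 IDs to this function. The first FindByIdsQuery takes 5000 ms and the other 4 FindByIdsQuery calls take 100 ms each. With this solution, no entities are output until the 5000 ms have passed. Is there a way to start outputting entities as soon as any of the queries has something to output? In other words, something like what Task.WhenAny lets you do with Tasks.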

To be clear: any one of the 5 queries could be the one that takes 5000 ms.
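
The behaviour described above can be reproduced without a database. The following is a minimal sketch, not the asker's code: FakeQuery, Sequential and RunAsync are hypothetical names, and Task.Delay stands in for the real query. It also shows a detail that is easy to miss: calling an async iterator method does not run its body, so the ToList() call in FindByIds does not start any query. Each query only begins when its await foreach loop is reached.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class SequentialDemo
{
    // Hypothetical stand-in for FindByIdsQuery: waits, then yields a single value.
    public static async IAsyncEnumerable<string> FakeQuery(string name, int delayMs)
    {
        await Task.Delay(delayMs);
        yield return name;
    }

    // Same shape as FindByIds: drains each inner sequence before starting the next.
    public static async IAsyncEnumerable<string> Sequential(
        IEnumerable<IAsyncEnumerable<string>> queries)
    {
        // ToList() only creates the lazy sequences; no query body has run yet.
        foreach (var query in queries.ToList())
        {
            await foreach (var item in query)
            {
                yield return item;
            }
        }
    }

    // Collects the items in the order in which they are yielded.
    public static async Task<List<string>> RunAsync()
    {
        var queries = new[] { FakeQuery("slow", 500), FakeQuery("fast", 50) };
        var order = new List<string>();
        await foreach (var item in Sequential(queries))
        {
            order.Add(item);
        }
        return order; // "slow" comes first, because "fast" is not started until "slow" is done
    }
}
```

Here the fast query cannot overtake the slow one, which matches the delay observed in the question.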

Eup*_*ric 5

From your comments, I understand your problem. What you are looking for is essentially a kind of "SelectMany" operator: one that starts awaiting all of the IAsyncEnumerables at once and returns items in the order in which they arrive, regardless of the order of the source async enumerables.

I was hoping that the default AsyncEnumerable.SelectMany would do this, but it does not. It walks the source enumerables one by one and goes through each inner enumerable completely before continuing with the next. So I hacked together a SelectMany variant that awaits all inner async enumerables at the same time. Be warned: I do not guarantee correctness or safety, and there is zero error handling.

/// <summary>
/// Starts all inner IAsyncEnumerables and returns items from all of them in the order in which they arrive.
/// </summary>
public static async IAsyncEnumerable<TItem> SelectManyAsync<TItem>(IEnumerable<IAsyncEnumerable<TItem>> source)
{
    // get enumerators from all inner IAsyncEnumerable
    var enumerators = source.Select(x => x.GetAsyncEnumerator()).ToList();

    List<Task<(IAsyncEnumerator<TItem>, bool)>> runningTasks = new List<Task<(IAsyncEnumerator<TItem>, bool)>>();

    // start all inner IAsyncEnumerable
    foreach (var asyncEnumerator in enumerators)
    {
        runningTasks.Add(MoveNextWrapped(asyncEnumerator));
    }

    // while there are any running tasks
    while (runningTasks.Any())
    {
        // get next finished task and remove it from list
        var finishedTask = await Task.WhenAny(runningTasks);
        runningTasks.Remove(finishedTask);

        // get result from finished IAsyncEnumerable
        var result = await finishedTask;
        var asyncEnumerator = result.Item1;
        var hasItem = result.Item2;

        // if IAsyncEnumerable has item, return it and put it back as running for next item
        if (hasItem)
        {
            yield return asyncEnumerator.Current;

            runningTasks.Add(MoveNextWrapped(asyncEnumerator));
        }
    }

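    // Dispose the enumerators. This only runs on normal completion; a robust version would
    // do it in a finally block, after making sure no MoveNextAsync call is still pending.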
    foreach (var asyncEnumerator in enumerators)
    {
        await asyncEnumerator.DisposeAsync();
    }
}

/// <summary>
/// Helper method that returns a Task with a tuple of the IAsyncEnumerator and the result of its MoveNextAsync call.
/// </summary>
private static async Task<(IAsyncEnumerator<TItem>, bool)> MoveNextWrapped<TItem>(IAsyncEnumerator<TItem> asyncEnumerator)
{
    var res = await asyncEnumerator.MoveNextAsync();
    return (asyncEnumerator, res);
}

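You can then use it to merge all the enumerables in place of the outer foreach. Because FindByIds then returns the merged sequence directly instead of using yield return, its declaration has to drop the async modifier: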

    var entities = SelectManyAsync(splitIdsList.Select(x => FindByIdsQuery(x)));

    return entities;
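
SelectManyAsync leaves disposal and error handling open. One possible alternative, which is not part of the answer above, swaps the Task.WhenAny loop for System.Threading.Channels: each source gets its own pumping loop that writes into a shared channel, and the consumer reads from that channel. The sketch below rests on some assumptions: the names AsyncEnumerableMerge, Merge, Pump and PumpAll are hypothetical, the channel is unbounded (so a slow consumer is not protected against fast producers), and a failure in one source only surfaces once all sources have stopped.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class AsyncEnumerableMerge
{
    // Hypothetical helper: yields items from all sources in the order they are produced.
    public static async IAsyncEnumerable<T> Merge<T>(
        IEnumerable<IAsyncEnumerable<T>> sources,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var channel = Channel.CreateUnbounded<T>();
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

        // One loop per source; await foreach disposes the enumerator when the loop ends.
        async Task Pump(IAsyncEnumerable<T> source)
        {
            await foreach (var item in source.WithCancellation(cts.Token))
            {
                await channel.Writer.WriteAsync(item, cts.Token);
            }
        }

        // Starts every source, then completes the channel once all of them have stopped.
        async Task PumpAll()
        {
            try
            {
                await Task.WhenAll(sources.Select(s => Pump(s)));
                channel.Writer.TryComplete();
            }
            catch (Exception ex)
            {
                // Pass the failure (or cancellation) on to the reader.
                channel.Writer.TryComplete(ex);
            }
        }

        var pumping = PumpAll();
        try
        {
            await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken))
            {
                yield return item;
            }
        }
        finally
        {
            // Runs on completion, on failure, and when the consumer stops early.
            cts.Cancel();
            await pumping;
        }
    }
}
```

Under these assumptions, a non-async FindByIds could return AsyncEnumerableMerge.Merge(splitIdsList.Select(x => FindByIdsQuery(x))). Sources that ignore the cancellation token, such as FindByIdsQuery as written in the question, only stop when they try to produce their next item, so an early exit may still wait for a query that is in flight.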