Post by Ale*_*ich

Receiving data in parallel from multiple IAsyncEnumerable streams

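I have a situation where I need to receive data from multiple IAsyncEnumerable sources. To improve performance, the sources should be consumed in parallel.

I have written the following code to achieve this, using the AsyncAwaitBestPractices, System.Threading.Tasks.Dataflow, and System.Linq.Async NuGet packages: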

public static async IAsyncEnumerable<T> ExecuteSimultaneouslyAsync<T>(
    this IEnumerable<IAsyncEnumerable<T>> sources,
    int outputQueueCapacity = 1,
    TaskScheduler scheduler = null)
{
    // Enumerated once up front to size the channel and the degree of parallelism.
    var sourcesCount = sources.Count();

    // A positive outputQueueCapacity only selects the bounded channel;
    // the capacity actually passed is sourcesCount.
    var channel = outputQueueCapacity > 0
        ? Channel.CreateBounded<T>(sourcesCount)
        : Channel.CreateUnbounded<T>();

    // Pump every source into the channel concurrently, then complete the writer.
    sources.AsyncParallelForEach(
            async body =>
            {
                await foreach (var item in body)
                {
                    await channel.Writer.WaitToWriteAsync();
                    await channel.Writer.WriteAsync(item);
                }
            },
            maxDegreeOfParallelism: sourcesCount,
            scheduler: scheduler)
        .ContinueWith(_ => channel.Writer.Complete())
        .SafeFireAndForget();

    // Yield items as they arrive until the writer completes.
    while (await channel.Reader.WaitToReadAsync())
        yield return await channel.Reader.ReadAsync();
}

public static async Task AsyncParallelForEach<T>( …
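For comparison, here is a minimal sketch of the same channel-based merge using only System.Threading.Channels and the base class library, which may help isolate the moving parts. It is not the code from the question and not an API from any of the packages mentioned: `AsyncEnumerableMergeSketch`, `MergeViaChannel`, and the `capacity` parameter are hypothetical names chosen for illustration. It assumes that the capacity argument is meant to bound the channel, that `WriteAsync` alone is enough to wait for free space, and that the sources honor cancellation.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class AsyncEnumerableMergeSketch
{
    // Hypothetical helper (not from any library): merges several async streams
    // by pumping each one into a shared bounded channel.
    public static async IAsyncEnumerable<T> MergeViaChannel<T>(
        this IEnumerable<IAsyncEnumerable<T>> sources,
        int capacity = 1, // must be >= 1, otherwise CreateBounded throws
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Bounded channel: WriteAsync waits while the buffer is full.
        var channel = Channel.CreateBounded<T>(capacity);

        // Linked token so the pumps can be stopped if the consumer stops early.
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        var token = cts.Token;

        // One pump per source, started on the thread pool so they run concurrently.
        var pumps = sources.Select(source => Task.Run(() => PumpAsync(source))).ToArray();
        var completion = CompleteWhenDoneAsync();

        try
        {
            // Yields buffered items until the writer is completed.
            await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken))
                yield return item;
        }
        finally
        {
            // Stop any pumps still running and wait for them to wind down.
            cts.Cancel();
            await completion;
        }

        async Task PumpAsync(IAsyncEnumerable<T> source)
        {
            await foreach (var item in source.WithCancellation(token))
                await channel.Writer.WriteAsync(item, token);
        }

        // Completes the writer once every pump has finished, forwarding a failure if any.
        async Task CompleteWhenDoneAsync()
        {
            try { await Task.WhenAll(pumps); channel.Writer.TryComplete(); }
            catch (OperationCanceledException) { channel.Writer.TryComplete(); }
            catch (Exception ex) { channel.Writer.TryComplete(ex); }
        }
    }
}
```

Under these assumptions it would be consumed as `await foreach (var item in sources.MergeViaChannel(capacity: 4)) { ... }`. Items arrive in whatever order the sources produce them, so ordering across sources is not preserved. One design limitation of this sketch: a failing source is reported to the consumer only after all other pumps have finished, because the writer is completed after `Task.WhenAll` returns.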

c# producer-consumer task-parallel-library iasyncenumerable system.threading.channels
