我遇到一种情况,我需要从多个 IAsyncEnumerable 源接收数据。为了提高性能,应该以并行方式执行。
我已经使用AsyncAwaitBestPractices、System.Threading.Tasks.Dataflow和System.Linq.Async nuget 包编写了这样的代码来实现此目标:
public static async IAsyncEnumerable<T> ExecuteSimultaneouslyAsync<T>(
this IEnumerable<IAsyncEnumerable<T>> sources,
int outputQueueCapacity = 1,
TaskScheduler scheduler = null)
{
var sourcesCount = sources.Count();
var channel = outputQueueCapacity > 0
? Channel.CreateBounded<T>(sourcesCount)
: Channel.CreateUnbounded<T>();
sources.AsyncParallelForEach(
async body =>
{
await foreach (var item in body)
{
await channel.Writer.WaitToWriteAsync();
await channel.Writer.WriteAsync(item);
}
},
maxDegreeOfParallelism: sourcesCount,
scheduler: scheduler)
.ContinueWith(_ => channel.Writer.Complete())
.SafeFireAndForget();
while (await channel.Reader.WaitToReadAsync())
yield return await channel.Reader.ReadAsync();
}
public static async Task AsyncParallelForEach<T>( …
Run Code Online (Sandbox Code Playgroud) c# producer-consumer task-parallel-library iasyncenumerable system.threading.channels
c# ×1