消耗 System.Threading.Channels.Channel 中的所有消息

Gui*_*rgy 4 c# channel producer-consumer system.threading.channels

假设我有很多生产者,1个消费者未绑定 Channel,有一个消费者:

await foreach (var message in channel.Reader.ReadAllAsync(cts.Token))
{
    await consume(message);
}
Run Code Online (Sandbox Code Playgroud)

问题在于该consume函数会进行一些 IO 访问,并且可能还会进行一些网络访问,因此在消耗 1 条消息之前可能会产生更多消息。但由于IO资源不能并发访问,所以我不能有很多消费者,也不能把函数扔到consume一个Task中然后忘记它。

consume功能可以轻松修改以获取多条消息并批量处理它们。所以我的问题是,是否有一种方法可以让消费者在尝试访问通道队列时获取通道队列中的所有消息,如下所示:

while (true) {
    Message[] messages = await channel.Reader.TakeAll();
    await consumeAll(messages);
}
Run Code Online (Sandbox Code Playgroud)

编辑:我能想到的 1 个选项是:

List<Message> messages = new();
await foreach (var message in channel.Reader.ReadAllAsync(cts.Token))
{
    await consume(message);
    Message msg;
    while (channel.Reader.TryRead(out msg))
        messages.Add(msg);
    if (messages.Count > 0)
    {
        await consumeAll(messages);
        messages.Clear();
    }
}
Run Code Online (Sandbox Code Playgroud)

但我觉得这应该是一个更好的方法来做到这一点。

spe*_*der 10

在阅读了Stephen Toub 的关于频道的入门读物之后,我尝试编写一个扩展方法来满足您的需要(我已经有一段时间没有使用 C# 了,所以这很有趣)。

public static class ChannelReaderEx
{
    public static async IAsyncEnumerable<IEnumerable<T>> ReadBatchesAsync<T>(
        this ChannelReader<T> reader, 
        [EnumeratorCancellation] CancellationToken cancellationToken = default
    )
    {
        while (await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
        {
            yield return reader.Flush().ToList();
        }
    }

    public static IEnumerable<T> Flush<T>(this ChannelReader<T> reader)
    {
        while (reader.TryRead(out T item))
        {
            yield return item;
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

可以这样使用:

await foreach (var batch in channel.Reader.ReadBatchesAsync())
{
    await ConsumeBatch(batch);
}
Run Code Online (Sandbox Code Playgroud)