如果 X 分钟内没有新项目进入通道,如何读取 Channel<T> 中小于批量大小的剩余项目?

use*_*018 5 c# batching asp.net-core system.threading.channels

我正在使用ChannelfromSystem.Threading.Channels并想要批量读取项目(5 个项目),我有如下方法,

public class Batcher
{
    private readonly Channel<MeasurementViewModel> _channel;
    public Batcher()
    {
        _channel = Channel.CreateUnbounded<MeasurementViewModel>();
    }
    public async Task<MeasurementViewModel[]> ReadBatchAsync(int batchSize, CancellationToken stoppingToken)
    {
        var result = new MeasurementViewModel[batchSize];

        for (var i = 0; i < batchSize; i++)
        {
            result[i] = await _channel.Reader.ReadAsync(stoppingToken);
        }

        return result;
    }
}
Run Code Online (Sandbox Code Playgroud)

在 ASP.NET Core 后台服务中,我使用它,如下所示,

public class WriterService : BackgroundService
{
    private readonly Batcher _batcher;
    public WriterService(Batcher batcher)
    {
        _batcher = batcher;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var batchOfItems = await _batcher.ReadBatchAsync(5, stoppingToken);

            var range = string.Join(',', batchOfItems.Select(item => item.Value));

            var x = range;
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

这是有效的,每当有 5 个项目时Channel,我就会得到range.

问题是,当 中只剩下 2 个项目Channel并且自过去 10 分钟以来没有项目到达 时Channel,那么如何读取 中剩余的 2 个项目Channel

The*_*ias 9

您可以创建一个linked CancellationTokenSource,以便您可以同时监视外部取消请求和内部引发的超时。ReadBatchAsync下面是通过为类创建扩展方法来使用此技术的示例ChannelReader

public static async ValueTask<T[]> ReadBatchAsync<T>(
    this ChannelReader<T> channelReader,
    int batchSize, TimeSpan timeout, CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(channelReader);
    if (batchSize < 1) throw new ArgumentOutOfRangeException(nameof(batchSize));
    if (timeout < TimeSpan.Zero)
        throw new ArgumentOutOfRangeException(nameof(timeout));
    using CancellationTokenSource linkedCTS = CancellationTokenSource
        .CreateLinkedTokenSource(cancellationToken);
    linkedCTS.CancelAfter(timeout);
    List<T> buffer = new();
    while (true)
    {
        var token = buffer.Count == 0 ? cancellationToken : linkedCTS.Token;
        T item;
        try
        {
            item = await channelReader.ReadAsync(token).ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            cancellationToken.ThrowIfCancellationRequested();
            break; // The cancellation was induced by timeout (ignore it)
        }
        catch (ChannelClosedException)
        {
            if (buffer.Count == 0) throw;
            break;
        }
        buffer.Add(item);
        if (buffer.Count >= batchSize) break;
    }
    return buffer.ToArray();
}
Run Code Online (Sandbox Code Playgroud)

timeout此方法将在指定时间过去后立即生成批次,或者如果batchSize达到指定时间则更快生成批次,前提是该批次至少包含一个项目。否则,一旦收到第一个项目,它就会生成一个单项目批次。

如果通道已通过调用该channel.Writer.Complete()方法完成,并且它不再包含任何项目,则该方法将传播与本机方法抛出的ReadBatchAsync相同内容。ChannelClosedExceptionReadAsync

如果外部CancellationToken被取消,则通过抛出 来传播取消OperationCanceledExceptionChannelReader<T>此时可能已从内部提取的任何项目都会丢失。这使得取消功能成为破坏性操作。建议Channel<T>在此之后将其整体丢弃。

使用示例:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    while (true)
    {
        MeasurementViewModel[] batch;
        try
        {
            batch = await _channel.Reader.ReadBatchAsync(
                5, TimeSpan.FromMinutes(10), stoppingToken);
        }
        catch (OperationCanceledException) { return; }
        catch (ChannelClosedException) { break; }

        Console.WriteLine(String.Join(',', batch.Select(item => item.Value)));
    }
    await _channel.Reader.Completion; // Propagate possible failure
}
Run Code Online (Sandbox Code Playgroud)

对于批量消费通道的另一种方法,其取消是非破坏性的,你可以看看这个问题:

  • 先生,你让我很高兴。太好了,欣赏,谢谢!!!! (2认同)