如何批量使用BlockingCollection <T>

Jod*_*ell 12 .net c# concurrency

我想出了一些代码来消耗队列中所有的wating项目.而不是逐项处理项目,将所有等待项目作为一组处理是有意义的.

我已经宣布我的队列是这样的.

private BlockingCollection<Item> items = 
    new BlockingCollection<Item>(new ConcurrentQueue<Item>);
Run Code Online (Sandbox Code Playgroud)

然后,在消费者线程上,我打算像这样批量阅读这些项目,

Item nextItem;
while (this.items.TryTake(out nextItem, -1))
{
    var workToDo = new List<Item>();
    workToDo.Add(nextItem);

    while(this.items.TryTake(out nextItem))
    {
        workToDo.Add(nextItem);
    }

    // process workToDo, then go back to the queue.
}
Run Code Online (Sandbox Code Playgroud)

这种方法缺乏实用性,GetConsumingEnumerable我不禁想知道我是否错过了更好的方法,或者我的方法是否存在缺陷.

有没有更好的方法来BlockingCollection<T>批量消费?

vle*_*lee 5

一种解决方案是使用BufferBlock<T>来自 System.Threading.Tasks.Dataflow(包含在 .net core 3+ 中)。它不使用GetConsumingEnumerable(),但它仍然允许您使用相同的实用程序,主要是:

  • 允许与多个(对称和/或非对称)消费者和生产者并行处理
  • 线程安全(允许上述情况) - 无需担心竞争条件
  • 可以通过取消令牌和/或收集完成来取消
  • 消费者阻塞直到数据可用,避免在轮询上浪费 CPU 周期

还有一个BatchBlock<T>, 但这将您限制为固定大小的批次。

var buffer = new BufferBlock<Item>();
while (await buffer.OutputAvailableAsync())
{
    if (buffer.TryReceiveAll(out var items))
        //process items
}
Run Code Online (Sandbox Code Playgroud)

这是一个工作示例,它演示了以下内容:

  • 并行处理可变长度批次的多个对称消费者
  • 多个对称的生产者(在这个例子中不是真正并行运行)
  • 生产者完成后完成收集的能力
  • 为了使示例简短,我没有演示使用 CancellationToken
  • 能够等到生产者和/或消费者完成
  • 从不允许异步的区域调用的能力,例如构造函数
  • Thread.Sleep()通话不是必需的,但有助于模仿一些会出现在更繁重的情况的处理时间
  • theTask.WaitAll()和 theThread.Sleep()都可以选择转换为它们的异步等价物
  • 无需使用任何外部库
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class Program
{
    static void Main()
    {
        var buffer = new BufferBlock<string>();

        // Kick off consumer task(s)
        List<Task> consumers = new List<Task>();
        for (int i = 0; i < 3; i++)
        {
            consumers.Add(Task.Factory.StartNew(async () =>
            {
                // need to copy this due to lambda variable capture
                var num = i; 
                while (await buffer.OutputAvailableAsync())
                {
                    if (buffer.TryReceiveAll(out var items))
                        Console.WriteLine($"Consumer {num}:    " + 
                            items.Aggregate((a, b) => a + ", " + b));

                        // real life processing would take some time
                        await Task.Delay(500); 
                }

                Console.WriteLine($"Consumer {num} complete");
            }));

            // give consumer tasks time to activate for a better demo
            Thread.Sleep(100); 
        }

        // Kick off producer task(s)
        List<Task> producers = new List<Task>();
        for (int i = 0; i < 3; i++)
        {
            producers.Add(Task.Factory.StartNew(() =>
            {
                for (int j = 0 + (1000 * i); j < 500 + (1000 * i); j++)
                    buffer.Post(j.ToString());
            }));

            // space out the producers for a better demo
            Thread.Sleep(10); 
        }

        // may also use the async equivalent
        Task.WaitAll(producers.ToArray());
        Console.WriteLine("Finished waiting on producers");

        // demo being able to complete the collection
        buffer.Complete(); 

        // may also use the async equivalent
        Task.WaitAll(consumers.ToArray()); 
        Console.WriteLine("Finished waiting on consumers");

        Console.ReadLine();
    }
}
Run Code Online (Sandbox Code Playgroud)

这是代码的现代化和简化版本。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    private static async Task Main()
    {
        var buffer = new BufferBlock<string>();

        // Kick off consumer task(s)
        var consumers = new List<Task>();
        for (var i = 0; i < 3; i++)
        {
            var id = i;
            consumers.Add(Task.Run(() => StartConsumer(id, buffer)));

            // give consumer tasks time to activate for a better demo
            await Task.Delay(100);
        }

        // Kick off producer task(s)
        var producers = new List<Task>();
        for (var i = 0; i < 3; i++)
        {
            var pid = i;
            producers.Add(Task.Run(() => StartProducer(pid, buffer)));

            // space out the producers for a better demo
            await Task.Delay(10);
        }

        // may also use the async equivalent
        await Task.WhenAll(producers);
        Console.WriteLine("Finished waiting on producers");

        // demo being able to complete the collection
        buffer.Complete();

        // may also use the async equivalent
        await Task.WhenAll(consumers);
        Console.WriteLine("Finished waiting on consumers");

        Console.ReadLine();
    }

    private static async Task StartConsumer(
            int id,
            IReceivableSourceBlock<string> buffer)
    {
        while (await buffer.OutputAvailableAsync())
        {
            if (buffer.TryReceiveAll(out var items))
            {
                Console.WriteLine($"Consumer {id}: " + 
                    items.Aggregate((a, b) => a + ", " + b));
            }

            // real life processing would take some time
            await Task.Delay(500);
        }

        Console.WriteLine($"Consumer {id} complete");
    }

    private static Task StartProducer(int pid, ITargetBlock<string> buffer)
    {
        for (var j = 0 + (1000 * pid); j < 500 + (1000 * pid); j++)
        {
            buffer.Post(j.ToString());
        }

        return Task.CompletedTask;
    }
}
Run Code Online (Sandbox Code Playgroud)


Jon*_*nna 2

ConcurrentQueue<T>虽然在某些方面并不那么好,但我自己的方法允许使用AtomicDequeueAllLLQueue<T>方法进行批量出队,其中当前队列中的所有项目都通过单个(原子和线程安全)操作从中取出,然后处于非-供单个线程使用的线程安全集合。该方法是专门针对需要批量读取操作的场景而设计的。

虽然这不是阻塞的,但它可以用来很容易地创建阻塞集合:

public BlockingBatchedQueue<T>
{
  private readonly AutoResetEvent _are = new AutoResetEvent(false);
  private readonly LLQueue<T> _store;
  public void Add(T item)
  {
    _store.Enqueue(item);
    _are.Set();
  }
  public IEnumerable<T> Take()
  {
    _are.WaitOne();
    return _store.AtomicDequeueAll();
  }
  public bool TryTake(out IEnumerable<T> items, int millisecTimeout)
  {
    if(_are.WaitOne(millisecTimeout))
    {
      items = _store.AtomicDequeueAll();
      return true;
    }
    items = null;
    return false;
  }
}
Run Code Online (Sandbox Code Playgroud)

这是一个不执行以下操作的起点:

  1. 处置后处理待处理的等待读者。
  2. 担心多个读者可能会因一个人正在阅读时发生的写入而引发潜在的竞争(它只是认为偶尔出现的空结果是可以枚举的)。
  3. 对写作设置任何上限。

所有这些都可以添加,但我想将一些实际用途保持在最低限度,希望在上面定义的限制内不会出现错误。