如何正确使用 BlockingCollection.GetConsumingEnumerable?

use*_*993 0 c# producer-consumer task-parallel-library blockingcollection

我正在尝试使用生产者/消费者模式来实现,BlockingCollection<T>所以我编写了一个简单的控制台应用程序来测试它。

public class Program
{
    public static void Main(string[] args)
    {
        var workQueue = new WorkQueue();
        workQueue.StartProducingItems();
        workQueue.StartProcessingItems();

        while (true)
        {

        }
    }
}

public class WorkQueue
{
    private BlockingCollection<int> _queue;
    private static Random _random = new Random();

    public WorkQueue()
    {
        _queue = new BlockingCollection<int>();

        // Prefill some items.
        for (int i = 0; i < 100; i++)
        {
            //_queue.Add(_random.Next());
        }
    }

    public void StartProducingItems()
    {
        Task.Run(() =>
        {
            _queue.Add(_random.Next()); // Should be adding items to the queue constantly, but instead adds one and then nothing else.
        });
    }

    public void StartProcessingItems()
    {
        Task.Run(() =>
        {
            foreach (var item in _queue.GetConsumingEnumerable())
            {
                Console.WriteLine("Worker 1: " + item);
            }
        });

        Task.Run(() =>
        {
            foreach (var item in _queue.GetConsumingEnumerable())
            {
                Console.WriteLine("Worker 2: " + item);
            }
        });
    }
}
Run Code Online (Sandbox Code Playgroud)

但是,我的设计存在 3 个问题:

  • 我不知道在我的 Main 方法中阻塞/等待的正确方法。执行一个简单的空 while 循环似乎非常低效,并且只是为了确保应用程序不会结束而浪费 CPU 使用率。

  • 我的设计还有另一个问题,在这个简单的应用程序中,我有一个生产者无限期地生产项目,并且永远不会停止。在现实世界的设置中,我希望它最终结束(例如用完要处理的文件)。在这种情况下,我应该如何在 Main 方法中等待它完成?制作StartProducingItems async然后await呢?

  • 无论是GetConsumingEnumerableAdd不工作如我所料。生产者应该不断添加项目,但它添加了一个项目,然后不再添加。这一项然后由消费者之一处理。然后两个消费者都阻止等待添加项目,但没有。我知道这个Take方法,但再次Take在 while 循环中旋转似乎非常浪费和低效。有一种CompleteAdding方法,但是它不允许添加任何其他内容,并且如果您尝试会抛出异常,因此这是不合适的。

我确信两个消费者实际上都在阻塞并等待新项目,因为我可以在调试期间在线程之间切换:

在此处输入图片说明

编辑:

我已经在其中一条评论中提出了建议的更改,但Task.WhenAll仍然立即返回。

public Task StartProcessingItems()
{
    var consumers = new List<Task>();

    for (int i = 0; i < 2; i++)
    {
        consumers.Add(Task.Run(() =>
        {
            foreach (var item in _queue.GetConsumingEnumerable())
            {
                Console.WriteLine($"Worker {i}: " + item);
            }
        }));
    }

    return Task.WhenAll(consumers.ToList());
}
Run Code Online (Sandbox Code Playgroud)

mm8*_*mm8 6

GetConsumingEnumerable()正在阻塞。如果你想不断地添加到队列中,你应该把调用_queue.Add放在一个循环中:

public void StartProducingItems()
{
    Task.Run(() =>
    {
        while (true)
            _queue.Add(_random.Next());
    });
}
Run Code Online (Sandbox Code Playgroud)

关于Main()方法,您可以调用该Console.ReadLine()方法以防止主线程在按下键之前完成:

public static void Main(string[] args)
{
    var workQueue = new WorkQueue();
    workQueue.StartProducingItems();
    workQueue.StartProcessingItems();

    Console.WriteLine("Press a key to terminate the application...");
    Console.ReadLine();
}
Run Code Online (Sandbox Code Playgroud)