如何(以及如果)使用TPL编写单个使用者队列?

Eri*_*ric 16 .net c# queue .net-4.0 task-parallel-library

我最近听说过很多关于.NET 4.0中TPL的播客.他们中的大多数使用任务描述后台活动,如下载图像或进行计算,以便工作不会干扰GUI线程.

我工作的大多数代码都有更多的多生产者/单一消费者风格,其中来自多个来源的工作项必须排队,然后按顺序处理.一个示例是日志记录,其中来自多个线程的日志行被顺序化为单个队列,以便最终写入文件或数据库.来自任何单一来源的所有记录必须保持有序,并且来自同一时刻的记录应该在最终输出中彼此"接近".

因此,多个线程或任务或任何调用队列的任何东西:

lock( _queue ) // or use a lock-free queue!
{
   _queue.enqueue( some_work );
   _queueSemaphore.Release();
}
Run Code Online (Sandbox Code Playgroud)

专用工作线程处理队列:

while( _queueSemaphore.WaitOne() )
{
   lock( _queue )
   {
      some_work = _queue.dequeue();     
   }
   deal_with( some_work );
}
Run Code Online (Sandbox Code Playgroud)

将工作线程专门用于这些任务的消费者方面似乎总是合理的.我应该使用TPL中的某些构造来编写未来的程序吗?哪一个?为什么?

Ade*_*ler 13

您可以使用长时间运行的任务来处理Wilka建议的BlockingCollection中的项目.这是一个非常符合您的应用程序要求的示例.你会看到这样的输出:

Log from task B
Log from task A
Log from task B1
Log from task D
Log from task C
Run Code Online (Sandbox Code Playgroud)

并非A,B,C&D的输出看起来是随机的,因为它们取决于线程的开始时间,但B总是出现在B1之前.

public class LogItem 
{
    public string Message { get; private set; }

    public LogItem (string message)
    {
        Message = message;
    }
}

public void Example()
{
    BlockingCollection<LogItem> _queue = new BlockingCollection<LogItem>();

    // Start queue listener...
    CancellationTokenSource canceller = new CancellationTokenSource();
    Task listener = Task.Factory.StartNew(() =>
        {
            while (!canceller.Token.IsCancellationRequested)
            {
                LogItem item;
                if (_queue.TryTake(out item))
                    Console.WriteLine(item.Message);
            }
        },
    canceller.Token, 
    TaskCreationOptions.LongRunning,
    TaskScheduler.Default);

    // Add some log messages in parallel...
    Parallel.Invoke(
        () => { _queue.Add(new LogItem("Log from task A")); },
        () => { 
            _queue.Add(new LogItem("Log from task B")); 
            _queue.Add(new LogItem("Log from task B1")); 
        },
        () => { _queue.Add(new LogItem("Log from task C")); },
        () => { _queue.Add(new LogItem("Log from task D")); });

    // Pretend to do other things...
    Thread.Sleep(1000);

    // Shut down the listener...
    canceller.Cancel();
    listener.Wait();
}
Run Code Online (Sandbox Code Playgroud)

  • 使用以下帮助减少cpu旋转吗?LogItem item = _queue.Take(canceller.Token); Console.WriteLine(item.Message); (2认同)

Bar*_*rry 5

我知道这个答案大约晚了一年,但请看一下MSDN.

其中显示了如何从TaskScheduler类创建LimitedConcurrencyLevelTask​​Scheduler.通过将并发性限制为单个任务,然后应该按顺序处理您的任务,因为它们通过以下方式排队:

LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(1);
TaskFactory factory = new TaskFactory(lcts);

factory.StartNew(()=> 
{
   // your code
});
Run Code Online (Sandbox Code Playgroud)