Jod*_*ell 12 .net c# concurrency
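I've come up with some code to consume all the waiting items in a queue. Rather than processing the items one by one, it makes sense to process all the waiting items as a set.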
I've declared my queue like this:
private BlockingCollection<Item> items =
    new BlockingCollection<Item>(new ConcurrentQueue<Item>());
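As an aside, ConcurrentQueue<T> is already the default backing store for BlockingCollection<T>, so passing it explicitly only documents the FIFO intent. The store argument matters when you want a bound on the queue. The small sketch below shows FIFO order and the optional bounded capacity. The class and method names are illustrative only.

```csharp
using System;
using System.Collections.Concurrent;

public static class QueueSetupDemo
{
    // With a ConcurrentQueue<T> backing store, Take() returns items in insertion order.
    public static string[] TakeInOrder(params string[] values)
    {
        using (var items = new BlockingCollection<string>(new ConcurrentQueue<string>()))
        {
            foreach (var v in values) items.Add(v);
            var result = new string[values.Length];
            for (int i = 0; i < result.Length; i++) result[i] = items.Take();
            return result;
        }
    }

    // With a bounded capacity, Add() would block once the collection is full;
    // TryAdd(item) uses a zero timeout, so it returns false instead of blocking.
    public static bool CanAddBeyondCapacity(int capacity)
    {
        using (var items = new BlockingCollection<string>(new ConcurrentQueue<string>(), capacity))
        {
            for (int i = 0; i < capacity; i++) items.Add("x");
            return items.TryAdd("overflow");
        }
    }
}
```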
Then, on the consumer thread, I intend to read the items in batches like this:
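Item nextItem;
// -1 (Timeout.Infinite): block until an item arrives; returns false once
// CompleteAdding() has been called and the collection is empty
while (this.items.TryTake(out nextItem, -1))
{
    var workToDo = new List<Item>();
    workToDo.Add(nextItem);
    // drain whatever else is already waiting, without blocking
    while (this.items.TryTake(out nextItem))
    {
        workToDo.Add(nextItem);
    }
    // process workToDo, then go back to the queue.
}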
This approach lacks the convenience of GetConsumingEnumerable(), and I can't help wondering whether I've missed a better way, or whether my approach is flawed.
Is there a better way to consume a BlockingCollection<T> in batches?
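One straightforward option is to keep the TryTake loop from the question but wrap it in an iterator. This restores foreach-style consumption similar to GetConsumingEnumerable(). The sketch below does that. The extension name GetConsumingBatches and its maxBatchSize parameter are hypothetical and not part of the BCL.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class BlockingCollectionBatchExtensions
{
    // Hypothetical helper (not part of the BCL): yields every item that is waiting
    // in the collection as one batch, blocking only while the collection is empty.
    // Like GetConsumingEnumerable(), it finishes once CompleteAdding() has been
    // called and the collection has been drained.
    public static IEnumerable<List<T>> GetConsumingBatches<T>(
        this BlockingCollection<T> source,
        int maxBatchSize = int.MaxValue,
        CancellationToken cancellationToken = default)
    {
        // Checked when enumeration starts, because this method is an iterator.
        if (maxBatchSize < 1) throw new ArgumentOutOfRangeException(nameof(maxBatchSize));

        // Block until the first item arrives; false means "completed and empty".
        // Throws OperationCanceledException if the token is cancelled while waiting.
        while (source.TryTake(out T first, Timeout.Infinite, cancellationToken))
        {
            var batch = new List<T> { first };
            // Drain whatever else is already waiting, without blocking.
            while (batch.Count < maxBatchSize && source.TryTake(out T next))
            {
                batch.Add(next);
            }
            yield return batch;
        }
    }
}
```

A consumer would then write foreach (var batch in this.items.GetConsumingBatches()) { ... }, and the loop ends on its own after the producer calls CompleteAdding().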
One solution is to use BufferBlock<T> from
System.Threading.Tasks.Dataflow (included in .NET Core 3+). It doesn't use GetConsumingEnumerable(), but it still gives you much of the same utility.
There is also a BatchBlock<T>, but that limits you to fixed-size batches.
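For comparison, BatchBlock<T> normally emits arrays of exactly BatchSize items. There are two exceptions. Completing the block flushes any leftover items as a smaller final batch. TriggerBatch() forces out a partial batch on demand. The minimal sketch below shows both exceptions. The class and method names are illustrative.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

public static class BatchBlockDemo
{
    // Posts every item, completes the block, and returns the arrays it produced.
    public static List<int[]> CollectBatches(IEnumerable<int> items, int batchSize)
    {
        var batchBlock = new BatchBlock<int>(batchSize);
        foreach (var item in items)
        {
            batchBlock.Post(item); // a full batch is emitted each time batchSize items arrive
        }

        // Completion flushes any leftover items as a final, smaller batch.
        batchBlock.Complete();

        var batches = new List<int[]>();
        // OutputAvailableAsync() turns false once the block is completed and drained.
        while (batchBlock.OutputAvailableAsync().GetAwaiter().GetResult())
        {
            if (batchBlock.TryReceive(out int[] batch))
            {
                batches.Add(batch);
            }
        }
        return batches;
    }

    // TriggerBatch() emits whatever is queued, even if it is fewer than BatchSize.
    public static int[] PartialBatch()
    {
        var batchBlock = new BatchBlock<int>(10);
        batchBlock.Post(1);
        batchBlock.Post(2);
        batchBlock.TriggerBatch();
        return batchBlock.Receive(); // blocks until the partial batch is available
    }
}
```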
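var buffer = new BufferBlock<Item>();
// OutputAvailableAsync() returns false once the block is completed and empty
while (await buffer.OutputAvailableAsync())
{
    if (buffer.TryReceiveAll(out var items))
    {
        // process items (everything that was buffered at this moment)
    }
}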
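The loop above can be packaged as a reusable, cancellation-aware helper. The sketch below assumes you want to collect each batch. In a real consumer you would process the batch at the marked line instead. The helper name DrainInBatchesAsync is hypothetical. OutputAvailableAsync(CancellationToken) and TryReceiveAll are the Dataflow calls used above.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BufferBlockBatching
{
    // Waits for data, then takes everything currently buffered as one batch.
    // Returns once the block has been completed and fully drained; if the token
    // is cancelled while waiting, the awaited task throws an OperationCanceledException.
    public static async Task<List<IList<T>>> DrainInBatchesAsync<T>(
        IReceivableSourceBlock<T> source,
        CancellationToken cancellationToken = default)
    {
        var batches = new List<IList<T>>();
        while (await source.OutputAvailableAsync(cancellationToken))
        {
            if (source.TryReceiveAll(out IList<T> items))
            {
                batches.Add(items); // in a real consumer, process the batch here
            }
        }
        return batches;
    }
}
```

The producer side only needs buffer.Post(item) for each item and a final buffer.Complete(). Without that call, the consumer loop never ends.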
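Here is a working example. It runs several producers and several consumers in parallel, and it shuts the consumers down by completing the buffer, so no CancellationToken is needed.
The Thread.Sleep() and Task.Delay() calls aren't required. They only space things out and mimic the processing time that would occur in a heavier workload. Both the Task.WaitAll() and the Thread.Sleep() calls can optionally be converted to their async equivalents.
using System;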
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
static class Program
{
static void Main()
{
var buffer = new BufferBlock<string>();
// Kick off consumer task(s)
List<Task> consumers = new List<Task>();
for (int i = 0; i < 3; i++)
{
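// copy the loop variable outside the lambda so each consumer gets its own value
var num = i;
// Task.Run unwraps the async lambda, so the stored task tracks the whole consumer loop
consumers.Add(Task.Run(async () =>
{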
while (await buffer.OutputAvailableAsync())
{
if (buffer.TryReceiveAll(out var items))
Console.WriteLine($"Consumer {num}: " +
items.Aggregate((a, b) => a + ", " + b));
// real life processing would take some time
await Task.Delay(500);
}
Console.WriteLine($"Consumer {num} complete");
}));
// give consumer tasks time to activate for a better demo
Thread.Sleep(100);
}
// Kick off producer task(s)
List<Task> producers = new List<Task>();
for (int i = 0; i < 3; i++)
{
// copy the loop variable before the lambda captures it
var pid = i;
producers.Add(Task.Run(() =>
{
for (int j = 1000 * pid; j < 1000 * pid + 500; j++)
buffer.Post(j.ToString());
}));
// space out the producers for a better demo
Thread.Sleep(10);
}
// may also use the async equivalent
Task.WaitAll(producers.ToArray());
Console.WriteLine("Finished waiting on producers");
// demo being able to complete the collection
buffer.Complete();
// may also use the async equivalent
Task.WaitAll(consumers.ToArray());
Console.WriteLine("Finished waiting on consumers");
Console.ReadLine();
}
}
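A common trap in demos like this is passing an async lambda to Task.Factory.StartNew. That call returns a Task<Task>. Waiting on it only waits until the lambda reaches its first await, not until the async body finishes. The deterministic sketch below demonstrates the behaviour. Task.Run, or calling Unwrap() on the result, returns a task that tracks the whole async body. The class name is illustrative.

```csharp
using System.Threading.Tasks;

public static class StartNewPitfall
{
    // Returns true if the StartNew task completed while the async body was still running.
    public static bool OuterTaskFinishesEarly()
    {
        var gate = new TaskCompletionSource<bool>();

        // StartNew does not understand async lambdas: it returns Task<Task>,
        // and the outer task completes as soon as the lambda hits its first await.
        Task<Task> outer = Task.Factory.StartNew(async () => { await gate.Task; });
        outer.Wait();
        bool innerStillRunning = !outer.Result.IsCompleted;

        gate.SetResult(true);
        outer.Unwrap().Wait(); // Unwrap() (like Task.Run) waits for the async body itself
        return innerStillRunning;
    }
}
```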
Here is a modernized and simplified version of the code.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
class Program
{
private static async Task Main()
{
var buffer = new BufferBlock<string>();
// Kick off consumer task(s)
var consumers = new List<Task>();
for (var i = 0; i < 3; i++)
{
var id = i;
consumers.Add(Task.Run(() => StartConsumer(id, buffer)));
// give consumer tasks time to activate for a better demo
await Task.Delay(100);
}
// Kick off producer task(s)
var producers = new List<Task>();
for (var i = 0; i < 3; i++)
{
var pid = i;
producers.Add(Task.Run(() => StartProducer(pid, buffer)));
// space out the producers for a better demo
await Task.Delay(10);
}
// wait for every producer to finish posting
await Task.WhenAll(producers);
Console.WriteLine("Finished waiting on producers");
// demo being able to complete the collection
buffer.Complete();
// wait for the consumers to drain the buffer and observe completion
await Task.WhenAll(consumers);
Console.WriteLine("Finished waiting on consumers");
Console.ReadLine();
}
private static async Task StartConsumer(
int id,
IReceivableSourceBlock<string> buffer)
{
while (await buffer.OutputAvailableAsync())
{
if (buffer.TryReceiveAll(out var items))
{
Console.WriteLine($"Consumer {id}: " +
items.Aggregate((a, b) => a + ", " + b));
}
// real life processing would take some time
await Task.Delay(500);
}
Console.WriteLine($"Consumer {id} complete");
}
private static Task StartProducer(int pid, ITargetBlock<string> buffer)
{
for (var j = 0 + (1000 * pid); j < 500 + (1000 * pid); j++)
{
buffer.Post(j.ToString());
}
return Task.CompletedTask;
}
}
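While not as good as ConcurrentQueue<T> in some respects, my own LLQueue<T> supports batched dequeuing through its AtomicDequeueAll method. All the items currently in the queue are taken out in a single atomic, thread-safe operation. They are then held in a non-thread-safe collection for a single thread to consume. The method was designed specifically for scenarios where you want to batch read operations.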
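The source of LLQueue<T> isn't included here. The sketch below is a rough illustration of the general idea, not the author's implementation. A single Interlocked.Exchange detaches every queued item at once, and the result is a plain List<T> for one consuming thread. The class name ExchangeBatchQueue is hypothetical. The technique is a lock-free linked stack that is reversed on dequeue.

```csharp
using System.Collections.Generic;
using System.Threading;

// Hypothetical illustration, not LLQueue<T>: producers push onto a lock-free linked
// stack, and DequeueAll() detaches the whole chain with one atomic exchange.
public sealed class ExchangeBatchQueue<T>
{
    private sealed class Node
    {
        public T Value;
        public Node Next;
    }

    private Node _head; // newest item first; null when empty

    // Thread-safe: retries the compare-and-swap until this node becomes the new head.
    public void Enqueue(T item)
    {
        var node = new Node { Value = item };
        Node current;
        do
        {
            current = Volatile.Read(ref _head);
            node.Next = current;
        }
        while (Interlocked.CompareExchange(ref _head, node, current) != current);
    }

    // Thread-safe: one Interlocked.Exchange takes every queued node at once.
    // The returned list is not shared, so a single consumer can use it freely.
    public List<T> DequeueAll()
    {
        Node node = Interlocked.Exchange(ref _head, null);
        var items = new List<T>();
        for (; node != null; node = node.Next)
        {
            items.Add(node.Value);
        }
        items.Reverse(); // nodes were linked newest-first; restore arrival order
        return items;
    }
}
```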
LLQueue<T> itself doesn't block, but it can easily be used to build a blocking collection:
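using System.Collections.Generic;
using System.Threading;
// LLQueue<T> is the author's own queue type (its source is not shown here)
public class BlockingBatchedQueue<T>
{
private readonly AutoResetEvent _are = new AutoResetEvent(false);
// assumes LLQueue<T> has a parameterless constructor
private readonly LLQueue<T> _store = new LLQueue<T>();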
public void Add(T item)
{
_store.Enqueue(item);
_are.Set();
}
public IEnumerable<T> Take()
{
_are.WaitOne();
return _store.AtomicDequeueAll();
}
public bool TryTake(out IEnumerable<T> items, int millisecTimeout)
{
if(_are.WaitOne(millisecTimeout))
{
items = _store.AtomicDequeueAll();
return true;
}
items = null;
return false;
}
}
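To experiment without LLQueue<T>, you can sketch the same blocking "take everything that is waiting" behaviour with only the BCL. The sketch below swaps in a different, lock-based technique and is not the author's class. Monitor.Wait/Pulse guard a list that is swapped out wholesale on each take. The name LockedBatchQueue and its CompleteAdding/TryTakeAll members are hypothetical, modelled loosely on BlockingCollection<T>.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical BCL-only stand-in: each TryTakeAll() returns every item queued so far.
public sealed class LockedBatchQueue<T>
{
    private readonly object _gate = new object();
    private List<T> _pending = new List<T>();
    private bool _completed;

    public void Add(T item)
    {
        lock (_gate)
        {
            if (_completed) throw new InvalidOperationException("Adding has been completed.");
            _pending.Add(item);
            Monitor.Pulse(_gate); // wake one waiting consumer
        }
    }

    public void CompleteAdding()
    {
        lock (_gate)
        {
            _completed = true;
            Monitor.PulseAll(_gate); // wake every consumer so they can observe completion
        }
    }

    // Blocks until at least one item is queued or adding has been completed.
    // Returns false (and a null batch) once the queue is completed and empty.
    public bool TryTakeAll(out List<T> batch)
    {
        lock (_gate)
        {
            while (_pending.Count == 0 && !_completed)
            {
                Monitor.Wait(_gate); // releases the lock while waiting
            }
            if (_pending.Count == 0)
            {
                batch = null;
                return false;
            }
            batch = _pending;          // hand the whole list to the caller...
            _pending = new List<T>();  // ...and start a fresh one for producers
            return true;
        }
    }
}
```

Swapping the list under the lock keeps each take O(1) and means an empty batch is never returned, at the cost of producers and consumers briefly contending for the same lock.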
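This is only a starting point, and several features are deliberately left out.
All of them could be added, but I wanted to keep this to the minimum needed for practical use, and hopefully free of bugs within those limits.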