我使用.NET 4.0 BlockingCollection来处理一个项目队列,每个项目都需要由一个操作处理,这个操作最多可能需要一秒钟来处理每个项目.这个项目队列可以由不同的线程添加.
我有几个问题a)允许多个消费者使用这个BlockingCollection?我注意到了GetConsumingEnumerable(),它似乎适用于单个消费者场景.拥有多个消费者的原因是,通过命名管道实例的处理一次最多可以处理三个这样的项目,所以我认为我可以有三个消费者.
b)是否有办法检查项目是否在此队列中,如果是,则让调用者检查是否有项目要阻止,直到该项目被处理完毕?
编辑:
基于Jon Skeet的答案,这里有一些示例代码,用于说明多个消费者对由单个生产者填充的BlockingCollection进行操作,消费者使用GetConsumingEnumerable()
:
static BlockingCollection<string> coll = new BlockingCollection<string>();
static void Consume()
{
foreach (var i in coll.GetConsumingEnumerable())
{
Console.WriteLine(String.Format("Thread {0} Consuming: {1}", Thread.CurrentThread.ManagedThreadId, i));
Thread.Sleep(1000);
}
}
static void Main(string[] args)
{
int item = 0;
Task.Factory.StartNew(() =>
{
while (true)
{
coll.Add(string.Format("Item {0}", item++));
Thread.Sleep(500);
}
});
for (int i = 0; i < 2; i++)
{
Task.Factory.StartNew(() => Consume());
}
while (true) ;
}
Run Code Online (Sandbox Code Playgroud)
在两个不同线程上操作的两个消费者之间以交错的方式处理项目,例如
Thread 4 Consuming: Item 0
Thread 5 Consuming: Item 1
Thread 4 Consuming: Item 2
Thread 5 Consuming: Item 3
Thread 4 Consuming: Item 4
Run Code Online (Sandbox Code Playgroud)
Jon*_*eet 11
多个消费者可以只是打电话Take
或TryTake
同时打电话- 每个消费者只能消费.
但是,我相信GetConsumingEnumerable
也会做你想要的.我相信如果每个调用者都调用它,每个调用者将获得一个单独的消耗可枚举,这将再次确保每个项目只被消耗一次.我不确定当队列变空时会发生什么 - 我不知道是否MoveNext()
阻塞,或者返回false.
我没有真正关注你的第二个问题,但......
GetConsumingEnumerable
事实上,多个消费者同时拨打电话是安全的;仅当集合被标记为完成时,枚举才完成。每个物品只能消耗一次。
GetConsumingEnumerable
本质上相当于:
while (!IsCompleted)
{
if (TryTake(out var item, Timeout.Infinite))
yield return item;
}
Run Code Online (Sandbox Code Playgroud)
加上一些取消/清理逻辑。
归档时间: |
|
查看次数: |
8679 次 |
最近记录: |