请参阅下面的伪代码
//Single or multiple Producers produce using below method
void Produce(object itemToQueue)
{
concurrentQueue.enqueue(itemToQueue);
consumerSignal.set;
}
//somewhere else we have started a consumer like this
//we have only one consumer
void StartConsumer()
{
while (!concurrentQueue.IsEmpty())
{
if (concurrentQueue.TrydeQueue(out item))
{
//long running processing of item
}
}
consumerSignal.WaitOne();
}
Run Code Online (Sandbox Code Playgroud)
我如何移植这种模式,我从远古时代就开始使用taskfactory创建的任务和net 4的新信号功能.换句话说,如果有人用net 4编写这个模式,它会是什么样子?伪代码很好.如你所见,我已经使用.net 4 concurrentQueue了.如何使用任务并可能使用一些新的信令机制.谢谢
通过Jon/Dan解决我的问题.甜.没有手动信号或while(true)或while(itemstoProcess)类型循环像过去那样
//Single or multiple Producers produce using below method
void Produce(object itemToQueue)
{
blockingCollection.add(item);
}
//somewhere else we have started a consumer like this
//this supports …Run Code Online (Sandbox Code Playgroud) 我有一个线程添加项目到BlockingCollection.
在我正在使用的另一个线程上
foreach (var item in myCollection.GetConsumingEnumerable())
如果有问题我想要摆脱我的foreach和我的方法并清除剩下的东西,BlockingCollection但是我找不到办法去做.
有任何想法吗?
在下面的代码中,我使用CancellationToken在生产者没有生成时唤醒GetConsumingEnumerable(),我想要脱离foreach并退出Task.但我没有看到IsCancellationRequested被记录,我的Task.Wait(timeOut)等待整个timeOut期间.我究竟做错了什么?
userToken.Task = Task.Factory.StartNew(state =>
{
userToken.CancelToken = new CancellationTokenSource();
foreach (var broadcast in userToken.BroadcastQueue.GetConsumingEnumerable(userToken.CancelToken.Token))
{
if (userToken.CancelToken.IsCancellationRequested)
{
Log.Write("BroadcastQueue IsCancellationRequested");
break;
...
}
}
return 0;
}, "TaskSubscribe", TaskCreationOptions.LongRunning);
Run Code Online (Sandbox Code Playgroud)
后来...
UserToken.CancelToken.Cancel();
try
{
task.Wait(timeOut);
}
catch (AggregateException ar)
{
Log.Write("AggregateException " + ar.InnerException, MsgType.InfoMsg);
}
catch (OperationCanceledException)
{
Log.Write("BroadcastQueue Cancelled", MsgType.InfoMsg);
}
Run Code Online (Sandbox Code Playgroud)