相关疑难解决方法(0)

使用blockingcollection和tasks .net 4 TPL的经典生产者消费者模式

请参阅下面的伪代码

//Single or multiple Producers produce using below method
    void Produce(object itemToQueue)
    {
        concurrentQueue.enqueue(itemToQueue);
        consumerSignal.set;
    }

    //somewhere else we have started a consumer like this
    //we have only one consumer
    void StartConsumer()
    {
        while (!concurrentQueue.IsEmpty())
        {
            if (concurrentQueue.TrydeQueue(out item))
            {
                //long running processing of item
            }
        }
        consumerSignal.WaitOne();
    }
Run Code Online (Sandbox Code Playgroud)

我如何移植这种模式,我从远古时代就开始使用taskfactory创建的任务和net 4的新信号功能.换句话说,如果有人用net 4编写这个模式,它会是什么样子?伪代码很好.如你所见,我已经使用.net 4 concurrentQueue了.如何使用任务并可能使用一些新的信令机制.谢谢

通过Jon/Dan解决我的问题.甜.没有手动信号或while(true)或while(itemstoProcess)类型循环像过去那样

//Single or multiple Producers produce using below method
 void Produce(object itemToQueue)
 {
     blockingCollection.add(item);
 }

 //somewhere else we have started a consumer like this
 //this supports …
Run Code Online (Sandbox Code Playgroud)

c# multithreading scheduled-tasks pfx task-parallel-library

19
推荐指数
2
解决办法
3万
查看次数

如何清空BlockingCollection

我有一个线程添加项目到BlockingCollection.

在我正在使用的另一个线程上 foreach (var item in myCollection.GetConsumingEnumerable())

如果有问题我想要摆脱我的foreach和我的方法并清除剩下的东西,BlockingCollection但是我找不到办法去做.

有任何想法吗?

.net c# .net-4.0 task-parallel-library c#-4.0

15
推荐指数
4
解决办法
2万
查看次数

如何在BlockingCollection上取消GetConsumingEnumerable()

在下面的代码中,我使用CancellationToken在生产者没有生成时唤醒GetConsumingEnumerable(),我想要脱离foreach并退出Task.但我没有看到IsCancellationRequested被记录,我的Task.Wait(timeOut)等待整个timeOut期间.我究竟做错了什么?

userToken.Task = Task.Factory.StartNew(state =>
{
    userToken.CancelToken = new CancellationTokenSource();

    foreach (var broadcast in userToken.BroadcastQueue.GetConsumingEnumerable(userToken.CancelToken.Token))
    {
        if (userToken.CancelToken.IsCancellationRequested)
        {
            Log.Write("BroadcastQueue IsCancellationRequested");
            break;
            ...
        }
    }

    return 0;
}, "TaskSubscribe", TaskCreationOptions.LongRunning);
Run Code Online (Sandbox Code Playgroud)

后来...

UserToken.CancelToken.Cancel();          
try
{
    task.Wait(timeOut);
}
catch (AggregateException ar)
{
    Log.Write("AggregateException " + ar.InnerException, MsgType.InfoMsg);
}
catch (OperationCanceledException)
{
    Log.Write("BroadcastQueue Cancelled", MsgType.InfoMsg);
}
Run Code Online (Sandbox Code Playgroud)

c# cancellation blockingcollection

8
推荐指数
2
解决办法
7919
查看次数