在监听消耗IEnumerable <T>时,Blocking.Peek()的类似于BlockingCollection

sll*_*sll 9 c# wcf multithreading .net-4.0 task-parallel-library

我正在使用Pipelines模式实现来将消费者与生产者分离,以避免消费者的慢速问题.

如果在消息处理阶段出现任何异常,[1]它将丢失并且不会分派给其他服务/层[2].如何在[3]这样的消息中处理这样的问题不会丢失,重要的是什么!消息的顺序不会混淆,所以上层服务/层将按照它们进入的顺序获取消息.我有一个想法涉及另一个中间Queue但看起来很复杂?不幸的是BlockingCollection<T>,没有公开任何类比的Queue.Peek()方法,所以我可以只读取下一个可用的消息,并在成功处理的情况下做Dequeue()

private BlockingCollection<IMessage> messagesQueue;    

// TPL Task does following:
// Listen to new messages and as soon as any comes in - process it
foreach (var cachedMessage in 
             messagesQueue.GetConsumingEnumerable(cancellation))
{    
    const int maxRetries = 3;
    int retriesCounter = 0;
    bool isSent = false;

    // On this point a message already is removed from messagesQueue
    while (!isSent && retriesCounter++ <= maxRetries)
    {
        try
        {
           // [1] Preprocess a message
           // [2] Dispatch to an other service/layer    
           clientProxyCallback.SendMessage(cachedMessage);
           isSent = true;
        }                                
        catch(Exception exception)
        {
           // [3]   
           // logging
           if (!isSent && retriesCounter < maxRetries)
           {
              Thread.Sleep(NSeconds);
           }
        }            

        if (!isSent && retriesCounter == maxRetries)
        {
           // just log, message is lost on this stage!
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

编辑:忘了说这是IIS托管的WCF服务,它通过客户端回调契约将消息发送回Silverlight客户端WCF代理.

编辑2:以下是我将如何使用Peek(),我错过了什么?

bool successfullySent = true;
try
{
   var item = queue.Peek();
   PreProcessItem(item);
   SendItem(item);       
}
catch(Exception exception)
{
   successfullySent = false;
}
finally
{
   if (successfullySent)
   {
       // just remove already sent item from the queue
       queue.Dequeue();
   }
}
Run Code Online (Sandbox Code Playgroud)

EDIT3:当然,我可以使用while循环,布尔标志,使用旧风格的方法QueueAutoResetEvent,但我只是想知道是否同样是可能的使用BlockingCollectionGetConsumingEnumerable()我认为像工厂Peek时消耗枚举一起使用,否则所有的管道模式的实现将是非常有益示例新的东西BlockingCollection,GetConsumingEnumerable()看起来不耐用,我不得不回到过去的方法.

Den*_*nis 8

你应该考虑中间队列.

BlockingCollection<T>由于其性质,不能"偷看"物品 - 可能有不止一个消费者.其中一个可以偷看一个项目,另一个可以接受它 - 因此,第一个将尝试采取已经采取的项目.

  • 如果只有一个客户端,可以使用`IEnumerable <T>`的`First()`扩展方法来查看,它不会从阻塞集合中删除一个项目. (5认同)
  • @pvoosten:如果集合中没有任何东西,`First()`将抛出异常。如果您要阻塞直到集合中至少有一个项目(如`Take()`,但不删除元素),该怎么办? (2认同)