sll*_*sll 9 c# wcf multithreading .net-4.0 task-parallel-library
我正在使用Pipelines模式实现来将消费者与生产者分离,以避免消费者的慢速问题.
如果在消息处理阶段出现任何异常,[1]它将丢失并且不会分派给其他服务/层[2].如何在[3]这样的消息中处理这样的问题不会丢失,重要的是什么!消息的顺序不会混淆,所以上层服务/层将按照它们进入的顺序获取消息.我有一个想法涉及另一个中间Queue但看起来很复杂?不幸的是BlockingCollection<T>,没有公开任何类比的Queue.Peek()方法,所以我可以只读取下一个可用的消息,并在成功处理的情况下做Dequeue()
private BlockingCollection<IMessage> messagesQueue;
// TPL Task does following:
// Listen to new messages and as soon as any comes in - process it
foreach (var cachedMessage in
messagesQueue.GetConsumingEnumerable(cancellation))
{
const int maxRetries = 3;
int retriesCounter = 0;
bool isSent = false;
// On this point a message already is removed from messagesQueue
while (!isSent && retriesCounter++ <= maxRetries)
{
try
{
// [1] Preprocess a message
// [2] Dispatch to an other service/layer
clientProxyCallback.SendMessage(cachedMessage);
isSent = true;
}
catch(Exception exception)
{
// [3]
// logging
if (!isSent && retriesCounter < maxRetries)
{
Thread.Sleep(NSeconds);
}
}
if (!isSent && retriesCounter == maxRetries)
{
// just log, message is lost on this stage!
}
}
}
Run Code Online (Sandbox Code Playgroud)
编辑:忘了说这是IIS托管的WCF服务,它通过客户端回调契约将消息发送回Silverlight客户端WCF代理.
编辑2:以下是我将如何使用Peek(),我错过了什么?
bool successfullySent = true;
try
{
var item = queue.Peek();
PreProcessItem(item);
SendItem(item);
}
catch(Exception exception)
{
successfullySent = false;
}
finally
{
if (successfullySent)
{
// just remove already sent item from the queue
queue.Dequeue();
}
}
Run Code Online (Sandbox Code Playgroud)
EDIT3:当然,我可以使用while循环,布尔标志,使用旧风格的方法Queue和AutoResetEvent,但我只是想知道是否同样是可能的使用BlockingCollection和 GetConsumingEnumerable()我认为像工厂Peek时消耗枚举一起使用,否则所有的管道模式的实现将是非常有益示例新的东西BlockingCollection,GetConsumingEnumerable()看起来不耐用,我不得不回到过去的方法.
你应该考虑中间队列.
BlockingCollection<T>由于其性质,不能"偷看"物品 - 可能有不止一个消费者.其中一个可以偷看一个项目,另一个可以接受它 - 因此,第一个将尝试采取已经采取的项目.
| 归档时间: |
|
| 查看次数: |
5717 次 |
| 最近记录: |