我很好奇MassTransit消费者是否可以在实际检索msg之前Peek()MSMQ队列.
步骤/过程是什么:
1)Msg发送到队列
2)消费者得到它并且必须进行数据库更新 - 大约需要5秒钟
3)如果第一轮更新,消费者必须进行第二轮更新.
我的问题是,我如何处理如果第一个数据库更新失败,消息留在队列中的情况(即网络问题,无法进入数据库).
目前,只要它从队列中读取消息,就会将其删除,然后如果数据库更新失败,它就会消失.
另外,我如何处理电源故障 - 我的意思是如果消费者通过'工作'的一半,无论是(db update或其他什么)和电源芯片等,我如何重新运行该过程队列中的消息?让我们说这个工作(无论如何我当前的实例)正在推动一个新的行到一个表.我的意思是我可以编写代码来首先检查行是否在那里然后是否删除消息然后如果没有然后运行任务,但我怎么能让它重新运行整个过程呢?
我已经读过我可以Peek()排队,然后运行任务,然后读取队列消息真实并删除它,但我不能为我的生活弄清楚,如果这适用于公共交通...有点丢失...
另外我知道Masstransit有,.RetryLater但我在这个过程中使用它吗?是 Initially- > When- > Then - > .RetryLater在传奇?
任何指针都会被指定
最诚挚的问候罗宾
编辑
PS:我正在使用传奇....
Define(() =>
{
RemoveWhen(saga => saga.CurrentState == Completed);
Initially(
When(NewAC)
.Then((saga, message) => saga.ProcessPSM(message),
InCaseOf<Exception>()
.TransitionTo(Problem)
)
.Then((saga, message) => saga.PostProcessPSM())
.Complete()
);
During(Problem,
When(Waiting)
// NOTE: THIS DOES NOT WORK!!!!
.RetryLater()
);
});
Run Code Online (Sandbox Code Playgroud)
RetryLater抛出一个错误:"现有的传奇不能接受该消息"
我不知道我还能如何访问'RetryLater'.
MassTransit抽象出底层队列的概念.所以Peek不是解决方案,但它确实有其他方法可以重试消息.如果您只对处理错误和故障条件感兴趣,则以下机制就足够了.
默认情况下,如果使用者抛出异常,则将重试该消息N次:
如果您想要一种更细粒度的错误处理方法,您可以实现Context Consumer,捕获可恢复或瞬态异常并手动调用RetryLater.根据我的理解,这可以完成多少次没有限制.
public class RetryConsumer : Consumes<AwesomeMessage>.Context
{
public void Consume(IConsumeContext<AwesomeMessage> message)
{
try
{
Console.WriteLine("This is Attempt " + message.RetryCount);
// Do Something
}
catch (SomeTransientException e)
{
message.RetryLater();
}
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1654 次 |
| 最近记录: |