Bry*_*dds 2 c# masstransit rabbitmq
我有一个有三种状态的传奇; Initial,ReceivingRows,已完成 -
public static State Initial { get; set; }
public static State ReceivingRows { get; set; }
public static State Completed { get; set; }
Run Code Online (Sandbox Code Playgroud)
当它获得BofMessage(其中Bof =文件的开头)时,它从Initial转换为ReceivingRows.在BofMessage之后,它接收大量的RowMessages,其中每个RowMessages描述一个平面文件中的一行.发送完所有RowMessage后,将发送EofMessage,状态将更改为Completed.观察 -
static void DefineSagaBehavior()
{
Initially(When(ReceivedBof)
.Then((saga, message) => saga.BeginFile(message))
.TransitionTo(ReceivingRows));
During(ReceivingRows, When(ReceivedRow)
.Then((saga, message) => saga.AddRow(message)));
During(ReceivingRows, When(ReceivedRowError)
.Then((saga, message) => saga.RowError(message)));
During(ReceivingRows, When(ReceivedEof)
.Then((saga, message) => saga.EndFile(message))
.TransitionTo(Completed));
}
Run Code Online (Sandbox Code Playgroud)
这有效,除了有时在BofMessage 之前收到几个RowMessage!这与我发送它们的顺序无关.这意味着将收到消息并最终将其视为错误,从而导致它们从我最终写出来的数据库或文件中丢失.
作为临时修复,我在这个方法中添加了一个小睡眠计时器黑客,它可以完成所有的发布 -
public static void Publish(
[NotNull] IServiceBus serviceBus,
[NotNull] string publisherName,
Guid correlationId,
[NotNull] Tuple<string, string> inputFileDescriptor,
[NotNull] string outputFileName)
{
// attempt to load offsets
var offsetsResult = OffsetParser.Parse(inputFileDescriptor.Item1);
if (offsetsResult.Result != ParseOffsetsResult.Success)
{
// publish an offsets invalid message
serviceBus.Publish<TErrorMessage>(CombGuid.Generate(), publisherName, inputFileDescriptor.Item2);
return;
}
// publish beginning of file
var fullInputFilePath = Path.GetFullPath(inputFileDescriptor.Item2);
serviceBus.Publish<TBofMessage>(correlationId, publisherName, fullInputFilePath);
// HACK: make sure bof message happens before row messages, or else some row messages won't be received
Thread.Sleep(5000);
// publish rows from feed
var feedResult = FeedParser.Parse(inputFileDescriptor.Item2, offsetsResult.Offsets);
foreach (var row in feedResult)
{
// publish row message, unaligned if applicable
if (row.Result != ParseRowResult.Success)
serviceBus.Publish<TRowErrorMessage>(correlationId, publisherName, row.Fields);
else
serviceBus.Publish<TRowMessage>(correlationId, publisherName, row.Fields);
}
// publish end of file
serviceBus.Publish<TEofMessage>(correlationId, publisherName, outputFileName);
}
Run Code Online (Sandbox Code Playgroud)
这是一个5秒的睡眠计时器,并且是一个非常难看的黑客.任何人都可以告诉我为什么我没有按照我发送给他们的顺序收到邮件?如果这些消息在默认情况下无序,是否可以确保以正确的顺序发送这些消息?
谢谢!
请注意,这是从http://groups.google.com/group/masstransit-discuss/browse_thread/thread/7bd9518a690db4bb交叉发布,以方便使用.
您无法确保以任何顺序传递邮件.你可以通过确保消费者方面只有一个并发消费者来接近MT,我仍然不会依赖这种行为(http://docs.masstransit-project.com/en/latest/overview/keyideas.html#处理程序).这将有效地使您的消费者单线程化.
| 归档时间: |
|
| 查看次数: |
2123 次 |
| 最近记录: |