jop*_*opa 5 .net c# message-queue rebus
我们使用 Rebus 作为 Sql 服务器的队列系统。对于不同类型的消息,我们有多个收件人。每条消息都可以由特定类型的多个工作人员处理。一条消息只能由一名工人处理/处理(第一个拉出它的工人)。如果工作人员由于某种原因无法完成它,它会使用超时服务推迟消息。
如果我理解正确,它就会变成一个 TimeoutRequest 并放入超时表中。当需要重新运行时,它会在作为原始消息重新引入队列之前变成 TimeoutReply。
我们遇到的问题是,当它变成 TimeoutReply 时,所有工作人员都会拿起它并创建原始消息。当超时时,一条原始消息变成多条消息(与工作人员一样多)。
我们的 Rebus 设置如下:
“服务器端”:
var adapter = new BuiltinContainerAdapter();
Configure.With(adapter)
.Logging(l => l.Log4Net())
.Transport(t => t.UseSqlServerInOneWayClientMode(connectionString).EnsureTableIsCreated())
.CreateBus()
.Start();
return adapter;
Run Code Online (Sandbox Code Playgroud)
“工人方面”:
_adapter = new BuiltinContainerAdapter();
Configure.With(_adapter)
.Logging(l => l.Log4Net())
.Transport(t => t.UseSqlServer(_connectionString, _inputQueue, "error")
.EnsureTableIsCreated())
.Events(x => x.AfterMessage += ((bus, exception, message) => SendWorkerFinishedJob(exception, message)))
.Events(x => x.BeforeMessage += (bus, message) => SignalWorkerStartedJob(message))
.Behavior(x => x.SetMaxRetriesFor<Exception>(0))
.Timeouts(x => x.StoreInSqlServer(_connectionString, "timeouts").EnsureTableIsCreated())
.CreateBus().Start(numberOfWorkers);
Run Code Online (Sandbox Code Playgroud)
非常感谢在解决问题或提供理解方面的任何帮助!
我可以想象为什么你最终会得到多个超时回复的唯一原因是因为每个工作人员都充当超时管理器,并且他们似乎共享相同的存储。
这样,由于超时管理器在查询到期超时时不使用任何类型的锁定或任何内容,因此它们最终可能会抢占相同的到期超时,这反过来会导致多个超时回复 - 存在竞争条件,但它会发生不会被注意到,因为此 SQL不会注意到行是否被实际删除)。
我建议您a)为工作人员使用单独的超时表(例如_inputQueue + ".timeouts"),或者b)让所有工作人员使用外部超时管理器(即通过省略该事物Timeouts(x => ...)并启动独立的专用超时管理器。
在你的场景中,我想(a)是最简单的方法,因为它非常接近你现在所拥有的。
不过,我确实更喜欢 (b) 自己,通常每台托管 Rebus 端点的机器都有一个超时管理器。
如果这能解决您的问题,请告诉我。
另外,我很想知道 SQL 传输是如何为您服务的:)
| 归档时间: |
|
| 查看次数: |
1042 次 |
| 最近记录: |