Dav*_*yon 4 c# transactions rabbitmq
最近,我一直在检查RabbitMQ而不是C#作为实现pub/sub的方法.我更习惯使用NServiceBus.NServiceBus通过在a中登记MSMQ来处理事务TransactionScope.其他事务感知操作也可以同样存在TransactionScope(如MSSQL),因此一切都是真正原子的.在下面,NSB引入MSDTC进行协调.
我看到在RabbitMQ的C#客户端API中有一个IModel.TxSelect()和IModel.TxCommit().这很适合在提交之前不向交换发送消息.这包括发送到交换机的多个消息需要是原子的用例.但是,有没有一种方法可以将数据库调用(比如MSSQL)与RabbitMQ事务同步?
小智 13
您可以通过实现IEnlistmentNotification接口编写供MSDTC使用的RabbitMQ资源管理器.该实现在征募参与时为事务管理器提供两阶段提交通知回调.请注意,MSDTC价格昂贵,会严重降低您的整体性能.
RabbitMQ资源管理器的示例:
sealed class RabbitMqResourceManager : IEnlistmentNotification
{
private readonly IModel _channel;
public RabbitMqResourceManager(IModel channel, Transaction transaction)
{
_channel = channel;
_channel.TxSelect();
transaction.EnlistVolatile(this, EnlistmentOptions.None);
}
public RabbitMqResourceManager(IModel channel)
{
_channel = channel;
_channel.TxSelect();
if (Transaction.Current != null)
Transaction.Current.EnlistVolatile(this, EnlistmentOptions.None);
}
public void Commit(Enlistment enlistment)
{
_channel.TxCommit();
enlistment.Done();
}
public void InDoubt(Enlistment enlistment)
{
Rollback(enlistment);
}
public void Prepare(PreparingEnlistment preparingEnlistment)
{
preparingEnlistment.Prepared();
}
public void Rollback(Enlistment enlistment)
{
_channel.TxRollback();
enlistment.Done();
}
}
Run Code Online (Sandbox Code Playgroud)
使用资源管理器的示例
using(TransactionScope trx= new TransactionScope())
{
var basicProperties = _channel.CreateBasicProperties();
basicProperties.DeliveryMode = 2;
new RabbitMqResourceManager(_channel, trx);
_channel.BasicPublish(someExchange, someQueueName, basicProperties, someData);
trx.Complete();
}
Run Code Online (Sandbox Code Playgroud)
据我所知,无法将 TxSelect/TxCommit 与 TransactionScope 进行协调。
目前,我采用的方法是使用带有持久消息的持久队列来确保它们在 RabbitMQ 重启后仍然存在。然后当从队列中消费时,我读取一条消息做一些处理,然后将一条记录插入到数据库中,一旦所有这些完成,我 ACK(nowledge) 消息并将其从队列中删除。这种方法的潜在问题是消息可能最终被处理两次(例如,如果消息被提交到数据库,但在消息可以被确认之前说到 RabbitMQ 的连接已断开),但对于系统来说我们正在构建我们关注吞吐量。(我相信这被称为“至少一次”方法)。
RabbitMQ 站点确实说使用 TxSelect 和 TxCommit 会显着降低性能,因此我建议对这两种方法进行基准测试。
无论您采用何种方式,您都需要确保您的消费者能够处理可能被处理两次的消息。
如果您还没有找到它,请在此处查看 RabbitMQ 的 .Net 用户指南,特别是第 3.5 节