TxSelect和TransactionScope

Dav*_*yon 4 c# transactions rabbitmq

最近,我一直在检查RabbitMQ而不是C#作为实现pub/sub的方法.我更习惯使用NServiceBus.NServiceBus通过在a中登记MSMQ来处理事务TransactionScope.其他事务感知操作也可以同样存在TransactionScope(如MSSQL),因此一切都是真正原子的.在下面,NSB引入MSDTC进行协调.

我看到在RabbitMQ的C#客户端API中有一个IModel.TxSelect()IModel.TxCommit().这很适合在提交之前不向交换发送消息.这包括发送到交换机的多个消息需要是原子的用例.但是,有没有一种方法可以将数据库调用(比如MSSQL)与RabbitMQ事务同步?

小智 13

您可以通过实现IEnlistmentNotification接口编写供MSDTC使用的RabbitMQ资源管理器.该实现在征募参与时为事务管理器提供两阶段提交通知回调.请注意,MSDTC价格昂贵,会严重降低您的整体性能.

RabbitMQ资源管理器的示例:

sealed class RabbitMqResourceManager : IEnlistmentNotification
{
    private readonly IModel _channel;

    public RabbitMqResourceManager(IModel channel, Transaction transaction)
    {
        _channel = channel;
        _channel.TxSelect();
        transaction.EnlistVolatile(this, EnlistmentOptions.None);
    }

    public RabbitMqResourceManager(IModel channel)
    {
        _channel = channel;
        _channel.TxSelect();
        if (Transaction.Current != null)
            Transaction.Current.EnlistVolatile(this, EnlistmentOptions.None);
    }

    public void Commit(Enlistment enlistment)
    {
        _channel.TxCommit();
        enlistment.Done();
    }

    public void InDoubt(Enlistment enlistment)
    {           
        Rollback(enlistment);
    }

    public void Prepare(PreparingEnlistment preparingEnlistment)
    {
        preparingEnlistment.Prepared();
    }

    public void Rollback(Enlistment enlistment)
    {
        _channel.TxRollback();
        enlistment.Done();
    }
}
Run Code Online (Sandbox Code Playgroud)

使用资源管理器的示例

using(TransactionScope trx= new TransactionScope())
{
    var basicProperties = _channel.CreateBasicProperties();
    basicProperties.DeliveryMode = 2;

    new RabbitMqResourceManager(_channel, trx);
    _channel.BasicPublish(someExchange, someQueueName, basicProperties, someData);
    trx.Complete();
}
Run Code Online (Sandbox Code Playgroud)


kzh*_*hen 5

据我所知,无法将 TxSelect/TxCommit 与 TransactionScope 进行协调。

目前,我采用的方法是使用带有持久消息的持久队列来确保它们在 RabbitMQ 重启后仍然存在。然后当从队列中消费时,我读取一条消息做一些处理,然后将一条记录插入到数据库中,一旦所有这些完成,我 ACK(nowledge) 消息并将其从队列中删除。这种方法的潜在问题是消息可能最终被处理两次(例如,如果消息被提交到数据库,但在消息可以被确认之前说到 RabbitMQ 的连接已断开),但对于系统来说我们正在构建我们关注吞吐量。(我相信这被称为“至少一次”方法)。

RabbitMQ 站点确实说使用 TxSelect 和 TxCommit 会显着降低性能,因此我建议对这两种方法进行基准测试。

无论您采用何种方式,您都需要确保您的消费者能够处理可能被处理两次的消息。


如果您还没有找到它,请在此处查看 RabbitMQ 的 .Net 用户指南,特别是第 3.5 节