如何让NHibernate在每个请求使用会话时重试死锁事务?

Hen*_*rik 12 nhibernate deadlock rollback livelock

当您使用Session-Per-Request模式时,您在使用NHibernate的3层应用程序中使用哪种模式/体系结构需要支持事务失败的重试?(因为ISession在异常后变为无效,即使这是死锁或超时或活锁异常).

Hen*_*rik 34

注意2现在我永远不会将写入事务放在Web项目中 - 而是使用消息传递+队列并让后台工作人员处理消息,以便完成事务性工作.

但是,我仍然会使用事务进行读取以获得一致的数据; 与WebCC项目中的MVCC/Snapshot隔离一起使用.在这种情况下,您会发现每个事务请求的会话完全正常.

注1:这篇文章的想法已被放置在Castle Transactions框架和我的新NHibernate Facility中.

好的,这是一般的想法.假设您要为客户创建未最终订单.您有某种GUI,例如浏览器/ MVC应用程序,它使用相关信息创建新的数据结构(或者您从网络获得此数据结构):

[Serializable]
class CreateOrder /*: IMessage*/
{
    // immutable
    private readonly string _CustomerName;
    private readonly decimal _Total;
    private readonly Guid _CustomerId;

    public CreateOrder(string customerName, decimal total, Guid customerId)
    {
        _CustomerName = customerName;
        _Total = total;
        _CustomerId = customerId;
    }

    // put ProtoBuf attribute
    public string CustomerName
    {
        get { return _CustomerName; }
    }

    // put ProtoBuf attribute
    public decimal Total
    {
        get { return _Total; }
    }

    // put ProtoBuf attribute
    public Guid CustomerId
    {
        get { return _CustomerId; }
    }
}
Run Code Online (Sandbox Code Playgroud)

你需要一些东西来处理它.可能这将是某种服务总线中的命令处理程序."命令处理程序"这个词是其中之一,您可以将其称为"服务"或"域服务"或"消息处理程序".如果您正在进行函数式编程,那么它将是您的消息框实现,或者如果您正在执行Erlang或Akka,它将是一个Actor.

class CreateOrderHandler : IHandle<CreateOrder>
{
    public void Handle(CreateOrder command)
    {
        With.Policy(IoC.Resolve<ISession>, s => s.BeginTransaction(), s =>
        {
            var potentialCustomer = s.Get<PotentialCustomer>(command.CustomerId);
            potentialCustomer.CreateOrder(command.Total);
            return potentialCustomer;
        }, RetryPolicies.ExponentialBackOff.RetryOnLivelockAndDeadlock(3));
    }
}

interface IHandle<T> /* where T : IMessage */
{
    void Handle(T command);
}
Run Code Online (Sandbox Code Playgroud)

上面显示了您可能为此给定问题域(应用程序状态/事务处理)选择的API用法.

With的实现:

static class With
{
    internal static void Policy(Func<ISession> getSession,
                                       Func<ISession, ITransaction> getTransaction,
                                       Func<ISession, EntityBase /* abstract 'entity' base class */> executeAction,
                                       IRetryPolicy policy)
    {
        //http://fabiomaulo.blogspot.com/2009/06/improving-ado-exception-management-in.html

        while (true)
        {
            using (var session = getSession())
            using (var t = getTransaction(session))
            {
                var entity = executeAction(session);
                try
                {
                    // we might not always want to update; have another level of indirection if you wish
                    session.Update(entity);
                    t.Commit();
                    break; // we're done, stop looping
                }
                catch (ADOException e)
                {
                    // need to clear 2nd level cache, or we'll get 'entity associated with another ISession'-exception

                    // but the session is now broken in all other regards will will throw exceptions
                    // if you prod it in any other way
                    session.Evict(entity);

                    if (!t.WasRolledBack) t.Rollback(); // will back our transaction

                    // this would need to be through another level of indirection if you support more databases
                    var dbException = ADOExceptionHelper.ExtractDbException(e) as SqlException;

                    if (policy.PerformRetry(dbException)) continue;
                    throw; // otherwise, we stop by throwing the exception back up the layers
                }
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

如您所见,我们需要一个新的工作单元; 每当出现问题时,ISession.这就是循环位于Using语句/块外部的原因.具有函数等同于具有工厂实例,除了我们直接在对象实例上调用,而不是在其上调用方法.它使得更好的调用者-API imho.

我们希望相当平滑地处理我们如何执行重试,因此我们有一个可以由不同处理程序实现的接口,称为IRetryHandler.应该可以为每个方面链接这些(是的,它非常接近AOP)你想要强制执行控制流程.与AOP的工作方式类似,返回值用于控制控制流,但仅以真/假方式控制,这是我们的要求.

interface IRetryPolicy
{
    bool PerformRetry(SqlException ex);
}
Run Code Online (Sandbox Code Playgroud)

AggregateRoot,PotentialCustomer是具有生命周期的实体.这是你用*.hbm.xml文件/ FluentNHibernate映射的内容.

它有一个与发送命令1:1对应的方法.这使命令处理程序完全显而易见.

此外,使用带有duck typing的动态语言,它允许您将命令的类型名称映射到方法,类似于Ruby/Smalltalk的工作方式.

如果您正在进行事件源,事务处理将类似,除了事务不会连接NHibernate之类的.推论是您将保存通过调用CreateOrder(十进制)创建的事件,并为您的实体提供从存储重新读取已保存事件的机制.

最后需要注意的是,我正在重写我创建的三种方法.这是NHibernate方面的要求,因为它需要一种知道实体何时与另一个实体相等的方式,如果它们在集合/包中.更多关于我在这里的实施.无论如何,这是示例代码,我现在不关心我的客户,所以我没有实现它们:

sealed class PotentialCustomer : EntityBase
{
    public void CreateOrder(decimal total)
    {
        // validate total
        // run business rules

        // create event, save into event sourced queue as transient event
        // update private state
    }

    public override bool IsTransient() { throw new NotImplementedException(); }
    protected override int GetTransientHashCode() { throw new NotImplementedException(); }
    protected override int GetNonTransientHashCode() { throw new NotImplementedException(); }
}
Run Code Online (Sandbox Code Playgroud)

我们需要一种创建重试策略的方法.当然,我们可以通过多种方式做到这一点.在这里,我将一个流畅的接口与静态方法类型相同类型的同一对象的实例组合在一起.我明确地实现了接口,以便在Fluent接口中看不到其他方法.此界面仅使用下面的"示例"实现.

internal class RetryPolicies : INonConfiguredPolicy
{
    private readonly IRetryPolicy _Policy;

    private RetryPolicies(IRetryPolicy policy)
    {
        if (policy == null) throw new ArgumentNullException("policy");
        _Policy = policy;
    }

    public static readonly INonConfiguredPolicy ExponentialBackOff =
        new RetryPolicies(new ExponentialBackOffPolicy(TimeSpan.FromMilliseconds(200)));

    IRetryPolicy INonConfiguredPolicy.RetryOnLivelockAndDeadlock(int retries)
    {
        return new ChainingPolicy(new[] {new SqlServerRetryPolicy(retries), _Policy});
    }
}
Run Code Online (Sandbox Code Playgroud)

我们需要一个接口来部分完成对Fluent接口的调用.这为我们提供了类型安全性.因此,在完成策略配置之前,我们需要两个解除引用运算符(即'完全停止' - (.)),远离我们的静态类型.

internal interface INonConfiguredPolicy
{
    IRetryPolicy RetryOnLivelockAndDeadlock(int retries);
}
Run Code Online (Sandbox Code Playgroud)

链接政策可以解决.它的实现检查它的所有子节点是否继续返回,并在检查时,它还执行其中的逻辑.

internal class ChainingPolicy : IRetryPolicy
{
    private readonly IEnumerable<IRetryPolicy> _Policies;

    public ChainingPolicy(IEnumerable<IRetryPolicy> policies)
    {
        if (policies == null) throw new ArgumentNullException("policies");
        _Policies = policies;
    }

    public bool PerformRetry(SqlException ex)
    {
        return _Policies.Aggregate(true, (val, policy) => val && policy.PerformRetry(ex));
    }
}
Run Code Online (Sandbox Code Playgroud)

此策略允许当前线程休眠一段时间; 有时数据库超载,并且有多个读者/写入者不断尝试阅读将是对数据库的事实上的DOS攻击(看看几个月前facebook崩溃时发生的事情,因为他们的缓存服务器都在同一时间查询他们的数据库时间).

internal class ExponentialBackOffPolicy : IRetryPolicy
{
    private readonly TimeSpan _MaxWait;
    private TimeSpan _CurrentWait = TimeSpan.Zero; // initially, don't wait

    public ExponentialBackOffPolicy(TimeSpan maxWait)
    {
        _MaxWait = maxWait;
    }

    public bool PerformRetry(SqlException ex)
    {
        Thread.Sleep(_CurrentWait);
        _CurrentWait = _CurrentWait == TimeSpan.Zero ? TimeSpan.FromMilliseconds(20) : _CurrentWait + _CurrentWait;
        return _CurrentWait <= _MaxWait;
    }
}
Run Code Online (Sandbox Code Playgroud)

同样,在任何好的基于SQL的系统中,我们都需要处理死锁.我们无法真正计划这些,特别是在使用NHibernate时,除了保持严格的事务策略 - 没有隐式事务; 并且要小心Open-Session-In-View.还有笛卡尔积问题/ N + 1选择了你需要记住的问题,如果你要获取大量数据.相反,您可能拥有Multi-Query或HQL的'fetch'关键字.

internal class SqlServerRetryPolicy : IRetryPolicy
{
    private int _Tries;
    private readonly int _CutOffPoint;

    public SqlServerRetryPolicy(int cutOffPoint)
    {
        if (cutOffPoint < 1) throw new ArgumentOutOfRangeException("cutOffPoint");
        _CutOffPoint = cutOffPoint;
    }

    public bool PerformRetry(SqlException ex)
    {
        if (ex == null) throw new ArgumentNullException("ex");
        // checks the ErrorCode property on the SqlException
        return SqlServerExceptions.IsThisADeadlock(ex) && ++_Tries < _CutOffPoint;
    }
}
Run Code Online (Sandbox Code Playgroud)

帮助类使代码更好地读取.

internal static class SqlServerExceptions
{
    public static bool IsThisADeadlock(SqlException realException)
    {
        return realException.ErrorCode == 1205;
    }
}
Run Code Online (Sandbox Code Playgroud)

不要忘记在IConnectionFactory中处理网络故障(通过实现IConnection委托).


PS:如果您不仅仅是在阅读,那么每次请求会话就是一个破碎的模式.特别是如果您正在使用与您一起编写的相同的ISession进行读取,并且您没有在写入之前对读取进行排序,使得它们始终都是.