使用Reactive Extensions重试异步任务代码

Geo*_*kos 4 c# system.reactive reactiveui

在我的数据访问类中包含以下代码.

public async Task<IEnumerable<TEntity>> QueryAsync(string sql, object param = null,
            CommandType commandType = CommandType.Text, int? commandTimeout = null, IDbTransaction transaction = null)
        {
            using (var connection = Connection)
            {
                var tokenSource = GetCancellationTokenSource(commandTimeout ?? CommandTimeoutDefault);
                Task<IEnumerable<TEntity>> queryTask =
                    connection.QueryAsync<TEntity>(new CommandDefinition(sql, param, transaction,
                        commandTimeout ?? CommandTimeoutDefault, commandType, cancellationToken: tokenSource.Token));
                IEnumerable<TEntity> data = await queryTask.ConfigureAwait(false);
                connection.Close();
                connection.Dispose();
                tokenSource.Dispose();
                return data;
            }
        }
Run Code Online (Sandbox Code Playgroud)

我想SqlExeption抛出一次重试一次.请记住,我不能将RX应用于应用程序,而只能在此代码块中应用.

我尝试了下面的代码,看起来它正在正确执行并Do登录控制台输出,但并没有真正调用Catch处理程序,我不确定Retry处理程序是否也被执行.

public async Task<IEnumerable<TEntity>> QueryAsync(string sql, object param = null,
            CommandType commandType = CommandType.Text, int? commandTimeout = null, IDbTransaction transaction = null)
        {
            return await Observable.Defer(async () =>
            {
                using (var connection = Connection)
                {
                    var tokenSource = GetCancellationTokenSource(commandTimeout ?? CommandTimeoutDefault);
                    Task<IEnumerable<TEntity>> queryTask =
                        connection.QueryAsync<TEntity>(new CommandDefinition(sql, param, transaction,
                            commandTimeout ?? CommandTimeoutDefault, commandType, cancellationToken: tokenSource.Token));
                    IEnumerable<TEntity> data = await queryTask.ConfigureAwait(false);
                    connection.Close();
                    connection.Dispose();
                    tokenSource.Dispose();
                    return Observable.Return(data);
                }
            })
            .Catch<IEnumerable<TEntity>, SqlException>(source =>
           {
               Debug.WriteLine($"QueryAsync Exception {source}");
               return Observable.Return(new List<TEntity>());
           })
           .Throttle(TimeSpan.FromMilliseconds(500))
           .Retry(1)
           .Do(_ => Debug.WriteLine("Do QueryAsync"));
        }
Run Code Online (Sandbox Code Playgroud)

Ken*_*art 6

我可以看到你的代码有几个潜在的问题:

  • QueryWithRetryAsync例如调用的方法中将重试逻辑与主逻辑分开.这只是一个设计问题,但仍然存在问题
  • 不要Catch直到之后Retry.否则,SqlException将导致一个空列表,Retry操作员将永远不会看到异常
  • 我认为这根本不是Throttle必要的,因为你只想通过管道获得一个价值
  • Retry(1)没有做你认为它做的事情(这对我来说也是一个惊喜).看来"重试"的定义包括第一次调用,所以你需要Retry(2)

这是一个独立的示例,其行为方式符合您的要求:

class Program
{
    static void Main(string[] args)
    {
        var pipeline = Observable
            .Defer(() => DoSomethingAsync().ToObservable())
            .Retry(2)
            .Catch<string, InvalidOperationException>(ex => Observable.Return("default"));

        pipeline
            .Do(Console.WriteLine)
            .Subscribe();

        Console.ReadKey();
    }

    private static int invocationCount = 0;

    private static async Task<string> DoSomethingAsync()
    {
        Console.WriteLine("Attempting DoSomethingAsync");

        await Task.Delay(TimeSpan.FromSeconds(2));

        ++invocationCount;

        if (invocationCount == 2)
        {
            return "foo";
        }

        throw new InvalidOperationException();
    }
}
Run Code Online (Sandbox Code Playgroud)