如何创建一个Observable序列,在超时后重新发送HTTP请求?

Mar*_*tin 4 .net c# asynchronous system.reactive

我是Rx的新手,我正在尝试创建一个Observable序列,它允许我执行以下操作:

  1. 使用System.Net.Http.HttpClient.SendAsync(request,cancelToken)向URI发送HTTP POST请求
  2. 等待可配置的时间段以返回响应或请求超时.
  3. 如果请求超时,则重复请求.
  4. 继续重复请求,直到收到响应(不一定是200 OK)或最大值.达到了重试次数.
  5. 如果最大 达到重试次数然后我必须知道它,所以我可以记录错误.

我一直在玩:

HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Post, "Some URI");
...
CancellationTokenSource tokenSource = new CancellationTokenSource();
CancellationToken token = tokenSource.Token;

...

try
{
    var res = Observable.FromAsync(() => _client.SendAsync(request, token))
                                                .Timeout(TimeSpan.FromSeconds(5));
    try
    {
        HttpResponseMessage response = res.Wait();

        // Process response
        ...
    }
    catch (TaskCancelledException)
    {
        ....
    }
}
catch (TimeoutException)
{
    ....
}
Run Code Online (Sandbox Code Playgroud)

但我不确定在超时发生后再次启动请求的最佳方法,以及我应该如何检查我是否已达到最大值.重试次数.

另外,我不确定是否应该对可观察序列设置超时策略,或者我是否应该在HttpClient对象本身上设置Timeout属性.

[2014年12月11日编辑]

根据下面的评论,我更新了@TheZenCoder的代码,以便它使用await

var attempts = 0;
HttpResponseMessage response = null;

try
{
    response = await Observable
        .FromAsync((ct) =>
        {
            attempts++;
            return SendRequest(token, ct);
        })
        .Retry(5)
        .LastAsync();
}
catch (Exception ex)
{
    Console.WriteLine(ex.Message);
}
Run Code Online (Sandbox Code Playgroud)

到目前为止我只进行了有限的测试,但似乎工作正常.

Far*_*ina 6

要发送请求,设置请求超时并使用取消令牌,您可以将其包装到以下方法中:

private static Task<HttpResponseMessage> SendRequest(CancellationToken token)
{
    var client = new HttpClient { Timeout = TimeSpan.FromSeconds(5) };

    var request = new HttpRequestMessage(HttpMethod.Get, new Uri("http://www.google.ba"));

    return client.SendAsync(request, token);
}
Run Code Online (Sandbox Code Playgroud)

我认为在HTTP客户端上设置超时是一个更清晰的选项,而不是使用Observable超时.

然后,您可以使用IObservable Retry方法多次重试此操作.如果你不害怕开源,那么@Xave Sexton在评论中指出RXX扩展中还有一些更灵活的重试方法.

var attempts = 0;

try
{
    var response = Observable
        .FromAsync(() =>
        {
            attempts++;
            return SendRequest(token);
        })
        .Retry(5)
        .Wait();
}
catch (Exception ex)
{
    Console.WriteLine(ex.Message);
}
Run Code Online (Sandbox Code Playgroud)

如果SendRequest方法中发生HTTP超时或其他错误,则重试将继续.在包含有关错误的信息的最后一次重试之后将抛出异常.重试次数设置为attempts变量.

请记住,使用该Wait方法可以有效地阻止调用线程执行,直到结果可用,这不是您在使用异步代码时想要做的事情.也许你有一些特定的场景,这就是我在我的例子中留下它的原因.

  • 另外,我从不建议使用`Wait`方法(我知道你可能有一个测试场景,但是一个自称为新手的Rx开发人员并不一定知道这一点.) (2认同)