如何从rx subscribe回调异步函数?

Mar*_*hke 20 c# asynchronous reactive-programming system.reactive

我想在Rx订阅中回调一个异步函数.

就像这样:

public class Consumer
{
    private readonly Service _service = new Service();

    public ReplaySubject<string> Results = new ReplaySubject<string>();

    public void Trigger()
    {
        Observable.Timer(TimeSpan.FromMilliseconds(100)).Subscribe(async _ => await RunAsync());
    }

    public Task RunAsync()
    {
        return _service.DoAsync();
    }
}

public class Service
{
    public async Task<string> DoAsync()
    {
        return await Task.Run(() => Do());
    }

    private static string Do()
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(200));
        throw new ArgumentException("invalid!");
        return "foobar";
    }
}

[Test]
public async Task Test()
{
    var sut = new Consumer();
    sut.Trigger();
    var result = await sut.Results.FirstAsync();
}
Run Code Online (Sandbox Code Playgroud)

需要做什么才能正确捕捉异常?

rei*_*erh 30

Paul Betts的回答适用于大多数情况,但是如果你想在等待异步函数完成时阻塞流,你需要这样的东西:

Observable.Interval(TimeSpan.FromSeconds(1))
          .Select(l => Observable.FromAsync(asyncMethod))
          .Concat()
          .Subscribe();
Run Code Online (Sandbox Code Playgroud)

要么:

Observable.Interval(TimeSpan.FromSeconds(1))
          .Select(_ => Observable.Defer(() => asyncMethod().ToObservable()))
          .Concat()
          .Subscribe();
Run Code Online (Sandbox Code Playgroud)

  • 这是另一个用例,但非常有趣。 (2认同)

Ana*_*tts 19

将其更改为:

Observable.Timer(TimeSpan.FromMilliseconds(100))
    .SelectMany(async _ => await RunAsync())
    .Subscribe();
Run Code Online (Sandbox Code Playgroud)

订阅不会将异步操作保留在Observable中.

  • 这将产生一系列可观察的未完成任务。接下来你要对它们做什么?您的“订阅者”什么也不做。所有任务都在未被观察到的情况下挂起,无法控制异常/终止。这不是一个解决方案。 (4认同)
  • 谢谢Paul的建议。很有意思。你有什么我可以进一步了解的东西吗? (2认同)

Ste*_*ary 14

您不希望传递async方法Subscribe,因为这将创建一个async void方法.尽力避免async void.

在您的情况下,我认为您想要的是为async序列的每个元素调用方法,然后缓存所有结果.在这种情况下,用于为每个元素SelectMany调用async方法,并Replay缓存(加上a Connect来获得滚动):

public class Consumer
{
    private readonly Service _service = new Service();

    public IObservable<string> Trigger()
    {
        var connectible = Observable.Timer(TimeSpan.FromMilliseconds(100))
            .SelectMany(_ => RunAsync())
            .Replay();
        connectible.Connect();
        return connectible;
    }

    public Task<string> RunAsync()
    {
        return _service.DoAsync();
    }
}
Run Code Online (Sandbox Code Playgroud)

我改变了Results从该Trigger方法返回的属性,我认为这更有意义,所以测试现在看起来像:

[Test]
public async Task Test()
{
    var sut = new Consumer();
    var results = sut.Trigger();
    var result = await results.FirstAsync();
}
Run Code Online (Sandbox Code Playgroud)

  • 该解决方案不保留结果的顺序(以防万一读者试图实现这一点)。要保留顺序,请查看下面的答案。 (2认同)