Mar*_*hke 20 c# asynchronous reactive-programming system.reactive
我想在Rx订阅中回调一个异步函数.
就像这样:
public class Consumer
{
private readonly Service _service = new Service();
public ReplaySubject<string> Results = new ReplaySubject<string>();
public void Trigger()
{
Observable.Timer(TimeSpan.FromMilliseconds(100)).Subscribe(async _ => await RunAsync());
}
public Task RunAsync()
{
return _service.DoAsync();
}
}
public class Service
{
public async Task<string> DoAsync()
{
return await Task.Run(() => Do());
}
private static string Do()
{
Thread.Sleep(TimeSpan.FromMilliseconds(200));
throw new ArgumentException("invalid!");
return "foobar";
}
}
[Test]
public async Task Test()
{
var sut = new Consumer();
sut.Trigger();
var result = await sut.Results.FirstAsync();
}
Run Code Online (Sandbox Code Playgroud)
需要做什么才能正确捕捉异常?
rei*_*erh 30
Paul Betts的回答适用于大多数情况,但是如果你想在等待异步函数完成时阻塞流,你需要这样的东西:
Observable.Interval(TimeSpan.FromSeconds(1))
.Select(l => Observable.FromAsync(asyncMethod))
.Concat()
.Subscribe();
Run Code Online (Sandbox Code Playgroud)
要么:
Observable.Interval(TimeSpan.FromSeconds(1))
.Select(_ => Observable.Defer(() => asyncMethod().ToObservable()))
.Concat()
.Subscribe();
Run Code Online (Sandbox Code Playgroud)
Ana*_*tts 19
将其更改为:
Observable.Timer(TimeSpan.FromMilliseconds(100))
.SelectMany(async _ => await RunAsync())
.Subscribe();
Run Code Online (Sandbox Code Playgroud)
订阅不会将异步操作保留在Observable中.
Ste*_*ary 14
您不希望传递async方法Subscribe,因为这将创建一个async void方法.尽力避免async void.
在您的情况下,我认为您想要的是为async序列的每个元素调用方法,然后缓存所有结果.在这种情况下,用于为每个元素SelectMany调用async方法,并Replay缓存(加上a Connect来获得滚动):
public class Consumer
{
private readonly Service _service = new Service();
public IObservable<string> Trigger()
{
var connectible = Observable.Timer(TimeSpan.FromMilliseconds(100))
.SelectMany(_ => RunAsync())
.Replay();
connectible.Connect();
return connectible;
}
public Task<string> RunAsync()
{
return _service.DoAsync();
}
}
Run Code Online (Sandbox Code Playgroud)
我改变了Results从该Trigger方法返回的属性,我认为这更有意义,所以测试现在看起来像:
[Test]
public async Task Test()
{
var sut = new Consumer();
var results = sut.Trigger();
var result = await results.FirstAsync();
}
Run Code Online (Sandbox Code Playgroud)