如何在Rx中使用异步方法进行订阅?

not*_*row 5 c# system.reactive async-await

我有以下代码:

IObservable<Data> _source;

...

_source.Subscribe(StoreToDatabase);

private async Task StoreToDatabase(Data data) {
    await dbstuff(data);
}
Run Code Online (Sandbox Code Playgroud)

但是,这不编译.有没有办法如何异步观察数据?我试过async void,它有效,但我觉得给定解决方案是不可行的.

我还检查了Reactive Extensions订阅呼叫等待,但它没有回答我的问题(我不关心SelectMany结果.)

Cha*_*ger 11

您不必关心SelectMany结果.答案仍然是相同的......虽然你需要你的任务有一个返回类型(即Task<T>不是Task).

Unit基本上相当于void,所以你可以使用:

_source.SelectMany(StoreToDatabase).Subscribe();

private async Task<Unit> StoreToDatabase(Data data)
{
    await dbstuff(data);
    return Unit.Default;
}
Run Code Online (Sandbox Code Playgroud)

SelectMany重载接受一个Func<TSource, Task<TResult>含义,在任务完成之前,结果序列将无法完成.

  • 但至少在你的订阅方法中放一个`OnError`处理程序.如果你不这样做,并且`_source`错误或'StoreToDatabase`抛出你的应用程序将会崩溃,或者最终处于未知状态. (4认同)

Ben*_*jol 6

答案较晚,但是我认为以下扩展方法可以正确封装Charles Mager在其答案中提出的内容:

public static IDisposable SubscribeAsync<T>(this IObservable<T> source, 
                         Func<Task> asyncAction, Action<Exception> handler = null)
{
    Func<T,Task<Unit>> wrapped = async t =>
    {
        await asyncAction();
        return Unit.Default;
    };
    if(handler == null)
        return source.SelectMany(wrapped).Subscribe(_ => { });
    else
        return source.SelectMany(wrapped).Subscribe(_ => { }, handler);
}

public static IDisposable SubscribeAsync<T>(this IObservable<T> source, 
                         Func<T,Task> asyncAction, Action<Exception> handler = null)
{
    Func<T, Task<Unit>> wrapped = async t =>
    {
        await asyncAction(t);
        return Unit.Default;
    };
    if(handler == null)
        return source.SelectMany(wrapped).Subscribe(_ => { });
    else
        return source.SelectMany(wrapped).Subscribe(_ => { }, handler);
}
Run Code Online (Sandbox Code Playgroud)