Observable.Using with async Task

Sup*_*JMN 6 .net c# system.reactive observable

我已经使用了Observable.Using和返回IDisposable的方法:

Observable.Using(() => new Stream(), s => DoSomething(s));
Run Code Online (Sandbox Code Playgroud)

但是,当异步创建流时,我们将如何进行?像这样:

Observable.Using(async () => await CreateStream(), s => DoSomething(s));

async Task<Stream> CreateStream() 
{
    ...
}

DoSomething(Stream s)
{
    ...
}
Run Code Online (Sandbox Code Playgroud)

这不会编译,因为它说的sTask<Stream>a Stream.

这是怎么回事?

Tay*_*nan 5

让我们来看看异步重载的来源Observable.Using

Observable.FromAsync(resourceFactoryAsync)
    .SelectMany(resource => Observable.Using(() => resource, r =>
        Observable.FromAsync(ct => observableFactoryAsync(r, ct)).Merge()));
Run Code Online (Sandbox Code Playgroud)

知道它只是在后台使用同步版本,您可以执行以下操作以使其适应您的使用:

Observable.FromAsync(CreateStream)
    .SelectMany(stream => Observable.Using(() => stream, DoSomething));
Run Code Online (Sandbox Code Playgroud)

不幸的是,不包括此重载,但您始终可以创建自己的:

public static class ObservableEx
{
    public static IObservable<TSource> Using<TSource, TResource>(
        Func<Task<TResource>> resourceFactoryAsync,
        Func<TResource, IObservable<TSource>> observableFactory)
        where TResource : IDisposable =>
        Observable.FromAsync(resourceFactoryAsync).SelectMany(
            resource => Observable.Using(() => resource, observableFactory));
}
Run Code Online (Sandbox Code Playgroud)

然后就这么简单:

ObservableEx.Using(CreateStream, DoSomething);
Run Code Online (Sandbox Code Playgroud)

所有这些都是假设DoSomething返回一个可观察的,您的问题中没有提到,但Observable.Using.