Sup*_*JMN 6 .net c# system.reactive observable
我已经使用了Observable.Using和返回IDisposable的方法:
Observable.Using(() => new Stream(), s => DoSomething(s));
Run Code Online (Sandbox Code Playgroud)
但是,当异步创建流时,我们将如何进行?像这样:
Observable.Using(async () => await CreateStream(), s => DoSomething(s));
async Task<Stream> CreateStream()
{
...
}
DoSomething(Stream s)
{
...
}
Run Code Online (Sandbox Code Playgroud)
这不会编译,因为它说的s是Task<Stream>a Stream.
这是怎么回事?
让我们来看看异步重载的来源Observable.Using:
Observable.FromAsync(resourceFactoryAsync)
.SelectMany(resource => Observable.Using(() => resource, r =>
Observable.FromAsync(ct => observableFactoryAsync(r, ct)).Merge()));
Run Code Online (Sandbox Code Playgroud)
知道它只是在后台使用同步版本,您可以执行以下操作以使其适应您的使用:
Observable.FromAsync(CreateStream)
.SelectMany(stream => Observable.Using(() => stream, DoSomething));
Run Code Online (Sandbox Code Playgroud)
不幸的是,不包括此重载,但您始终可以创建自己的:
public static class ObservableEx
{
public static IObservable<TSource> Using<TSource, TResource>(
Func<Task<TResource>> resourceFactoryAsync,
Func<TResource, IObservable<TSource>> observableFactory)
where TResource : IDisposable =>
Observable.FromAsync(resourceFactoryAsync).SelectMany(
resource => Observable.Using(() => resource, observableFactory));
}
Run Code Online (Sandbox Code Playgroud)
然后就这么简单:
ObservableEx.Using(CreateStream, DoSomething);
Run Code Online (Sandbox Code Playgroud)
所有这些都是假设DoSomething返回一个可观察的,您的问题中没有提到,但Observable.Using.