Observable.FromAsync vs Task.ToObservable

use*_*190 12 .net c# system.reactive

有没有人知道何时使用其中一种方法而不是另一种方法.他们似乎做同样的事情,因为他们转换TPL TaskObservable.

Observable.FromAsync 似乎支持取消令牌,这可能是微妙的差异,如果处置了可观察量,则允许生成任务的方法参与合作取消.

只是想知道我是否遗漏了一些明显的问题,为什么你要使用一个而不是另一个.

谢谢

小智 13

Observable.FromAsync接受一个形式为Func<Task>或的TaskFactory Func<Task<TResult>>,在这种情况下,只有在订阅了observable时才创建和执行任务.

在哪里.ToObservable()需要已经创建(并因此启动)的任务.


Raf*_*ael 11

@Sickboy的回答是正确的.

  • Observable.FromAsync() 将在订阅时启动任务.
  • Task.ToObservable() 需要一个已经运行的任务.

一个用途Observable.FromAsync是控制对异步方法的多次调用的重入.

这是一个这两个方法不等效的例子:

//ob is some IObservable<T>

//ExecuteQueryAsync is some async method
//Here, ExecuteQueryAsync will run **serially**, the second call will start
//only when the first one is already finished. This is an important property
//if ExecuteQueryAsync doesn't support reentrancy
ob
.Select(x => Observable.FromAsync(() => ExecuteQueryAsync(x))
.Concat()
.ObserveOnDispatcher()
.Subscribe(action)
Run Code Online (Sandbox Code Playgroud)

VS

//ob is some IObservable<T>

//ExecuteQueryAsync is some async method
//Even when the `Subscribe` action order will be the same as the first 
//example because of the `Concat`, ExecuteQueryAsync calls could be     
//parallel, the second call to the method could start before the end of the 
//first call. 
.Select(x => ExecuteQueryAsync(x).ToObservable())
.Concat()
.Subscribe(action)
Run Code Online (Sandbox Code Playgroud)

请注意,在第一个示例中,可能需要ObserveOn()或者ObserveOnDispatcher()方法来确保action在原始调度程序上执行,因为Observable.FromAsync它不等待任务,因此在任何可用的调度程序上执行继续操作


Lee*_*ell 5

查看代码,似乎(至少在某些流程中)Observable.FromAsync调用.ToObservable()*.我确信它们应该在语义上等效(假设您传递相同的参数,例如Scheduler,CancellationToken等).

一个更适合链接/流利语法,可以单独阅读更好.无论你喜欢什么样的风格.

*https://github.com/Reactive-Extensions/Rx.NET/blob/859e6159cb07be67fd36b18c2ae2b9a62979cb6d/Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Async.cs#L727

  • 这是一个很好的观点,我没有考虑过编码风格。它也看起来像 .FromAsync 将任务包装在 Observable.Defer 中,这会使它变冷,我认为 .ToObservable 不会这样做。 (3认同)