转换异步方法以返回IObservable <>

Mat*_*rts 8 c# system.reactive async-await

我有一个async方法是一个长时间运行的方法,它读取一个流,当它发现一些事件触发事件时:

public static async void GetStream(int id, CancellationToken token)
Run Code Online (Sandbox Code Playgroud)

它需要一个取消令牌,因为它是在新任务中创建的.在内部,它await在读取流时调用:

var result = await sr.ReadLineAsync()
Run Code Online (Sandbox Code Playgroud)

现在,我想将此转换为返回IObservable <>的方法,以便我可以将其与反应式扩展一起使用.从我所读到的,最好的方法是使用Observable.Create,因为RX 2.0现在也支持异步我可以使用这样的东西:

public static IObservable<Message> ObservableStream(int id, CancellationToken token)
{
    return Observable.Create<Message>(
        async (IObserver<Message> observer) =>
            {
Run Code Online (Sandbox Code Playgroud)

内部的其余代码是相同的,但不是触发我正在调用的事件observer.OnNext().但是,这感觉不对.有一件事我在那里混合了CancellationTokens,虽然添加了async关键字使它工作,这实际上是最好的事情吗?我正在调用我的ObservableStream:

Client.ObservableStream(555404, token).ObserveOn(Dispatcher.CurrentDispatcher).SubscribeOn(TaskPoolScheduler.Default).Subscribe(m => Messages.Add(m));
Run Code Online (Sandbox Code Playgroud)

Bra*_*don 12

你是对的.一旦您通过a表示您的界面IObservable,您应该避免要求呼叫者提供CancellationToken.这并不意味着你不能在内部使用它们.Rx提供了几种机制来生成CancellationToken实例,当观察者从您的observable取消订阅时,这些实例被取消.

有很多方法可以解决您的问题.最简单的几乎不需要更改代码.如果调用者取消订阅,它会使用重载为Observable.Create您提供CancellationToken触发:

public static IObservable<Message> ObservableStream(int id)
{
    return Observable.Create<Message>(async (observer, token) =>
    {
         // no exception handling required.  If this method throws,
         // Rx will catch it and call observer.OnError() for us.
         using (var stream = /*...open your stream...*/)
         {
             string msg;
             while ((msg = await stream.ReadLineAsync()) != null)
             {
                 if (token.IsCancellationRequested) { return; }
                 observer.OnNext(msg);
             }
             observer.OnCompleted();
         }
    });
}
Run Code Online (Sandbox Code Playgroud)


Ric*_*ein 1

您应该更改 GetStream 以返回任务,而不是 void(返回 async void 不好,除非绝对需要,正如 svick 评论的那样)。返回任务后,您只需调用 .ToObservable() 即可完成。

例如:

public static async Task<int> GetStream(int id, CancellationToken token) { ... }
Run Code Online (Sandbox Code Playgroud)

然后,

GetStream(1, new CancellationToken(false))
   .ToObservable()
   .Subscribe(Console.Write);
Run Code Online (Sandbox Code Playgroud)