如何使用async/await OnNext/OnError/OnCompleted方法实现IObserver?

Kev*_*aft 17 .net c# asynchronous task-parallel-library system.reactive

我正在尝试为System.Net.WebSocket编写一个扩展方法,它将使用Reactive Extensions(Rx.NET)将其转换为IObserver.你可以看到下面的代码:

public static IObserver<T> ToObserver<T>(this WebSocket webSocket, IWebSocketMessageSerializer<T> webSocketMessageSerializer)
{
    // Wrap the web socket in an interface that's a little easier to manage
    var webSocketMessageStream = new WebSocketMessageStream(webSocket);

    // Create the output stream to the client
    return Observer.Create<T>(
        onNext:      async message => await webSocketMessageStream.WriteMessageAsync(webSocketMessageSerializer.SerializeMessage(message)),
        onError:     async error   => await webSocketMessageStream.CloseAsync(WebSocketCloseStatus.InternalServerError, string.Format("{0}: {1}", error.GetType(), error.Message)),
        onCompleted: async ()      => await webSocketMessageStream.CloseAsync(WebSocketCloseStatus.NormalClosure, "Server disconnected")
    );
}
Run Code Online (Sandbox Code Playgroud)

这段代码有效,但我担心在onNext,onError和onCompleted lambdas中使用async/await.我知道这会返回一个async void lambda,这是不赞成的(有时候会引起我已经遇到的问题).

我一直在阅读Rx.NET文档以及互联网上的博客文章,我似乎无法找到在IObserver中使用async/await方法的正确方法(如果有的话).有没有正确的方法来做到这一点?如果没有,那么我该如何解决这个问题呢?

Dor*_*rus 17

订阅不采取async方法.所以这里发生的是你正在使用一种即发即忘的机制async void.问题是onNext消息将不再被序列化.

async您应该将其包装到管道中以允许Rx等待它,而不是在Subscribe 中调用方法.

使用fire-and-forget是可以的onError,onCompleted因为这些可以保证是最后一次调用Observable.请记住,与之相关的资源Observable可以在完成之后清理onErroronCompleted返回.

我写了一个小扩展方法来完成所有这些:

    public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNext, Action<Exception> onError, Action onCompleted)
    {
        return source.Select(e => Observable.Defer(() => onNext(e).ToObservable())).Concat()
            .Subscribe(
            e => { }, // empty
            onError,
            onCompleted);
    }
Run Code Online (Sandbox Code Playgroud)

首先,我们将async onNext方法转换为Observable桥接两个异步世界.这意味着onNext将尊重async/await inside .接下来,我们将方法包装起来,Defer以便在创建时不会正确启动.相反,Concat只有在前一个完成后才会调用下一个deferred observable.

PS.我希望Rx.Net的下一个版本能够async在订阅中获得支持.

  • Dorus 所说的是这里引入了两层异步。如果收到一条消息,它将调用“WriteMessageAsync”方法。如果收到另一条消息,则无论前一个调用是否已完成,它将执行“WriteMessageAsync”。这意味着您可能会收到无序的消息(特别是如果任务池调度也参与其中)。即 onNext 消息将不再被序列化。 (3认同)
  • 我不认为会发布另一个2.x版本的Rx.我相信Bart和团队目前正在开发v3.0.希望我们能在接下来的几个月/季度看到他们的一些东西. (3认同)