Kev*_*aft 17 .net c# asynchronous task-parallel-library system.reactive
我正在尝试为System.Net.WebSocket编写一个扩展方法,它将使用Reactive Extensions(Rx.NET)将其转换为IObserver.你可以看到下面的代码:
public static IObserver<T> ToObserver<T>(this WebSocket webSocket, IWebSocketMessageSerializer<T> webSocketMessageSerializer)
{
// Wrap the web socket in an interface that's a little easier to manage
var webSocketMessageStream = new WebSocketMessageStream(webSocket);
// Create the output stream to the client
return Observer.Create<T>(
onNext: async message => await webSocketMessageStream.WriteMessageAsync(webSocketMessageSerializer.SerializeMessage(message)),
onError: async error => await webSocketMessageStream.CloseAsync(WebSocketCloseStatus.InternalServerError, string.Format("{0}: {1}", error.GetType(), error.Message)),
onCompleted: async () => await webSocketMessageStream.CloseAsync(WebSocketCloseStatus.NormalClosure, "Server disconnected")
);
}
Run Code Online (Sandbox Code Playgroud)
这段代码有效,但我担心在onNext,onError和onCompleted lambdas中使用async/await.我知道这会返回一个async void lambda,这是不赞成的(有时候会引起我已经遇到的问题).
我一直在阅读Rx.NET文档以及互联网上的博客文章,我似乎无法找到在IObserver中使用async/await方法的正确方法(如果有的话).有没有正确的方法来做到这一点?如果没有,那么我该如何解决这个问题呢?
Dor*_*rus 17
订阅不采取async方法.所以这里发生的是你正在使用一种即发即忘的机制async void.问题是onNext消息将不再被序列化.
async您应该将其包装到管道中以允许Rx等待它,而不是在Subscribe 中调用方法.
使用fire-and-forget是可以的onError,onCompleted因为这些可以保证是最后一次调用Observable.请记住,与之相关的资源Observable可以在完成之后清理onError并onCompleted返回.
我写了一个小扩展方法来完成所有这些:
public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNext, Action<Exception> onError, Action onCompleted)
{
return source.Select(e => Observable.Defer(() => onNext(e).ToObservable())).Concat()
.Subscribe(
e => { }, // empty
onError,
onCompleted);
}
Run Code Online (Sandbox Code Playgroud)
首先,我们将async onNext方法转换为Observable桥接两个异步世界.这意味着onNext将尊重async/await inside .接下来,我们将方法包装起来,Defer以便在创建时不会正确启动.相反,Concat只有在前一个完成后才会调用下一个deferred observable.
PS.我希望Rx.Net的下一个版本能够async在订阅中获得支持.