如果连接断开,如何正确处理客户端流?

Mar*_*ark 6 asp.net-core-signalr system.threading.channels

我正在使用 Microsoft.AspNetCore.SignalR 2.1 v1.0.4,并且使用 v1.0.4 的打字稿客户端使用 ChannelReader 流。

频道显示特定于单个实体的事件数据,因此当客户端的用户导航到该单个实体的页面呈现数据时,预计客户端将订阅频道。如果用户导航到同一页面但针对不同的实体,则客户端将进行另一个订阅调用。

现在我的问题是关于如何最好地取消订阅流,以及一般来说,流的生命周期对于集线器连接停止/启动场景下的客户端来说是什么,以及服务器是否显式中止连接(由于 access_token 超时从而触发客户端刷新他们的连接)?

似乎没有从 api 中浮现出一些连接状态,所以我目前使用 RxJs Subject 将一些连接状态呈现给我的 UI 组件/服务,即当集线器连接的启动调用成功时,我浮现“真”,当onclose 回调称为 I 表面“假”。这允许我尝试在先前订阅的流上调用 dispose 以在连接断开/停止期间清理内容,然后在成功启动调用时再次调用订阅流。

我试过在一个流上调用 dispose 如果集线器已连接,这很好,但如果连接处于断开状态,它会出错。我想知道这是否是一个错误。即使集线器断开连接,我也应该能够处理流吗?

可以只做一个delete streamsubscription然后根据需要重新创建,还是会以任何方式泄漏?

And*_*rse 4

在集线器连接停止/启动场景下,流的生命周期对于客户端来说是多少,以及服务器是否显式中止连接(由于 access_token 超时,从而触发客户端刷新其连接)。

当连接终止时(由于在stop客户端上调用或服务器中止连接),error订阅者的方法将被调用,并显示错误,指示流已因连接终止而终止。一般来说,您应该处理该error方法并将其视为终止事件(即流永远不会产生其他对象)。在服务器上,Context.ConnectionAborted如果连接终止(由任一方),则将触发令牌,并且您可以停止写入流。

如果您已经在使用 RxJS,我强烈建议您构建一个小型包装器,将从 SignalR 返回的对象转换为正确的 RxJS Observable我们返回的对象实际上并不是an Observable,但它具有所有相同的基本方法(subscribe使用completenexterror方法获取对象的方法),因此包装它应该很简单。

我尝试在流上调用 dispose,如果集线器已连接,则没问题,但如果连接处于断开状态,则会出错。我想知道这是否是一个错误。

是的,这可能是一个错误。如果您在集线器断开连接后丢弃,我们不应该扔掉。您可以在https://github.com/aspnet/SignalR上提交该文件吗?要解决这个问题,您可以相当安全地只显示try...catch错误并抑制它(或者如果您偏执的话可以记录它)。

是否可以只删除流订阅,然后根据需要重新创建,或者这会以任何方式泄漏吗?

您应该始终 dispose订阅。如果您只是delete这样做,那么我们无法知道您已完成它,并且我们永远不会告诉服务器停止。如果您致电dispose(并已连接),我们会向服务器发送一条消息“取消”流。在 ASP.NET Core 2.1 中,我们不会向您公开此取消,但我们确实停止从ChannelReader. CancellationToken在 ASP.NET Core 2.2 中,我们允许您在 Hub 方法中接受 a ,并且dispose客户端上的方法将在 Hub 方法中触发此令牌。我强烈建议您尝试 ASP.NET Core 2.2 的最新预览版,并CancellationToken在 Hub 方法中使用 a 来停止流:

public ChannelReader<object> MyStreamingMethod(..., CancellationToken cancellationToken) {
    // pass 'cancellationToken' over to whatever process is writing to the channel
    // and stop writing when the token is triggered
}
Run Code Online (Sandbox Code Playgroud)

注意:如果您这样做,则无需监视Context.ConnectionAborted,传入 Hub 方法的令牌将涵盖所有取消情况。

与此相关的是,您应该始终使用Channel.CreateBounded<T>(size)创建频道。如果您使用无界通道,则更容易泄漏内存,因为编写器可以无限期地继续写入。如果您使用有界通道,如果通道中有未读取的项目(例如,因为客户端已断开连接并且我们已停止读取),则写入器将被停止(WriteAsync并将“阻塞”)。WaitToWriteAsyncsize