有没有办法捕获 websocket 连接中的关闭事件?

Dai*_*vas 5 stomp websocket sockjs angular

我正在我的平台中实现 WebSockets 连接。我在前端使用 Angular 10,并在 SockJS Client 1.5.0 上使用 StompJS 2.3.3 来建立与后端的连接。

我创建了一个 WebSockets 服务来管理连接,它正在连接并且工作正常。

这是我创建的服务:

export class WebSocketsService implements OnDestroy {
    /**
    * The authentication service to get the token from.
    */
    private authenticationService: AuthenticationService;

    private serverUrl = environment['ceNotificationsServer'];
    private socket;
    private state: BehaviorSubject<SocketClientState>;

    public constructor(authenticationService: AuthenticationService) {
    this.authenticationService = authenticationService;

    const client_id = environment['ceNotificationsClientId'];
    const token = this.authenticationService.getToken();

    const url = this.serverUrl + '/websocket' + '?clientId=' + client_id + '&Authorization=' + token;
    const ws = new SockJS(url);
    this.socket = StompJS.over(ws);
    this.socket.reconnect_delay = 5000;
    this.socket.debug = () => {
    };
    this.state = new BehaviorSubject<SocketClientState>(SocketClientState.ATTEMPTING);

    this.socket.connect({}, () => {
        this.state.next(SocketClientState.CONNECTED);
        console.log('Connection to the socket is open.');
    });
    }

    /**
    * Establish the connection to the websocket.
    */
    connect(cfg: { reconnect: boolean } = {reconnect: false}): Observable<any> {
    return new Observable(observer => {
        this.state.pipe(
        cfg.reconnect ? this.reconnect : o => o,
        filter(state => state === SocketClientState.CONNECTED))
        .subscribe(() => {
            observer.next(this.socket);
        });
    });
    }

    message(queue: String): Observable<any> {
    return this.connect({reconnect: true})
        .pipe(switchMap(client => {
            return new Observable<any>(observer => {
            client.subscribe(queue, message => {
                observer.next(JSON.parse(message.body));
            });
            });
        }),
        retryWhen((errors) => errors.pipe(delay(5))));
    }

    reconnect(observable: Observable<any>): Observable<any> {
    return observable.pipe(
        retryWhen(errors => errors.pipe(
        tap(val => console.log('Trying to reconnect to the socket', val)),
        delayWhen(_ => timer(5000))
        ))
    );
    }

    /**
    * Close the connection to the websocket.
    */
    close() {
    if (this.socket) {
        this.socket.complete();
        this.state.next(SocketClientState.CLOSED);
        console.log('Connection to the socket is closed');
        this.socket = null;
    }
    }

    ngOnDestroy() {
    this.close();
    }
}

export enum SocketClientState {
    ATTEMPTING, CONNECTED, CLOSED
}
Run Code Online (Sandbox Code Playgroud)

在我的 Angular 组件中,我添加了以下代码来订阅 WebSockets 队列并获取一些通知来填充我的通知托盘:

const subscription1 = this.websocketsService.message('/messages')
    .subscribe(outdatedProfiles => {
        const notification: Notification = outdatedProfiles;
        notification.message = 'notificationNotSyncedProviders';
        this.notifications.addNotification(notification);
    });
this.subscriptionsManager.add(subscription1);
Run Code Online (Sandbox Code Playgroud)

我的问题是,当我失去连接(如果 WiFi 断开连接)时,它不会再次重新连接。它捕获错误,但不捕获连接关闭事件。

我尝试了这种方法:

public constructor(authenticationService: AuthenticationService) {
    this.socket.connect({}, () => {...});
    this.socket.onclose = function(event) {
        console.log("WebSocket is closed now.");
        this.connect({reconnect: true});
    };
}
Run Code Online (Sandbox Code Playgroud)

但这不起作用。我已阅读文档,但似乎无法找到连接关闭时重新连接问题的答案。有任何想法吗?

Jor*_*bat 0

正如@BizzyBob建议的那样,我认为更好的方法是使用@stomp/ng2-stompjs,它依赖于rx-stomp和stompjs新版本并且可以更轻松地完成任务。

  1. 安装@stomp/ng2-stompjs

    npm i @stomp/ng2-stompjs

  2. 为 rx-stom 服务创建配置文件 (my-rxstomp.config.ts)

    
    export const myRxStompConfig: InjectableRxStompConfig = {
      // Which server?
      brokerURL: 'ws://127.0.0.1:8080/',
      // Wait in milliseconds before attempting auto reconnect
      // Set to 0 to disable
      // Typical value 500 (500 milli seconds)
      reconnectDelay: 200
    };
    
    
    Run Code Online (Sandbox Code Playgroud)
  3. 在app.module.ts的providers部分添加RxStomp服务的配置

     {
       provide: InjectableRxStompConfig,
       useValue: myRxStompConfig,
     },
     {
       provide: RxStompService,
       useFactory: rxStompServiceFactory,
       deps: [InjectableRxStompConfig],
     },
    
    Run Code Online (Sandbox Code Playgroud)

];

  1. 注入您的组件 RxStomp 服务并订阅连接状态的更改,以便在连接关闭时执行某些操作。如果您只想在连接关闭时重新连接,则无需订阅连接状态,配置字段 reconnectDelay: 200 将为您重新连接。

    constructor(private rxStompService: RxStompService) { }
    
    ngOnInit() {
      this.rxStompService.connectionState$.subscribe(next => {
        console.log('Connection State', RxStompState[next]);
        if(next === RxStompState.CLOSED) {
          // Do something
        }
      });
    }
    
    Run Code Online (Sandbox Code Playgroud)