Dai*_*vas 5 stomp websocket sockjs angular
我正在我的平台中实现 WebSockets 连接。我在前端使用 Angular 10,并在 SockJS Client 1.5.0 上使用 StompJS 2.3.3 来建立与后端的连接。
我创建了一个 WebSockets 服务来管理连接,它正在连接并且工作正常。
这是我创建的服务:
export class WebSocketsService implements OnDestroy {
/**
* The authentication service to get the token from.
*/
private authenticationService: AuthenticationService;
private serverUrl = environment['ceNotificationsServer'];
private socket;
private state: BehaviorSubject<SocketClientState>;
public constructor(authenticationService: AuthenticationService) {
this.authenticationService = authenticationService;
const client_id = environment['ceNotificationsClientId'];
const token = this.authenticationService.getToken();
const url = this.serverUrl + '/websocket' + '?clientId=' + client_id + '&Authorization=' + token;
const ws = new SockJS(url);
this.socket = StompJS.over(ws);
this.socket.reconnect_delay = 5000;
this.socket.debug = () => {
};
this.state = new BehaviorSubject<SocketClientState>(SocketClientState.ATTEMPTING);
this.socket.connect({}, () => {
this.state.next(SocketClientState.CONNECTED);
console.log('Connection to the socket is open.');
});
}
/**
* Establish the connection to the websocket.
*/
connect(cfg: { reconnect: boolean } = {reconnect: false}): Observable<any> {
return new Observable(observer => {
this.state.pipe(
cfg.reconnect ? this.reconnect : o => o,
filter(state => state === SocketClientState.CONNECTED))
.subscribe(() => {
observer.next(this.socket);
});
});
}
message(queue: String): Observable<any> {
return this.connect({reconnect: true})
.pipe(switchMap(client => {
return new Observable<any>(observer => {
client.subscribe(queue, message => {
observer.next(JSON.parse(message.body));
});
});
}),
retryWhen((errors) => errors.pipe(delay(5))));
}
reconnect(observable: Observable<any>): Observable<any> {
return observable.pipe(
retryWhen(errors => errors.pipe(
tap(val => console.log('Trying to reconnect to the socket', val)),
delayWhen(_ => timer(5000))
))
);
}
/**
* Close the connection to the websocket.
*/
close() {
if (this.socket) {
this.socket.complete();
this.state.next(SocketClientState.CLOSED);
console.log('Connection to the socket is closed');
this.socket = null;
}
}
ngOnDestroy() {
this.close();
}
}
export enum SocketClientState {
ATTEMPTING, CONNECTED, CLOSED
}
Run Code Online (Sandbox Code Playgroud)
在我的 Angular 组件中,我添加了以下代码来订阅 WebSockets 队列并获取一些通知来填充我的通知托盘:
const subscription1 = this.websocketsService.message('/messages')
.subscribe(outdatedProfiles => {
const notification: Notification = outdatedProfiles;
notification.message = 'notificationNotSyncedProviders';
this.notifications.addNotification(notification);
});
this.subscriptionsManager.add(subscription1);
Run Code Online (Sandbox Code Playgroud)
我的问题是,当我失去连接(如果 WiFi 断开连接)时,它不会再次重新连接。它捕获错误,但不捕获连接关闭事件。
我尝试了这种方法:
public constructor(authenticationService: AuthenticationService) {
this.socket.connect({}, () => {...});
this.socket.onclose = function(event) {
console.log("WebSocket is closed now.");
this.connect({reconnect: true});
};
}
Run Code Online (Sandbox Code Playgroud)
但这不起作用。我已阅读文档,但似乎无法找到连接关闭时重新连接问题的答案。有任何想法吗?
正如@BizzyBob建议的那样,我认为更好的方法是使用@stomp/ng2-stompjs,它依赖于rx-stomp和stompjs新版本并且可以更轻松地完成任务。
安装@stomp/ng2-stompjs
npm i @stomp/ng2-stompjs
为 rx-stom 服务创建配置文件 (my-rxstomp.config.ts)
export const myRxStompConfig: InjectableRxStompConfig = {
// Which server?
brokerURL: 'ws://127.0.0.1:8080/',
// Wait in milliseconds before attempting auto reconnect
// Set to 0 to disable
// Typical value 500 (500 milli seconds)
reconnectDelay: 200
};
Run Code Online (Sandbox Code Playgroud)
在app.module.ts的providers部分添加RxStomp服务的配置
{
provide: InjectableRxStompConfig,
useValue: myRxStompConfig,
},
{
provide: RxStompService,
useFactory: rxStompServiceFactory,
deps: [InjectableRxStompConfig],
},
Run Code Online (Sandbox Code Playgroud)
];
注入您的组件 RxStomp 服务并订阅连接状态的更改,以便在连接关闭时执行某些操作。如果您只想在连接关闭时重新连接,则无需订阅连接状态,配置字段 reconnectDelay: 200 将为您重新连接。
constructor(private rxStompService: RxStompService) { }
ngOnInit() {
this.rxStompService.connectionState$.subscribe(next => {
console.log('Connection State', RxStompState[next]);
if(next === RxStompState.CLOSED) {
// Do something
}
});
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2447 次 |
| 最近记录: |