Ash*_*hok 4 websocket rxjs angular5
我使用'rxjs-websockets'来连接websockets.但在一定时间(约2分钟)后,连接关闭.在手动关闭之前,如何保持连接.
这是我使用的代码片段
constructor(private socketService: WebSocketService) {}
this.socketService.connect('/endpoint');
this.socketSubscription = this.socketService.messages
.subscribe(result => {
// perform operation
});
Run Code Online (Sandbox Code Playgroud)
这是WebSocketService
import {Injectable} from '@angular/core';
import {QueueingSubject} from 'queueing-subject';
import {Observable} from 'rxjs/Observable';
import websocketConnect from 'rxjs-websockets';
import 'rxjs/add/operator/share';
import 'rxjs/add/operator/retryWhen';
import 'rxjs/add/operator/delay';
@Injectable()
export class WebSocketService {
private inputStream: QueueingSubject<string>;
public messages: Observable<string>;
constructor() {
}
public connect(socketUrl) {
this.messages = websocketConnect(
socketUrl,
this.inputStream = new QueueingSubject<string>()
).messages.retryWhen(errors => errors.delay(1000))
.map(message => JSON.parse(message))
.share();
}
public send(message: string): void {
this.inputStream.next(message);
}
}
Run Code Online (Sandbox Code Playgroud)
Websockets通常在一些消息交换的帮助下长时间保持连接.在我们的例子中,我们可以将其称为'ping => pong',客户端发送消息'ping',服务器可以响应消息'pong'.
您可以按如下方式每30秒发送一次ping.
setInterval(() => {
this.socketService.send('ping');
}, 30000);
Run Code Online (Sandbox Code Playgroud)
当您将在WebSocketService接收的每条消息转换为JSON时,您必须进行这些更改以获得JSON解析错误.
export class WebSocketService {
.
.
.
public connect(socketUrl) {
this.messages = websocketConnect(
socketUrl,
this.inputStream = new QueueingSubject<string>()
).messages.retryWhen(errors => errors.delay(1000))
//parse messages except pong to avoid JSON parsing error
.map(message => message === 'pong' ? message : JSON.parse(message))
.share();
}
.
.
.
}
Run Code Online (Sandbox Code Playgroud)