Bra*_*ram 5 components websocket rxjs angular2-services angular
我正在使用angular 2构建一个Web应用程序,其中我希望有多个组件监听同一个服务.此服务返回一个observable,它返回来自websocket的传入数据.我根据这个例子编写了代码.
当前的问题是: 数据从主组件通过服务发送到服务器(使用websockets)并返回数据.但是,只有home.component中的观察者被调用(具有id:room.created和data),而不是导航栏中的观察者.
有人能告诉我为什么两个都不叫?我也尝试将消息$ .subscribe添加到app.component但无济于事.
现在,让我们来看看代码.
返回可观察的消息服务.组件使用此服务发送和接收消息.
@Injectable()
export class MessageService {
private _messages: Rx.Subject<Message>;
messages$: Rx.Observable<Message>;
constructor(wsService: SocketService, private configuration: Configuration) {
console.log('messag eservice');
this._messages = <Rx.Subject<Message>>wsService
.connect()
.map((response: MessageEvent): Message => {
let data = JSON.parse(response.data);
return {
id: data.id,
data: data.data,
}
});
this.messages$ = this._messages.asObservable();
}
public send(message: Message): void {
this._messages.next(message);
}
}
Run Code Online (Sandbox Code Playgroud)
一种套接字服务,它创建一个websocket连接并将自身绑定到此套接字的输入和输出.
import { Injectable } from '@angular/core';
import * as Rx from "rxjs/Rx";
import { Configuration } from '../app.constants';
@Injectable()
export class SocketService {
private subject: Rx.Subject<MessageEvent>;
constructor(private configuration: Configuration){};
public connect(wsNamespace = ''): Rx.Subject<MessageEvent> {
var url = this.configuration.wsUrl + wsNamespace;
if(!this.subject) {
this.subject = this.create(url);
}
return this.subject;
}
private create(url): Rx.Subject<MessageEvent> {
let ws = new WebSocket(url);
// bind ws events to observable (streams)
let observable = Rx.Observable.create((obs: Rx.Observer<MessageEvent>) => {
ws.onmessage = obs.next.bind(obs);
ws.onerror = obs.error.bind(obs);
ws.onclose = obs.complete.bind(obs);
return ws.close.bind(ws);
});
// on obs next (send something in the stream) send it using ws.
let observer = {
next: (data: Object) => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(data));
}
},
};
return Rx.Subject.create(observer, observable);
}
}
Run Code Online (Sandbox Code Playgroud)
具有以下提供者的app组件:
providers: [MessageService, SocketService, Configuration, AuthService]
Run Code Online (Sandbox Code Playgroud)
我在我的主app.component中实例化提供程序,以确保消息和套接字服务没有实例化两次.
我的home.component看起来像这样(这是一个使用路由加载的页面):
import { Component, OnInit } from '@angular/core';
import { Subject } from 'rxjs';
import { Router } from '@angular/router';
import { MessageService } from '../../services/message.service';
@Component({
...
providers: []
})
export class HomeComponent implements OnInit {
constructor(private router: Router, private messageService: MessageService) {}
ngOnInit(): void {
this.messageService.send({
id: 'room.create',
data: {'name': 'Blaat'}
});
this.messageService.messages$.subscribe(msg => {
console.log(msg);
if(msg.id == 'room.created') {
// navigate naar games!
}
});
}
}
Run Code Online (Sandbox Code Playgroud)
我的navbar组件看起来像这样(指令):
import { Component, OnInit } from '@angular/core';
import { MessageService } from '../../services/message.service';
@Component({
moduleId: module.id,
selector: 'navbar',
templateUrl: 'navbar.component.html',
styleUrls: ['navbar.component.css']
})
export class Navbar implements OnInit {
constructor(private messageService: MessageService) { }
ngOnInit() {
this.messageService.messages$.subscribe(msg => {
console.log(msg);
if(msg.id == 'room.created') {
// navigate naar games!
}
});
}
}
Run Code Online (Sandbox Code Playgroud)
看来你的可观察创建函数被多次调用,最可能是两个组件=>两个subscriptions =>两个可观察的创建函数调用.所以最新的observable create fn会覆盖以前的可观察回调到websocket onmessage,onerror和onclose.您应该多播底层的observable以防止这种情况(共享运算符应该这样做).
// bind ws events to observable (streams)
let observable = Rx.Observable.create((obs: Rx.Observer<MessageEvent>) => {
ws.onmessage = obs.next.bind(obs);
ws.onerror = obs.error.bind(obs);
ws.onclose = obs.complete.bind(obs);
return ws.close.bind(ws);
}).share();
Run Code Online (Sandbox Code Playgroud)
如何正确执行此操作的更有用的资源 https://github.com/ReactiveX/rxjs/blob/master/src/observable/dom/WebSocketSubject.ts https://github.com/blesh/RxSocketSubject
| 归档时间: |
|
| 查看次数: |
5830 次 |
| 最近记录: |