将普通 string[] 转换为 Observable<string[]> 并使用 RxJS 5 将其连接到另一个 Observable<string[]>

bal*_*teo 5 rxjs5 angular

我正在尝试将 plain 转换string[]为 aObservable<string[]>并将其连接到现有的Observable<string[]>.

然后我将使用 angular2async管道来显示Observable.

这是我的代码:

import {Injectable} from "angular2/core";
import {Observable} from "rxjs/Observable";
import 'rxjs/Rx';


@Injectable()
export class AppService {

    constructor() {
        console.log('constructor', 'appService');
        this.constructSomeObservable();
    }

    someObservable$:Observable <string[]>;

    constructSomeObservable() {
        this.someObservable$ = Observable.create(observer => {
            const eventSource = new EventSource('/interval-sse-observable');
            eventSource.onmessage = x => observer.next(JSON.parse(x.data));
            eventSource.onerror = x => observer.error(console.log('EventSource failed'));
            return () => {
                eventSource.close();
            };
        });

        this.someObservable$.subscribe(
            theStrings=> {
                console.log(theStrings);
                //Somehow convert the plain array of strings to an observable and concat it to this.someObservable$ observable...
            },
            error=>console.log(error)
        );
    }
}
Run Code Online (Sandbox Code Playgroud)

有人可以帮忙吗?

此外,我想确保当Observable<string[]>重复调用 EventSource 时服务实例不断更新。我的订阅逻辑是否位于正确的位置?

编辑 1:我尝试使用 RxJSconcat运算符,如下所示:

    this.someObservable$.subscribe(
        theStrings=> {
            console.log(theStrings);
            this.someObservable$ = this.someObservable$.concat(Observable.create(theStrings));
        },
        error=>console.log(error)
    );
 }
Run Code Online (Sandbox Code Playgroud)

与 angular2async管道一起:

<ul>
    <li *ngFor="#s of appService.someObservable$ | async">
       a string: {{ s }}
    </li>
</ul>
Run Code Online (Sandbox Code Playgroud)

并且页面上没有显示任何内容;字符串只是显示在控制台上......

我做错了什么?

编辑 2:该应用程序可在 github 上找到

编辑3:我已经考虑了蒂埃里的建议,特别是使用管道async进行订阅以及扫描操作符的使用。

现在唯一剩下的问题是我需要单击路由器链接才能在模板上呈现字符串...模板不会自动更新...

查看github上的项目和相关标签:https://github.com/balteo/demo-angular2-rxjs/tree/36864628/536299

Thi*_*ier 5

我会利用scan操作员来做到这一点。这是一个示例:

@Component({
  selector: 'app'
  template: `
    <div>
      <div *ngFor="#elt of someObservable$  | async">{{elt.name}</div>
    </div>
  `
})
export class App {
  constructor() {
    this.someObservable$ = Observable.create((observer) => {
      const eventSource = new EventSource('/interval-sse-observable');
      eventSource.onmessage = x => observer.next(JSON.parse(x.data));
      eventSource.onerror = x => observer.error(console.log('EventSource failed'));
      return () => {
        eventSource.close();
      };
    })
    .startWith([])
    .scan((acc,value) => acc.concat(value));
  }
}
Run Code Online (Sandbox Code Playgroud)

这是相应的 plunkr:https://plnkr.co/edit/St7LozX3bnOBcoHaG4uM ?p=preview 。

请参阅此问题了解更多详细信息: