RxJS:手动发出Observable的正确方法

Pot*_*oes 5 rxjs angular

在Angular 4.x中使用RxJS时,我看到了两种非常不同的模式,用于从用户启动的动作流中生成Observable。一个流是用户单击生成新对象的“添加项目”按钮的直接结果。另一个是由我正在使用的某些第三方代码发出的一系列事件。

我希望能够使用“ combineLatest”之类的东西来组合这两个流以生成单个Observable。

使用我的按钮,我遵循以下模式:

const signal = Observable.create(
            (observer) => {
                this.additem= (item) => observer.next(item);
            }
        );

this.item$ = signal.map((item) => [item])
                            .scan((accumulator, value) => {
                                return accumulator.concat(value);
                            });
Run Code Online (Sandbox Code Playgroud)

但是,我看到很多信息说我应该改用Subjects-我正尝试将其与事件回调一起使用,如下所示:

sort$ = new Subject();

sortChange(sort){
        sort$.next(sort);
}
Run Code Online (Sandbox Code Playgroud)

然后,我尝试将这些合并如下:

combine$ = Observable.combineLatest(sort$, item$,
                  (sort, items) => {
                      return "something that does stuff with these";}
        );
Run Code Online (Sandbox Code Playgroud)

我的问题是-“手动”生成流的首选模式是什么?是否可以/应该像我在这里尝试的那样将可观察对象和主题网格化为一个可观察对象?

mar*_*tin 5

当然,您可以将Observables和Subjects组合成一个流。

我认为这里的问题是,在您的用例中什么才更有意义。从你的描述实施类似“添加项目”功能,我宁愿当SubjectObservable.create

这是因为每次您订阅时,您signal都在重新分配this.additemObservable.create每个观察者都会调用to的回调。请注意,更正确的用法Observable.create如下所示:

const signal = Observable.create((observer) => {
   this.additem = (item) => observer.next(item);
   return () => this.additem = null;
});
Run Code Online (Sandbox Code Playgroud)

() => this.additem = null当您取消订阅此Observable时,将调用返回的回调,这是您应处理所有清理的地方。

但是,如果您订阅了两次,则将signal覆盖this.additem两次,然后选择取消订阅其中一个观察者this.additem = null,则可能会导致意外的行为。

因此,在这种情况下使用更有意义Subject。例如这样:

const subject = new Subject();
this.additem = (item) => subject.next(item);
Run Code Online (Sandbox Code Playgroud)

如果您想查看更多现实生活的示例,Observable.create请看以下示例:使用RxJS和twitter-stream-api模块订阅流

编辑:还可以看看RxJS 5首席开发人员的这些文章: