ReactiveX:Group和Buffer仅为每个组中的最后一项

Dzm*_*rka 6 javascript rxjs reactivex

如何对一个Observable进行分组,并从每个GroupedObservable中仅保留内存中最后一个发出的项目?这样每个组的行为就像BehaviorSubject一样.

像这样的东西:

{user: 1, msg: "Anyone here?"}
{user: 2, msg: "Hi"}
{user: 2, msg: "How are you?"}
{user: 1, msg: "Hello"}
{user: 1, msg: "Good"}
Run Code Online (Sandbox Code Playgroud)

所以在内存中我们只有最后一项user:

{user: 2, msg: "How are you?"}
{user: 1, msg: "Good"}
Run Code Online (Sandbox Code Playgroud)

当订户订阅时,这两个项目立即发布(每个项目都在其自己的排放中).就像我们每个都有BehaviorSubject一样user.

因为人们可能会永远聊天,所以onCompleted()永远不会被激发.

我事先并不知道user可以有什么价值.

Nik*_*ing 3

我认为您的聊天日志可观察到的内容很热门。#groupBy 发出的 groupObservables 因此也会很热,并且不会在内存中保留任何内容。

要获得您想要的行为(丢弃订阅之前除最后一个值之外的所有内容并从那里继续),您可以使用 ReplaySubject(1)。

如果我错了请纠正我

参见jsbin

var groups = chatlog
      .groupBy(message => message.user)
      .map(groupObservable => {
        var subject = new Rx.ReplaySubject(1);
        groupObservable.subscribe(value => subject.onNext(value));
        return subject;
      });
Run Code Online (Sandbox Code Playgroud)