Rx.Subject是一个热门观察者吗?

c c*_*c c 2 rxjs rxjs5

代码

const a = new Rx.Subject().do(x => console.log('a'))
const b = a.mapTo(0)
const c = a.mapTo(1)
const d = Rx.Observable.merge(b, c)
d.subscribe(x => console.log('d'))
a.next(3)
Run Code Online (Sandbox Code Playgroud)

和输出

a
d
a
d
Run Code Online (Sandbox Code Playgroud)

为什么打印两次?是不是Rx.Subject热门观察?

Bha*_*tel 10

您需要了解冷/热可观察对象和主题。

一个寒冷的可观察是可观察到的是重新执行其订阅处理每一个它的订阅时间:

const cold = new Observable(function subscribe(observer) {
  console.log('subscribed');
  observer.next(Math.random());
  observer.complete();
});
// > subscribed
// sub 1: 0.1231231231231
cold.subscribe((num) => console.log('sub 1:', num));
// > subscribed
// sub 2: 0.09805969045
cold.subscribe((num) => console.log('sub 2:', num));
Run Code Online (Sandbox Code Playgroud)

热可观测可观察到一个源(冷或以其他方式),其具有源极和订户之间的受试者。当订阅一个热点 Observable 时,订阅会在内部透明地路由到内部的 Subject,Subject 订阅源 Observable。这确保源 Observable 只有一个订阅者(主题),并且主题与许多订阅者共享源的值:

const cold = new Observable(function subscribe(observer) {
  console.log('subscribed');
  observer.next(Math.random());
  observer.complete();
});

const hot = cold.publish();
hot.subscribe((num) => console.log('sub 1:', num));
hot.subscribe((num) => console.log('sub 2:', num));
hot.connect(); // <-- this subscribes the inner Subject to the cold source
// > subscribed
// > sub 1: 0.249848935489
// > sub 2: 0.249848935489
Run Code Online (Sandbox Code Playgroud)

您可以通过多播使 Observable 变得热,这需要一个函数,该函数返回连接时要使用的主题。为了方便起见,也有多播的变体(例如发布),可以创建特定类型的主题。publish()是一种方便的方法multicast(() => new Subject())

除了connect()将内部 Subject 订阅到源并返回底层 Subscription 之外,您还可以调用refCount(),它返回一个 Observable。返回的ObservablerefCount()被订阅一次后,connect()内部会自动调用,后续的订阅不会重连。当所有订阅者都取消订阅时,refCount会自动从源取消订阅内部主题。share()是一种方便的方法source.publish().refCount()

所以,它会起作用,

const a = new Rx.Subject().do(x => console.log('a')).share();
const b = a.mapTo(0);
const c = a.mapTo(1);
const d = Rx.Observable.merge(b, c)
d.subscribe(x => console.log('d'));
a.next(3);
Run Code Online (Sandbox Code Playgroud)


ols*_*lsn 7

Subject本身很热/共享.

但是:您附加的任何(大多数!)运算符将创建一个新流,前一个流(在本例中为Subject)作为源 - 然而,新流(对于大多数运算符)不热并且只会变热通过附加热运算符(如share或等publish...)来获取热流

所以当你share的时候do,一切都应该按预期工作.

const a = new Rx.Subject().do(x => console.log('a')).share();
const b = a.mapTo(0);
const c = a.mapTo(1);
const d = Rx.Observable.merge(b, c)
d.subscribe(x => console.log('d'));
a.next(3);
Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>
Run Code Online (Sandbox Code Playgroud)