RxJs - 多个订阅者等待同一承诺结果

mys*_*mic 5 reactive-programming rxjs rxjs5

如果给予新解决方案的迟到者已经在飞行中,如何让多个订阅者等待相同的承诺来解决?

doSomething = () => {
  return new Promise((resolve) => {
     setTimeout(() => resolve(Math.random(), 1000)
  })
}

// how to define obs?

obs.subscribe(v => console.log(v)); // 0.39458743297857473
obs.subscribe(v => console.log(v)); // 0.39458743297857473
obs.subscribe(v => console.log(v)); // 0.39458743297857473

setTimeout(() => obs.subscribe(v => console.log(v)), 2000); // 0.9485769395265746
Run Code Online (Sandbox Code Playgroud)

我希望可观察量在第一个订阅者之前保持冷状态,然后在结果流式传输到所有后续并发订阅者后再次变冷。我基本上不希望对同一底层函数有任何并发​​请求。

ols*_*lsn 4

您可以用作defer创建运算符,然后share用作流:

doSomething = () => {
  return new Promise((resolve) => {
     setTimeout(() => resolve(Math.random(), 1000));
  });
}

const obs = Rx.Observable
    .defer(doSomething)
    .share();

obs.subscribe(console.log); // resolve #1
obs.subscribe(console.log); // resolve #1
obs.subscribe(console.log); // resolve #1

setTimeout(() => obs.subscribe(console.log), 2000); // resolve #2
Run Code Online (Sandbox Code Playgroud)
doSomething = () => {
  return new Promise((resolve) => {
     setTimeout(() => resolve(Math.random(), 1000));
  });
}

const obs = Rx.Observable
    .defer(doSomething)
    .share();

obs.subscribe(console.log); // resolve #1
obs.subscribe(console.log); // resolve #1
obs.subscribe(console.log); // resolve #1

setTimeout(() => obs.subscribe(console.log), 2000); // resolve #2
Run Code Online (Sandbox Code Playgroud)