Rxjs - unexpected publishReplay + refCount behaviour after refCount goes to 0

bra*_*dge 6 rxjs rxjs5 rxjs6

My usecase is this: I connect to a service with a websocket and get periodic (but unpredictable) health data from the service. The app may have multiple users of this data stream, so I want to share it. New subscribers should see the most recently emitted health data. I also want to close the websocket when there are no more subscribers.

My app used shareReplay(1) for quite some time, until it was discovered that it leaks the underlying connection (https://blog.strongbrew.io/share-replay-issue/). At which point we changed to pipe(publishReplay(1), refCount). It turns out that this also has a subtly that I did not expect:

  1. Subscriber A connects & the websocket connection is established.
  2. Subscriber B connects and shares correctly, as well as getting the most recent data.
  3. Both A and B disconnect. The websocket is torn down
  4. Subscriber C connects, but only needs one value take(1). The value that is cached by the publishReplay(1) is returned.

In step 4 I really wanted the websocket to be re-created. The cached value is of no use. The timewindow parameter of publishReplay is tempting, but also not quite what I want.

I've managed to find a solution, by using pipe(multicast(() => new ReplaySubject(1)), refCount()), but I don't know Rx well enough to understand the full implications of this.

My question is - what's the best way to achieve the behaviour I want?

Thanks!

Code sample can be seen at https://repl.it/@bradb/MinorColdRouter Inline code

const { Observable, ReplaySubject } = require('rxjs');
const { tap, multicast, take, publishReplay, refCount } = require('rxjs/operators');

const log = console.log;

function eq(a, b) {
  let result = JSON.stringify(a) == JSON.stringify(b);
  if (!result) {
    log('eq failed', a, b);
  }
  return result;
}

function assert(cond, msg) {
  if (!cond) {
    log('****************************************');
    log('Assert failed: ', msg);
    log('****************************************');
  }
}

function delay(t) {
  return new Promise(resolve => {
    setTimeout(resolve, t);
  });
}

let liveCount = 0;

// emitValue 1 happens at 100ms, 2 at 200ms etc
function testSource() {
  return Observable.create(function(observer) {
    let emitValue = 1;
    liveCount++;
    log('create');
    let interv = setInterval(() => {
      log('next --------> ', emitValue);
      observer.next(emitValue);
      emitValue++;
    }, 100);

    return () => {
      liveCount--;
      log('destroy');
      clearInterval(interv);
    };
  });
}

async function doTest(name, o) {
  log('\nDOTEST: ', name);
  assert(liveCount === 0, 'Start off not live');
  let a_vals = [];
  o.pipe(take(4)).subscribe(x => {
    a_vals.push(x);
  });
  await delay(250);
  assert(liveCount === 1, 'Must be alive');

  let b_vals = [];
  o.pipe(take(2)).subscribe(x => {
    b_vals.push(x);
  });
  assert(liveCount === 1, 'Two subscribers, one source');
  await delay(500);
  assert(liveCount === 0, 'source is destroyed');
  assert(eq(a_vals, [1, 2, 3, 4]), 'a vals match');
  assert(eq(b_vals, [2, 3]), 'b vals match');

  let c_vals = [];
  o.pipe(take(2)).subscribe(x => {
    c_vals.push(x);
  });
  assert(liveCount === 1, 'Must be alive');

  await delay(300);
  assert(liveCount === 0, 'Destroyed');
  assert(eq(c_vals, [1, 2]), 'c_vals match');
}

async function main() {
  await doTest(
    'bad: cached value is stale',
    testSource().pipe(
      publishReplay(1),
      refCount()
    )
  );
  await doTest(
    'good: But why is this different to publish replay?',
    testSource().pipe(
      multicast(() => new ReplaySubject(1)),
      refCount()
    )
  );
  await doTest(
    'bad: But why is this different to the above?',
    testSource().pipe(
      multicast(new ReplaySubject(1)),
      refCount()
    )
  );
}
main();
Run Code Online (Sandbox Code Playgroud)

ggr*_*nig 0

改写 cartant 的评论:

\n

publishReplay将在后台使用单个 ReplaySubjectrefCount ,该 ReplaySubject 被取消然后重新订阅。因此它的缓存值会被重放。当您multicast与工厂一起使用时,每次refCount取消订阅都会创建一个新的 ReplaySubject,然后重新订阅 - 因此,没有缓存值。

\n

以下是 cartant 的链接,因为评论中的链接无法访问:

\n\n

来自文章:

\n
\n

多播基础设施\xe2\x80\x99s 主题可以重新订阅。

\n
\n

publishReplaymulticast在幕后使用并且不提供工厂,而是重复使用相同的 ReplaySubject。

\n