rxjs 多个可观察开关

Fil*_*ype 7 rxjs typescript

我有 2 个可观察值,我想实现以下目标:

从Observable1获取一个值,然后忽略Observable1并仅等待来自Observable2 的值,然后同样,再次从Observable1获取值,依此类推。

rxjs 有没有办法实现这一点?操作决策树没有用。我也玩过,switchMap()但这只能完成第一部分

Jon*_*wag 2

我为您创建了一个自定义运算符 ( toggleEmit),它接受N可观察值并在它们之间进行切换。

const result$ = toggleEmit(source1$, source2$);
Run Code Online (Sandbox Code Playgroud)

实现的功能:

  • N可以将可观察量提供给toggleEmit 运算符
  • Observables 将被一一发出并开始重复。方向是从索引0到索引N.length0当最后一个可观察值发出时,它会再次开始
  • Observables 在一次运行中只能发出一次:0 - N.length。多次发射是distincted

仅供参考:代码实际上非常简单。它看起来很大,因为我添加了一些注释以避免混淆。如果您有疑问,请评论,我会尽力回答。

const result$ = toggleEmit(source1$, source2$);
Run Code Online (Sandbox Code Playgroud)
const { Subject, merge } = rxjs;
const { map, scan, distinctUntilChanged, filter } = rxjs.operators;

const source1$ = new Subject();
const source2$ = new Subject();

function toggleEmit(...observables) {
  // amount of all observables
  const amount = observables.length;
  
  // create your updating state that contains the last index and value
  const createState = (value, index) => ({ index, value });

  /*
  * This function updates your state at every emit
  * Keep in mind that updateState contains 3 functions:
  * 1. is called directly: updateState(index, amount)
  * 2. is called by the map operator: map(val => updateState(index, amount)(val)) -> Its just shorthand written
  * 3. is called by the scan operator: fn(state)
  */
  const updateState = (index, amount) => update => state =>
    // Check initial object for being empty and index 0
    Object.keys(state).length == 0 && index == 0
    // Check if new index is one higher
    || index == state.index + 1
    // Check if new index is at 0 and last was at end of observables
    || state.index == amount - 1 && index == 0
      ? createState(update, index)
      : state
  
  // Function is used to avoid same index emit twice
  const noDoubleEmit = (prev, curr) => prev.index == curr.index

  return merge(
    ...observables.map((observable, index) =>
      observable.pipe(map(updateState(index, amount)))
    )
  ).pipe(
    scan((state, fn) => fn(state), {}),
    filter(state => Object.keys(state).length != 0),
    distinctUntilChanged(noDoubleEmit),
    map(state => state.value),
  );
}

const result$ = toggleEmit(source1$, source2$);

result$.subscribe(console.log);

source2$.next(0);
source1$.next(1);
source2$.next(2);
source2$.next(3);
source1$.next(4);
Run Code Online (Sandbox Code Playgroud)