RxJS:takeUntil具有多个操作和不同的过滤器?

big*_*ato 1 rxjs redux-observable

我有一个Observable,我想继续执行直到:

1)uploadActions.MARK_UPLOAD_AS_COMPLETE用一定的有效载荷调用动作

要么

2)uploadActions.UPLOAD_FAILURE使用任何有效负载调用该动作

这是我所能得到的(并且不起作用):

return Observable.interval(5000)
  .takeUntil(
    action$
      .ofType(
        uploadActions.UPLOAD_FAILURE,
        uploadActions.MARK_UPLOAD_AS_COMPLETE
      )
      .filter(a => { // <---- this filter only applies to uploadActions.MARK_UPLOAD_AS_COMPLETE
        const completedFileHandle = a.payload;
        return handle === completedFileHandle;
      })
  )
  .mergeMap(action =>
    ...
  );
Run Code Online (Sandbox Code Playgroud)

有没有一种干净的方法可以实现这一目标?

byg*_*ace 5

我将两个条件拆分为单独的流,然后像这样合并它们:

const action$ = new Rx.Subject();
const uploadActions = {
  UPLOAD_FAILURE: "UPLOAD_FAILURE",
  MARK_UPLOAD_AS_COMPLETE: "MARK_UPLOAD_AS_COMPLETE"
};
const handle = 42;

window.setTimeout(() => action$.next({
  type: uploadActions.MARK_UPLOAD_AS_COMPLETE,
  payload: handle
}), 1200);

Rx.Observable.interval(500)
  .takeUntil(
    Rx.Observable.merge(
      action$.filter(x => x.type === uploadActions.UPLOAD_FAILURE),
      action$.filter(x => x.type === uploadActions.MARK_UPLOAD_AS_COMPLETE)
      	.filter(x => x.payload === handle)      
    )
  ).subscribe(
    x => { console.log('Next: ', x); },
    e => { console.log('Error: ', e); },
    () => { console.log('Completed'); }
  );
Run Code Online (Sandbox Code Playgroud)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.min.js"></script>
Run Code Online (Sandbox Code Playgroud)

对于该示例,我不得不使用filter运算符,而不是ofType因为这ofType是一个redux问题。