如何在redux-observable中重新启动操作?

Арт*_*рик 1 observable redux redux-observable

例如这段代码jsbin的例子

const pingEpic = action$ =>
  action$.ofType(PING)
    .delay(1000) // Asynchronously wait 1000ms then continue
    .mapTo({ type: PONG })
    .takeUntil(action$.ofType(CANCEL));
Run Code Online (Sandbox Code Playgroud)

单击取消按钮后,如果操作为"takeUntil",则表示无效.为什么取消后,延迟1秒钟,并且不再发生变化.

jay*_*lps 8

问题是对RxJS如何工作的微妙但关键的误解 - 但不要害怕,这是非常常见的.

所以举个例子:

const pingEpic = action$ =>
  action$.ofType(PING)
    .delay(1000)
    .mapTo({ type: PONG })
    .takeUntil(action$.ofType(CANCEL));
Run Code Online (Sandbox Code Playgroud)

此史诗的行为可以描述为过滤所有不匹配类型的动作PING.当一个动作匹配时,等待1000ms,然后将该动作映射到另一个动作{ type: PONG },该动作将被发射,然后由redux-observable调度.如果在应用程序运行的任何时候有人调度类型的操作CANCEL,则取消订阅源,这意味着整个链将取消订阅,终止史诗.

如果您必须这样做,看看它的外观可能会有所帮助:

const pingEpic = action$ => {
  return new Rx.Observable(observer => {
    console.log('[pingEpic] subscribe');
    let timer;

    const subscription = action$.subscribe(action => {
      console.log('[pingEpic] received action: ' + action.type);

      // When anyone dispatches CANCEL, we stop listening entirely!
      if (action.type === CANCEL) {
        observer.complete();
        return;
      }

      if (action.type === PING) {
        timer = setTimeout(() => {
          const output = { type: PONG };
          observer.next(output);
        }, 1000);
      }
    });

    return {
      unsubscribe() {
        console.log('[pingEpic] unsubscribe');
        clearTimeout(timer);
        subscription.unsubscribe();
      }
    };
  });
};
Run Code Online (Sandbox Code Playgroud)

您可以在这里使用假商店运行此代码:http://jsbin.com/zeqasih/edit?js,console


相反,您通常想要做的是将想要从顶级链中取消的订阅者链隔离,以便无限期地监听.虽然你的例子(从文档中修改过)是设计的,但我们先来看看它.

在这里,我们使用mergeMap运算符让我们采取匹配的操作并映射到另一个独立的可观察链.

演示:http://jsbin.com/nofato/edit?js,output

const pingEpic = action$ =>
  action$.ofType(PING)
    .mergeMap(() =>
      Observable.timer(1000)
        .takeUntil(action$.ofType(CANCEL))
        .mapTo({ type: PONG })
    );
Run Code Online (Sandbox Code Playgroud)

我们使用Observable.timer等待1000毫秒,然后映射它发出的值(这恰好是数字零,但这在这里并不重要)到我们的PONG操作.我们还说我们想要从定时器源"获取",直到它正常完成或我们收到类型的动作CANCEL.

这会隔离链,因为它mergeMap会继续订阅您返回的observable,直到它出错或完成为止.但是当发生这种情况时,它本身并不会停止订阅您应用它的源; 的action$.ofType(PING)在这个例子中.

一个更现实的例子是在Cancellation部分的redux-observable文档中

在这里我们将.takeUntil()放在我们的.mergeMap()之后,但在我们的AJAX调用之后; 这很重要,因为我们只想取消AJAX请求,而不是阻止Epic监听任何未来的操作.

const fetchUserEpic = action$ =>
  action$.ofType(FETCH_USER)
    .mergeMap(action =>
      ajax.getJSON(`/api/users/${action.payload}`)
        .map(fetchUserFulfilled)
        .takeUntil(action$.ofType(FETCH_USER_CANCELLED))
    );
Run Code Online (Sandbox Code Playgroud)

这一切可能听起来令人困惑,但就像大多数有力的东西一样,一旦你得到它,它就会变得直观.Ben Lesh非常出色地解释了Observables在他最近的演讲中是如何工作的,包括讨论运营商如何成为Observables链,甚至是隔离用户链.即使谈话是在AngularConnect,它也不是Angular特有的.


顺便说一句,重要的是要注意你的史诗不会吞下或以其他方式阻止动作到达减速器,例如当你将传入的动作映射到另一个不同的动作时.事实上,当你的史诗收到一个动作时,它已经通过你的减速器了.将您的史诗视为听取应用程序操作流的边车流程,但不能阻止正常的还原事件发生,它只能发出新的操作.