使用redux-observable并订阅websocket

Jor*_*ald 5 redux-observable

试图弄清楚如何获得我的史诗将会订阅websocket,然后在发出的事件从websocket滚动时调度一些动作.

我看到的示例是使用Multiplex而不是实际调用websocket上的订阅,我对改变它感到困惑.

我是这样开始的.但我相信redux observable想要一个

const socket$ = Observable.webSocket<DataEvent>(
  "ws://thewebsocketurl"
);

const bankStreamEpic = (action$, store) =>
  action$.ofType(START_BANK_STREAM).mergeMap(action => {
    console.log("in epic mergeMap");
    socket$
      .subscribe(
        e => {
          console.log("dispatch event " + e);
         distributeEvent(e);
        },
        e => {
          logger.log("AmbassadorsDataService", "Unclean socket closure");
        },
        () => {
          logger.log("AmbassadorsDataService", "Socket connection closed");
        }
      )
  });

   function distributeEvent(event: DataEvent) : void {
        //this.logger.log('AmbassadorsDataService', 'Event Received: ' + event.command + ' and id: ' + event.id);
        if(event.source === '/ambassadors/bank') {
            if( event.command === 'REMOVE') {
                removeDataEvent(event);
            }
            else if(event.command == 'ADD') {
                loadDataEvent(event);
            }
        }
    }
Run Code Online (Sandbox Code Playgroud)

它抛出一个错误: 未捕获TypeError:您提供了'undefined',其中包含一个流.您可以提供Observable,Promise,Array或Iterable.

任何帮助,将不胜感激!

谢谢

jay*_*lps 11

在redux-observable中,你几乎从不(除非你知道为什么我说"差不多")给subscribe自己打电话.相反,Observables被链接,中间件和其他运营商将为您处理订阅.

如果您只想为收到的每个事件发送一个动作,那很简单:

const socket$ = Observable.webSocket<DataEvent>(
  "ws://thewebsocketurl"
);

const bankStreamEpic = (action$, store) =>
  action$.ofType('START_BANK_STREAM')
    .mergeMap(action =>
      socket$
        .map(payload => ({
          type: 'BANK_STREAM_MESSAGE',
          payload
        }))
    );
Run Code Online (Sandbox Code Playgroud)

你可能(也可能不会)需要根据从插座上接收到的消息的内容是什么做的更多的定制,但实际上你可能会得到更好的服务将您的减速器其它逻辑,因为它可能是不相关的副作用.

您可能想要一种方法来停止流,这只是一个takeUntil:

const socket$ = Observable.webSocket<DataEvent>(
  "ws://thewebsocketurl"
);

const bankStreamEpic = (action$, store) =>
  action$.ofType('START_BANK_STREAM')
    .mergeMap(action =>
      socket$
        .map(payload => ({
          type: 'BANK_STREAM_MESSAGE',
          payload
        }))
        .takeUntil(
          action$.ofType('STOP_BANK_STREAM')
        )
    );
Run Code Online (Sandbox Code Playgroud)

mergeMap之所以使用,是因为你做了,但在这种情况下我认为switchMap更合适,因为每个都有多个这些似乎是多余的,除非你需要有多个,你的问题只是省略了每个人的独特之处.