如何将发布的事件事件绑定到redux-saga?

mik*_*ikl 11 javascript ecmascript-6 redux redux-saga

我正在尝试使用redux-saga将事件从PouchDB连接到我的React.js应用程序,但我正在努力弄清楚如何将从PouchDB发出的事件连接到我的Saga.由于事件使用了一个回调函数(我无法将其传递给生成器),我无法yield put()在回调中使用,它在ES2015编译后使用Webpack提供了奇怪的错误.

所以这就是我想要完成的事情,那些不起作用的部分就在里面replication.on('change' (info) => {}).

function * startReplication (wrapper) {
  while (yield take(DATABASE_SET_CONFIGURATION)) {
    yield call(wrapper.connect.bind(wrapper))

    // Returns a promise, or false.
    let replication = wrapper.replicate()

    if (replication) {
      replication.on('change', (info) => {
        yield put(replicationChange(info))
      })
    }
  }
}

export default [ startReplication ]
Run Code Online (Sandbox Code Playgroud)

Yas*_*afi 21

正如Nirrek所解释的那样,当你需要连接到推送数据源时,你必须为该源构建一个事件迭代器.

我想补充一点,上述机制可以重复使用.因此,我们不必为每个不同的源重新创建事件迭代器.

解决方案是使用和方法创建通用通道.您可以从Generator内部调用该方法,并将该方法连接到数据源的侦听器接口.puttaketakeput

这是一个可能的实现.请注意,如果没有人在等待它们,通道会缓冲消息(例如,Generator正在忙于进行某些远程调用)

function createChannel () {
  const messageQueue = []
  const resolveQueue = []

  function put (msg) {
    // anyone waiting for a message ?
    if (resolveQueue.length) {
      // deliver the message to the oldest one waiting (First In First Out)
      const nextResolve = resolveQueue.shift()
      nextResolve(msg)
    } else {
      // no one is waiting ? queue the event
      messageQueue.push(msg)
    }
  }

  // returns a Promise resolved with the next message
  function take () {
    // do we have queued messages ?
    if (messageQueue.length) {
      // deliver the oldest queued message
      return Promise.resolve(messageQueue.shift())
    } else {
      // no queued messages ? queue the taker until a message arrives
      return new Promise((resolve) => resolveQueue.push(resolve))
    }
  }

  return {
    take,
    put
  }
}
Run Code Online (Sandbox Code Playgroud)

然后,只要您想要收听外部推送数据源,就可以使用上述通道.以你为榜样

function createChangeChannel (replication) {
  const channel = createChannel()

  // every change event will call put on the channel
  replication.on('change', channel.put)
  return channel
}

function * startReplication (getState) {
  // Wait for the configuration to be set. This can happen multiple
  // times during the life cycle, for example when the user wants to
  // switch database/workspace.
  while (yield take(DATABASE_SET_CONFIGURATION)) {
    let state = getState()
    let wrapper = state.database.wrapper

    // Wait for a connection to work.
    yield apply(wrapper, wrapper.connect)

    // Trigger replication, and keep the promise.
    let replication = wrapper.replicate()

    if (replication) {
      yield call(monitorChangeEvents, createChangeChannel(replication))
    }
  }
}

function * monitorChangeEvents (channel) {
  while (true) {
    const info = yield call(channel.take) // Blocks until the promise resolves
    yield put(databaseActions.replicationChange(info))
  }
}
Run Code Online (Sandbox Code Playgroud)

  • @ yassine-elouafi取消回调并将其转变为动作创建者`(info)=>({type:'ON_CHANGE“,info})`的缺点是什么,分别监视'ON_CHANGE'并执行`yield put(replicationChange (信息)在那里? (2认同)

jk2*_*k2K 7

我们可以使用eventChannelredux-saga

这是我的例子

// fetch history messages
function* watchMessageEventChannel(client) {
  const chan = eventChannel(emitter => {
    client.on('message', (message) => emitter(message));
    return () => {
      client.close().then(() => console.log('logout'));
    };
  });
  while (true) {
    const message = yield take(chan);
    yield put(receiveMessage(message));
  }
}

function* fetchMessageHistory(action) {
  const client = yield realtime.createIMClient('demo_uuid');
  // listen message event
  yield fork(watchMessageEventChannel, client);
}
Run Code Online (Sandbox Code Playgroud)

请注意:

默认情况下,eventChannel上的消息不会被缓冲.如果您只想message event逐个处理,则不能使用阻塞调用const message = yield take(chan);

或者您必须为eventChannel工厂提供缓冲区,以指定通道的缓冲策略(例如eventChannel(订户,缓冲区)).有关详细信息,请参阅redux-saga API文档


小智 6

我们必须解决的根本问题是事件发射器是"基于推进的",而传说是"基于拉动的".

如果你订阅像这样的事件:replication.on('change', (info) => {}),则回调执行每当replication事件发射决定一个新值.

对于传奇,我们需要翻转控制.当它决定何时响应可用的新变更信息时,必须控制它的传奇.换句话说,一个传奇需要提取新信息.

以下是实现此目的的一种方法示例:

function* startReplication(wrapper) {
  while (yield take(DATABASE_SET_CONFIGURATION)) {
    yield apply(wrapper, wrapper.connect);
    let replication = wrapper.replicate()
    if (replication)
      yield call(monitorChangeEvents, replication);
  }
}

function* monitorChangeEvents(replication) {
  const stream = createReadableStreamOfChanges(replication);

  while (true) {
    const info = yield stream.read(); // Blocks until the promise resolves
    yield put(replicationChange(info));
  }
}

// Returns a stream object that has read() method we can use to read new info.
// The read() method returns a Promise that will be resolved when info from a
// change event becomes available. This is what allows us to shift from working
// with a 'push-based' model to a 'pull-based' model.
function createReadableStreamOfChanges(replication) {
  let deferred;

  replication.on('change', info => {
    if (!deferred) return;
    deferred.resolve(info);
    deferred = null;
  });

  return {
    read() {
      if (deferred)
        return deferred.promise;

      deferred = {};
      deferred.promise = new Promise(resolve => deferred.resolve = resolve);
      return deferred.promise;
    }
  };
}
Run Code Online (Sandbox Code Playgroud)

还有就是上面的例子中的JSbin这里:http://jsbin.com/cujudes/edit?js,console

您还应该看看Yassine Elouafi对类似问题的回答: 我可以使用redux-saga的es6生成器作为websockets或eventsource的onmessage监听器吗?