mik*_*ikl 11 javascript ecmascript-6 redux redux-saga
我正在尝试使用redux-saga将事件从PouchDB连接到我的React.js应用程序,但我正在努力弄清楚如何将从PouchDB发出的事件连接到我的Saga.由于事件使用了一个回调函数(我无法将其传递给生成器),我无法yield put()在回调中使用,它在ES2015编译后使用Webpack提供了奇怪的错误.
所以这就是我想要完成的事情,那些不起作用的部分就在里面replication.on('change' (info) => {}).
function * startReplication (wrapper) {
while (yield take(DATABASE_SET_CONFIGURATION)) {
yield call(wrapper.connect.bind(wrapper))
// Returns a promise, or false.
let replication = wrapper.replicate()
if (replication) {
replication.on('change', (info) => {
yield put(replicationChange(info))
})
}
}
}
export default [ startReplication ]
Run Code Online (Sandbox Code Playgroud)
Yas*_*afi 21
正如Nirrek所解释的那样,当你需要连接到推送数据源时,你必须为该源构建一个事件迭代器.
我想补充一点,上述机制可以重复使用.因此,我们不必为每个不同的源重新创建事件迭代器.
解决方案是使用和方法创建通用通道.您可以从Generator内部调用该方法,并将该方法连接到数据源的侦听器接口.puttaketakeput
这是一个可能的实现.请注意,如果没有人在等待它们,通道会缓冲消息(例如,Generator正在忙于进行某些远程调用)
function createChannel () {
const messageQueue = []
const resolveQueue = []
function put (msg) {
// anyone waiting for a message ?
if (resolveQueue.length) {
// deliver the message to the oldest one waiting (First In First Out)
const nextResolve = resolveQueue.shift()
nextResolve(msg)
} else {
// no one is waiting ? queue the event
messageQueue.push(msg)
}
}
// returns a Promise resolved with the next message
function take () {
// do we have queued messages ?
if (messageQueue.length) {
// deliver the oldest queued message
return Promise.resolve(messageQueue.shift())
} else {
// no queued messages ? queue the taker until a message arrives
return new Promise((resolve) => resolveQueue.push(resolve))
}
}
return {
take,
put
}
}
Run Code Online (Sandbox Code Playgroud)
然后,只要您想要收听外部推送数据源,就可以使用上述通道.以你为榜样
function createChangeChannel (replication) {
const channel = createChannel()
// every change event will call put on the channel
replication.on('change', channel.put)
return channel
}
function * startReplication (getState) {
// Wait for the configuration to be set. This can happen multiple
// times during the life cycle, for example when the user wants to
// switch database/workspace.
while (yield take(DATABASE_SET_CONFIGURATION)) {
let state = getState()
let wrapper = state.database.wrapper
// Wait for a connection to work.
yield apply(wrapper, wrapper.connect)
// Trigger replication, and keep the promise.
let replication = wrapper.replicate()
if (replication) {
yield call(monitorChangeEvents, createChangeChannel(replication))
}
}
}
function * monitorChangeEvents (channel) {
while (true) {
const info = yield call(channel.take) // Blocks until the promise resolves
yield put(databaseActions.replicationChange(info))
}
}
Run Code Online (Sandbox Code Playgroud)
我们可以使用eventChannelredux-saga
这是我的例子
// fetch history messages
function* watchMessageEventChannel(client) {
const chan = eventChannel(emitter => {
client.on('message', (message) => emitter(message));
return () => {
client.close().then(() => console.log('logout'));
};
});
while (true) {
const message = yield take(chan);
yield put(receiveMessage(message));
}
}
function* fetchMessageHistory(action) {
const client = yield realtime.createIMClient('demo_uuid');
// listen message event
yield fork(watchMessageEventChannel, client);
}
Run Code Online (Sandbox Code Playgroud)
请注意:
默认情况下,eventChannel上的消息不会被缓冲.如果您只想message event逐个处理,则不能使用阻塞调用const message = yield take(chan);
或者您必须为eventChannel工厂提供缓冲区,以指定通道的缓冲策略(例如eventChannel(订户,缓冲区)).有关详细信息,请参阅redux-saga API文档
小智 6
我们必须解决的根本问题是事件发射器是"基于推进的",而传说是"基于拉动的".
如果你订阅像这样的事件:replication.on('change', (info) => {}),则回调执行每当replication事件发射决定推一个新值.
对于传奇,我们需要翻转控制.当它决定何时响应可用的新变更信息时,必须控制它的传奇.换句话说,一个传奇需要提取新信息.
以下是实现此目的的一种方法示例:
function* startReplication(wrapper) {
while (yield take(DATABASE_SET_CONFIGURATION)) {
yield apply(wrapper, wrapper.connect);
let replication = wrapper.replicate()
if (replication)
yield call(monitorChangeEvents, replication);
}
}
function* monitorChangeEvents(replication) {
const stream = createReadableStreamOfChanges(replication);
while (true) {
const info = yield stream.read(); // Blocks until the promise resolves
yield put(replicationChange(info));
}
}
// Returns a stream object that has read() method we can use to read new info.
// The read() method returns a Promise that will be resolved when info from a
// change event becomes available. This is what allows us to shift from working
// with a 'push-based' model to a 'pull-based' model.
function createReadableStreamOfChanges(replication) {
let deferred;
replication.on('change', info => {
if (!deferred) return;
deferred.resolve(info);
deferred = null;
});
return {
read() {
if (deferred)
return deferred.promise;
deferred = {};
deferred.promise = new Promise(resolve => deferred.resolve = resolve);
return deferred.promise;
}
};
}
Run Code Online (Sandbox Code Playgroud)
还有就是上面的例子中的JSbin这里:http://jsbin.com/cujudes/edit?js,console
您还应该看看Yassine Elouafi对类似问题的回答: 我可以使用redux-saga的es6生成器作为websockets或eventsource的onmessage监听器吗?
| 归档时间: |
|
| 查看次数: |
8122 次 |
| 最近记录: |