Muh*_*ras 2 mongodb node.js changestream
我在 nodejs 中使用 mongoDB 更改流,一切正常,但如果数据库关闭需要超过 10 5 秒才能启动更改流会引发超时错误,这是我的更改流观察器代码
Service.prototype.watcher = function( db ){
let collection = db.collection('tokens');
let changeStream = collection.watch({ fullDocument: 'updateLookup' });
let resumeToken, newChangeStream;
changeStream.on('change', next => {
resumeToken = next._id;
console.log('data is ', JSON.stringify(next))
changeStream.close();
// console.log('resumeToken is ', JSON.stringify(resumeToken))
newChangeStream = collection.watch({ resumeAfter : resumeToken });
newChangeStream.on('change', next => {
console.log('insert called ', JSON.stringify( next ))
});
});
Run Code Online (Sandbox Code Playgroud)
但是在数据库端我已经处理了它,即如果数据库关闭或使用此代码重新连接
this.db.on('reconnected', function () {
console.info('MongoDB reconnected!');
});
this.db.on('disconnected', function() {
console.warn('MongoDB disconnected!');
});
Run Code Online (Sandbox Code Playgroud)
但我无法处理更改流观察程序以在数据库关闭时停止它并在重新连接数据库时重新启动它,或者是否有其他更好的方法来做到这一点?
您要做的是将watch()调用封装在一个函数中。然后,此函数将在出错时调用自身,以使用先前保存的恢复令牌重新查看集合。您拥有的代码中缺少的是错误处理程序。例如:
const MongoClient = require('mongodb').MongoClient
const uri = 'mongodb://localhost:27017/test?replicaSet=replset'
var resume_token = null
run()
function watch_collection(con, db, coll) {
console.log(new Date() + ' watching: ' + coll)
con.db(db).collection(coll).watch({resumeAfter: resume_token})
.on('change', data => {
console.log(data)
resume_token = data._id
})
.on('error', err => {
console.log(new Date() + ' error: ' + err)
watch_collection(con, coll)
})
}
async function run() {
con = await MongoClient.connect(uri, {"useNewUrlParser": true})
watch_collection(con, 'test', 'test')
}
Run Code Online (Sandbox Code Playgroud)
请注意,它watch_collection()包含该watch()方法及其处理程序。更改时,它将打印更改并存储恢复令牌。出错时,它会调用自己再次重新观看集合。
| 归档时间: |
|
| 查看次数: |
2728 次 |
| 最近记录: |