Lin*_*nda 3 mongodb node.js mongodb-query
每次将特定类型的数据插入到集合中时,我想使用 MongoDB 流来触发事件。
我已经找到了与我正在寻找的大致相似的东西,但这仅适用于更改流而不适用于插入。
知道我如何完成这项工作吗?
我正在使用带有Nodejs 的Mongodb驱动程序来完成这项工作,因此我的代码将是这样的:
const MongoClient = require('mongodb').MongoClient;
const uri = 'mongodb://localhost:27017/?replicaSet=rs0';
MongoClient.connect(uri, function(err, client) {
const db = client.db('mydb');
// Connect using MongoClient
var filter = [{
$match: {
$or: [
{ $or: [{"receipt.receiver": "newdexpocket"}, {"act.account": "newdexpocket"}] }]
}
}];
var options = { fullDocument: 'updateLookup' };
db.collection('somecollection').watch(filter, options).on('create', data =>
{
console.log(data);
});
});
Run Code Online (Sandbox Code Playgroud)
operationType在过滤器中指定一个吗?updateLookup不是正确的工具,我应该使用什么?on活动中使用哪些选项?我用过,create但我什至不确定它是否存在,是吗?对所有这些问题感到抱歉,但我正在努力在官方文档中找到一些答案。
解决方案:
当心不要忘记fullDocument您的请求;-)
function watch_insert(con, db, coll) {
console.log(new Date() + ' watching: ' + coll);
const insert_pipeline = [ { $match:
{
operationType: 'insert',
$or: [
{ "fullDocument.receipt.receiver": "newdexpocket" },
{ "fullDocument.act.account": "newdexpocket" }
]
}
}];
con.db(db).collection(coll).watch(insert_pipeline)
.on('change', data => {
console.log(data)
});
}
async function run(uri) {
try {
con = await MongoClient.connect(uri, {"useNewUrlParser": true});
watch_insert(con, 'EOS', 'action_traces');
} catch (err) {
console.log(err);
}
}
Run Code Online (Sandbox Code Playgroud)
您需要:
operationType: 'insert'。由于您不想监视更新,因此不需要updateLookup.operationType.watch()。示例输出位于“更改事件”页面中。watch()返回一个ChangeStream. 它激发close,change,end,和error事件。有关更多详细信息,请参阅ChangeStream。
这是一个完整的变更流示例,它监听对insert数据库test集合的操作test。它将输出具有字段{a: 1}( 'fullDocument.a': 1) 的文档,并且将忽略更新、插入 的其他值a或任何没有字段的内容a。
const MongoClient = require('mongodb').MongoClient
const uri = 'mongodb://localhost:27017/test?replicaSet=replset'
const insert_pipeline = [
{$match: {operationType: 'insert', 'fullDocument.a': 1}}
]
function watch_insert(con, db, coll) {
console.log(new Date() + ' watching: ' + coll)
con.db(db).collection(coll).watch(insert_pipeline)
.on('change', data => {
console.log(data)
})
}
async function run() {
con = await MongoClient.connect(uri, {"useNewUrlParser": true})
watch_insert(con, 'test', 'test')
}
run()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1839 次 |
| 最近记录: |