使用 MongoDB Streams 观看文档创建

Lin*_*nda 3 mongodb node.js mongodb-query

每次将特定类型的数据插入到集合中时,我想使用 MongoDB 流来触发事件。

我已经找到了与我正在寻找的大致相似的东西,但这仅适用于更改流而不适用于插入。

知道我如何完成这项工作吗?

我正在使用带有Nodejs 的Mongodb驱动程序来完成这项工作,因此我的代码将是这样的:

const MongoClient = require('mongodb').MongoClient;

const uri = 'mongodb://localhost:27017/?replicaSet=rs0';
MongoClient.connect(uri, function(err, client) {

const db = client.db('mydb');

// Connect using MongoClient
var filter = [{
    $match: {
        $or: [
            { $or: [{"receipt.receiver": "newdexpocket"}, {"act.account": "newdexpocket"}] }]
    }
}];

var options = { fullDocument: 'updateLookup' };
db.collection('somecollection').watch(filter, options).on('create', data => 
  {
    console.log(data);
  });
});
Run Code Online (Sandbox Code Playgroud)
  • 我需要operationType在过滤器中指定一个吗?
  • 我还需要获取 fullDocument 但显然这updateLookup不是正确的工具,我应该使用什么?
  • 我可以在on活动中使用哪些选项?我用过,create但我什至不确定它是否存在,是吗?

对所有这些问题感到抱歉,但我正在努力在官方文档中找到一些答案。

解决方案:

当心不要忘记fullDocument您的请求;-)

function watch_insert(con, db, coll) {
  console.log(new Date() + ' watching: ' + coll);

  const insert_pipeline = [ { $match:
                    { 
                        operationType: 'insert',
                        $or: [
                            { "fullDocument.receipt.receiver": "newdexpocket" },
                            { "fullDocument.act.account": "newdexpocket" }
                        ]
                    }
                }];

  con.db(db).collection(coll).watch(insert_pipeline)
    .on('change', data => {
      console.log(data)
    });
}

async function run(uri) {
    try {
        con = await MongoClient.connect(uri, {"useNewUrlParser": true});
        watch_insert(con, 'EOS', 'action_traces');
    } catch (err) {
        console.log(err);
    }
}
Run Code Online (Sandbox Code Playgroud)

kev*_*adi 5

您需要:

  1. 指定operationType: 'insert'。由于您不想监视更新,因此不需要updateLookup.
  2. 为您的过滤器创建一个适当的聚合管道,其中包括operationType.
  3. 聚合管道过滤由 返回的文档watch()。示例输出位于“更改事件”页面中

watch()返回一个ChangeStream. 它激发closechangeend,和error事件。有关更多详细信息,请参阅ChangeStream

这是一个完整的变更流示例,它监听对insert数据库test集合的操作test。它将输出具有字段{a: 1}( 'fullDocument.a': 1) 的文档,并且将忽略更新、插入 的其他值a或任何没有字段的内容a

const MongoClient = require('mongodb').MongoClient
const uri = 'mongodb://localhost:27017/test?replicaSet=replset'

const insert_pipeline = [
  {$match: {operationType: 'insert', 'fullDocument.a': 1}}
]

function watch_insert(con, db, coll) {
  console.log(new Date() + ' watching: ' + coll)
  con.db(db).collection(coll).watch(insert_pipeline)
    .on('change', data => {
      console.log(data)
    })
}

async function run() {
  con = await MongoClient.connect(uri, {"useNewUrlParser": true})
  watch_insert(con, 'test', 'test')
}

run()
Run Code Online (Sandbox Code Playgroud)