如何监听MongoDB集合的变化?

And*_*rew 192 mongodb

我正在使用MongoDB作为数据存储创建一种后台作业队列系统.在产生工作人员处理作业之前,如何"监听"对MongoDB集合的插入?我是否需要每隔几秒轮询以查看上次是否有任何更改,或者我的脚本是否有可能等待插入发生?这是我正在研究的PHP项目,但可以随意回答Ruby或语言无关.

Gat*_* VP 109

你在想什么听起来很像触发器.MongoDB对触发器没有任何支持,但有些人使用一些技巧"自己动手".这里的关键是oplog.

在副本集中运行MongoDB时,所有MongoDB操作都会记录到操作日志(称为oplog).oplog基本上只是对数据所做修改的运行列表.副本通过侦听此oplog上的更改然后在本地应用更改来设置功能.

这听起来很熟悉吗?

我不能在这里详细说明整个过程,它是几页文档,但您可以使用所需的工具.

首先在oplog上写一些文章 - 简要说明 - 集合的布局local(包含oplog)

您还需要利用tailable游标.这些将为您提供一种监听更改而不是轮询它们的方法.请注意,复制使用tailable游标,因此这是一项受支持的功能.

  • 您可以使用`--replSet`选项启动服务器,它将创建/填充`oplog`.即使没有中学.这绝对是"监听"DB中更改的唯一方法. (15认同)
  • 这是一个很好的描述如何设置oplog以在本地记录对DB的更改:http://loosexaml.wordpress.com/2012/09/03/how-to-get-a-mongodb-oplog-without-a-full-副本集/ (2认同)

And*_*rew 99

MongoDB的具有所谓capped collectionstailable cursors允许MongoDB的将数据推给听众.

A capped collection本质上是一个固定大小的集合,只允许插入.以下是创建一个的样子:

db.createCollection("messages", { capped: true, size: 100000000 })
Run Code Online (Sandbox Code Playgroud)

MongoDB Tailable游标(Jonathan H. Wage的原始帖子)

红宝石

coll = db.collection('my_collection')
cursor = Mongo::Cursor.new(coll, :tailable => true)
loop do
  if doc = cursor.next_document
    puts doc
  else
    sleep 1
  end
end
Run Code Online (Sandbox Code Playgroud)

PHP

$mongo = new Mongo();
$db = $mongo->selectDB('my_db')
$coll = $db->selectCollection('my_collection');
$cursor = $coll->find()->tailable(true);
while (true) {
    if ($cursor->hasNext()) {
        $doc = $cursor->getNext();
        print_r($doc);
    } else {
        sleep(1);
    }
}
Run Code Online (Sandbox Code Playgroud)

Python(罗伯特斯图尔特)

from pymongo import Connection
import time

db = Connection().my_db
coll = db.my_collection
cursor = coll.find(tailable=True)
while cursor.alive:
    try:
        doc = cursor.next()
        print doc
    except StopIteration:
        time.sleep(1)
Run Code Online (Sandbox Code Playgroud)

Perl(由Max)

use 5.010;

use strict;
use warnings;
use MongoDB;

my $db = MongoDB::Connection->new;
my $coll = $db->my_db->my_collection;
my $cursor = $coll->find->tailable(1);
for (;;)
{
    if (defined(my $doc = $cursor->next))
    {
        say $doc;
    }
    else
    {
        sleep 1;
    }
}
Run Code Online (Sandbox Code Playgroud)

其他资源:

Ruby/Node.js教程,它将引导您创建一个监听MongoDB上限集合中的插入的应用程序.

一篇文章更详细地讨论了tailable游标.

PHP,Ruby,Python和Perl使用tailable游标的示例.

  • 睡1?真?对于生产代码?怎么不投票? (67认同)
  • 在生产中做time.sleep(1)有什么问题? (17认同)
  • @kroe因为那些不相关的细节会被新的程序员放入生产代码中,而这些程序员可能不理解为什么会这么糟糕. (14认同)
  • 我理解你的观点,但期待一些新的程序员将"睡眠1"添加到制作中几乎是令人反感的!我的意思是,我不会感到惊讶......但如果有人把它投入生产,至少会学到很多方法并永远学习......哈哈哈 (3认同)
  • @rbp哈哈,我从来没有说过这是生产代码,但你是对的,睡一会儿不是一个好习惯.很确定我从其他地方得到了这个例子.不知道如何重构它. (2认同)
  • 大声笑,他只是显示可用的游标!他做了自己的工作,为什么懒得睡觉1 !! 到目前为止这篇文章中最无关紧要的事情!! 是一个很好的答案! (2认同)
  • 对于ruby,使用new(2.3)驱动程序:`collection.find({query},{tailable:true,await_data:true}).each {| document | ...}`同时查看http://shtylman.com/post/the-tail-of-mongodb/,在那里你可以了解到你需要光标中的初始数据来开始拖尾 (2认同)

Mit*_*tar 41

从MongoDB 3.6开始,将会有一个名为Change Streams的新通知API,您可以使用它.有关示例,请参阅此博客文章.它的例子:

cursor = client.my_db.my_collection.changes([
    {'$match': {
        'operationType': {'$in': ['insert', 'replace']}
    }},
    {'$match': {
        'newDocument.n': {'$gte': 1}
    }}
])

# Loops forever.
for change in cursor:
    print(change['newDocument'])
Run Code Online (Sandbox Code Playgroud)

  • 为什么?你能详细说说吗?这是现在的标准方式吗? (4认同)
  • 如何杀死服务器上的性能. (3认同)
  • 你在哪里看到民意调查? (3认同)

Rio*_*ber 35

看看这个:改变流

2018年1月10日 - 版本3.6

*编辑:我写了一篇关于如何做到这一点的文章https://medium.com/riow/mongodb-data-collection-change-85b63d96ff76

https://docs.mongodb.com/v3.6/changeStreams/


它是mongodb中 的新功能3.6 https://docs.mongodb.com/manual/release-notes/3.6/ 2018/01/10

$ mongod --version
db version v3.6.2
Run Code Online (Sandbox Code Playgroud)

为了使用changeStreams,数据库必须是一个复制集

有关复制集的更多信息:https: //docs.mongodb.com/manual/replication/

默认情况下,您的数据库将是" 独立 ".

如何将独立转换为副本集:https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set/


以下示例是您如何使用它的实际应用程序.
*专门针对Node.

/* file.js */
'use strict'


module.exports = function (
    app,
    io,
    User // Collection Name
) {
    // SET WATCH ON COLLECTION 
    const changeStream = User.watch();  

    // Socket Connection  
    io.on('connection', function (socket) {
        console.log('Connection!');

        // USERS - Change
        changeStream.on('change', function(change) {
            console.log('COLLECTION CHANGED');

            User.find({}, (err, data) => {
                if (err) throw err;

                if (data) {
                    // RESEND ALL USERS
                    socket.emit('users', data);
                }
            });
        });
    });
};
/* END - file.js */
Run Code Online (Sandbox Code Playgroud)

有用的链接:
https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set
https://docs.mongodb.com/manual/tutorial/change-streams-example

https://docs.mongodb.com/v3.6/tutorial/change-streams-example
http://plusnconsulting.com/post/MongoDB-Change-Streams


Rob*_*ers 20

MongoDB版本3.6现在包括更改流,它本质上是OpLog之上的API,允许类似触发/通知的用例.

以下是Java示例的链接:http: //mongodb.github.io/mongo-java-driver/3.6/driver/tutorials/change-streams/

NodeJS示例可能类似于:

 var MongoClient = require('mongodb').MongoClient;
    MongoClient.connect("mongodb://localhost:22000/MyStore?readConcern=majority")
     .then(function(client){
       let db = client.db('MyStore')

       let change_streams = db.collection('products').watch()
          change_streams.on('change', function(change){
            console.log(JSON.stringify(change));
          });
      });
Run Code Online (Sandbox Code Playgroud)