我如何实时监听 MongoDB 上的更改?

Jac*_*022 3 python mongodb

我有一个数据库,我正在其中发送一些数据。同时,我正在运行一个 Python 脚本,我希望该脚本在添加后立即在我的控制台上将最后一个条目发送到 MongoDB 数据库。

我几天来一直在寻找解决方案,但没有找到任何东西。

我做了一些研究并发现:a)可尾游标,但唯一的问题是我的数据库没有上限,并且由于我将每 5 秒放入一次数据,我担心上限数据库不足以满足我的需求因为当达到最大大小时,它将开始覆盖最旧的记录b)change_streams,但我的数据库不是副本集,我对此相当陌生,所以我仍然需要了解更高级的主题,例如RS。

有什么建议吗?

这是我到目前为止得到的:

from pymongo import MongoClient
import pymongo
import time
import random
from pprint import pprint

#Step 1: Connect to MongoDB - Note: Change connection string as needed
client = MongoClient(port=27017)


db = client.one

mycol = client["coll"]


highest_previous_primary_key = 1

while True:
    cursor = db.mycol.find()
    for document in cursor:

        # get the current primary key, and if it's greater than the previous one
        # we print the results and increment the variable to that value
        current_primary_key = document['num']
        if current_primary_key > highest_previous_primary_key:
            print(document['num'])
            highest_previous_primary_key = current_primary_key
Run Code Online (Sandbox Code Playgroud)

但问题是它会在第四条记录后停止打印,而且我不知道当我的数据库有大量数据时这是否是最佳解决方案。

有什么建议吗?

Wan*_*iar 5

b)change_streams,但我的数据库不是副本集,我对此相当陌生,所以我仍然需要了解更高级的主题,例如 RS

副本集提供冗余和高可用性,是所有生产 MongoDB 部署的基础。话虽如此,为了测试和/或部署,您可以部署仅包含单个成员的副本集。对于本地测试示例:

mongod --dbpath /path/data/test --replSet test
Run Code Online (Sandbox Code Playgroud)

本地测试服务器启动后,连接mongo shell 执行rs.initiate()

mongo
> rs.initiate()
Run Code Online (Sandbox Code Playgroud)

请参阅相关部署副本集以进行测试和部署

try:
    # Only catch insert operations
    with client.watch([{'$match': {'operationType': 'insert'}}]) as stream:
        for insert_change in stream:
            print(insert_change)
except pymongo.errors.PyMongoError:
    # The ChangeStream encountered an unrecoverable error or the
    # resume attempt failed to recreate the cursor.
    logging.error('...')
Run Code Online (Sandbox Code Playgroud)

另请参见pymongo.mongo_client.MongoClient.watch()