如何使用insert_many方法处理pymongo AutoReconnect异常

Tom*_*art 10 bulkinsert mongodb pymongo replicaset

我有一个包含3个成员的MongoDB副本集和一个存储数据的Python应用程序.

AutoReconnect当使用带有包装器的单个文档插入时,我可以处理pymongo的异常,如下所示:

def safe_mongo_call(method, num_retries, *args, **kwargs):
    while True:
        try:
            return method(*args, **kwargs)
        except (pymongo.errors.AutoReconnect,
                pymongo.errors.ServerSelectionTimeoutError) as e:
            if num_retries > 0:
                logger.debug('Retrying MongoDB operation: %s', str(e))
                num_retries -= 1
            else:
                raise
Run Code Online (Sandbox Code Playgroud)

我不确定在使用批量写入时如何处理这些异常,例如insert_many方法.根据文档,批量写入不是原子的,因此即使发生其中一个异常,也可能已经有一些文档成功写入数据库.因此,我不能简单地重用上面的包装器方法.

如何处理这些情况的最佳方法是什么?

ffe*_*ast 3

对于这种情况,存在\xe2\x80\x99s BulkWriteError,必须提供有关已完成的\xe2\x80\x99s的详细信息\n https://api.mongodb.com/python/current/examples/bulk.html#ordered-bulk -写操作

\n\n

但如果连接丢失,则会发送AutoReconnect,并且操作进度的信息似乎会丢失(针对 pymongo==3.5.1 进行了测试)

\n\n

在任何情况下,您都需要重建已写入的内容和未写入的内容,然后对其余项目重试该操作。\n在后一种情况下,会有点困难,因为您没有事先的信息关于实际已写但仍然可行的内容

\n\n

作为草图解决方案:\n除非_id已经存在,否则每个要插入的文档都会分配一个ObjectId。您可以自己处理这个问题 - 迭代文档,手动为丢失的文档分配_id并将 ID 保存在临时变量中。一旦遇到异常,就会发现最后一个_id成功插入,利用类似于二分搜索的方法,最多可以进行 ~O(logN) 查询,并且还可以使用批量操作被分成较小批次的事实(https://api. mongodb.com/python/current/examples/bulk.html#bulk-insert)。但当然,这种方法的适用性取决于 mongod 实例上的负载配置文件以及是否允许额外的查询突发。如果按预期抛出BulkWriteError,您可以只抓取未插入的文档,然后仅对这些文档重试操作。

\n\n

回到 自动重新连接问题,我个人会在mongo-python-driver问题跟踪器中开一张票,很可能是一个错误或故意这样做

\n