PyMongo: How to bulk update huge JSON data in MongoDB

Chr*_*s A 5 python json mongodb pymongo

I'm pulling JSON data from an API, and the output looks like this:

[[{'employeeId': 1, 'lastName': 'Smith'}, {'employeeId': 2, 'lastName': 'Flores'}]]

There are roughly 250k objects in the list. I'm able to loop over the objects in the list and perform an update_one for each via PyMongo like this:

import json

# json_list holds the API response shown above; collection is a PyMongo collection
json_this = json.dumps(json_list[0])
json_that = json.loads(json_this)
for x in json_that:
    collection.update_one({"employeeId": x['employeeId']}, {"$set": x}, upsert=True)

But with 250k records this takes a very long time. I'm trying to use update_many instead, but I can't figure out how to convert/format this JSON list correctly for the update_many function. Any guidance would be greatly appreciated.

sri*_*asy 3

Updating/upserting 250K documents into the database can be a heavy task, and you can't use update_many here because the filter query and the update value change from one dictionary to the next. With the approach below you can at least avoid making a separate call to the database for every record, though I'm not entirely sure how well it will work for your scenario. Note that I'm a beginner in Python and this is basic code just to give you the idea.
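To see why update_many doesn't fit, here is a minimal sketch (the field values are purely illustrative): it takes a single filter and a single update document and applies that same change to every matching document, so it cannot set a different value per employeeId.

# update_many: ONE filter, ONE update document, applied to ALL matching documents
collection.update_many(
    {"lastName": "Smithh"},            # illustrative filter shared by all matches
    {"$set": {"lastName": "Smith"}}    # the same $set is applied to every match
)
# There is no way to pass a different $set per employeeId, which is why
# per-document UpdateOne operations batched with bulk_write are needed instead.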

For bulk operations, the best option is PyMongo's bulk write support; because of the limits on a single bulkWrite() call, we split the 250K records into chunks:

import json
import sys
from pprint import pprint

from pymongo import UpdateOne

json_this = json.dumps(json_list[0])
json_that = json.loads(json_this)

primaryBulkArr = []
secondaryBulkArr = []
thirdBulkArr = []

## Here we're splitting the 250K records into 3 arrays, in case you want to finish one chunk at a time.
# There's no need to split everything at once - finish end-to-end for one chunk & restart the process
# for the next chunk from the index of the list where you left off previously.

for index, x in enumerate(json_that):
    if index < 90000:
        primaryBulkArr.append(
            UpdateOne({"employeeId": x['employeeId']}, {'$set': x}, upsert=True))
    elif index < 180000:
        secondaryBulkArr.append(
            UpdateOne({"employeeId": x['employeeId']}, {'$set': x}, upsert=True))
    else:
        thirdBulkArr.append(
            UpdateOne({"employeeId": x['employeeId']}, {'$set': x}, upsert=True))

## The reason for splitting into 3 arrays is that you may be able to run the calls below in parallel
# if your DB & application servers can take it. Either way, only 3 DB calls are needed in total,
# and this bulk operation is much more efficient.
try:
    result = collection.bulk_write(primaryBulkArr)
    ## result = collection.bulk_write(primaryBulkArr, ordered=False)
    # Opt for the above if you want all dictionaries to be processed even if an error occurs for one of them.
    pprint(result.bulk_api_result)
except Exception:
    e = sys.exc_info()[0]
    print("An exception occurred ::", e)  ## Get the ids that failed, if any, and retry
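As a follow-up sketch (assuming the three arrays built above), you can simply repeat the call once per chunk; catching pymongo.errors.BulkWriteError instead of a bare except also gives you the per-operation error details for retries.

from pymongo.errors import BulkWriteError

# One bulk_write per chunk - still only 3 round trips to the database in total.
for chunk in (primaryBulkArr, secondaryBulkArr, thirdBulkArr):
    if not chunk:
        continue
    try:
        result = collection.bulk_write(chunk, ordered=False)  # keep going even if individual ops fail
        pprint(result.bulk_api_result)
    except BulkWriteError as bwe:
        pprint(bwe.details["writeErrors"])  # details of the operations that failed, for re-try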