在pymongo快速或批量Upsert

Com*_*nce 33 python upsert mongodb nosql pymongo

如何在pymongo中进行批量upsert?我想更新一堆条目,一次做一个是非常慢的.

几乎完全相同的问题的答案在这里:MongoDB中的批量更新/ upsert?

接受的答案实际上没有回答这个问题.它只是提供了一个指向mongo CLI的链接,用于执行导入/导出.

我也愿意向某人解释为什么做大量upsert是不可能/没有最佳做法,但请解释这类问题的首选解决方案是什么.

Kev*_*ice 30

MongoDB 2.6+支持批量操作.这包括批量插入,插入,更新等.这样做的目的是减少/消除执行逐个记录操作的往返延迟("逐个文档"是正确的)的延迟.

那么,这是如何工作的?Python中的示例,因为这就是我的工作.

>>> import pymongo
>>> pymongo.version
'2.7rc0'
Run Code Online (Sandbox Code Playgroud)

要使用此功能,我们创建一个"批量"对象,向其中添加文档,然后在其上调用execute,它将立即发送所有更新.注意事项:收集的操作的BSONsize(bsonsizes的总和)不能超过16 MB的文档大小限制.当然,操作次数因此可能会有很大差异,您的里程可能会有所不同.

Bulk upsert操作的Pymongo示例:

import pymongo
conn = pymongo.MongoClient('myserver', 8839)
db = conn['mydbname']
coll = db.myCollection
bulkop = coll.initialize_ordered_bulk_op()
retval = bulkop.find({'field1':1}).upsert().update({'$push':{'vals':1}})
retval = bulkop.find({'field1':1}).upsert().update({'$push':{'vals':2}})
retval = bulkop.find({'field1':1}).upsert().update({'$push':{'vals':3}})
retval = bulkop.execute()
Run Code Online (Sandbox Code Playgroud)

这是必不可少的方法.更多信息,请访问:

http://api.mongodb.org/python/2.7rc1/examples/bulk.html

编辑: - 从3.5版本的python驱动程序,不推荐使用initialize_ordered_bulk_op.请改用bulk_write().[ http://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.bulk_write ]

  • `initialize_ordered_bulk_op()` 语法现已弃用:http://api.mongodb.com/python/current/changelog.html?highlight=initialize_ordered_bulk_op (2认同)

Nei*_*unn 29

现代版本的pymongo(大于3.x)将批量操作包装在一致的界面中,降级服务器版本不支持批量操作的地方.现在这在MongoDB官方支持的驱动程序中是一致的.

因此,编码的首选方法是使用,bulk_write()而是使用UpdateOne其他适当的操作操作.现在,当然最好使用自然语言列表而不是特定的构建器

旧文件的直接翻译:

from pymongo import UpdateOne

operations = [
    UpdateOne({ "field1": 1},{ "$push": { "vals": 1 } },upsert=True),
    UpdateOne({ "field1": 1},{ "$push": { "vals": 2 } },upsert=True),
    UpdateOne({ "field1": 1},{ "$push": { "vals": 3 } },upsert=True)
]

result = collection.bulk_write(operations)
Run Code Online (Sandbox Code Playgroud)

或者经典的文档转换循环:

import random
from pymongo import UpdateOne

random.seed()

operations = []

for doc in collection.find():
    # Set a random number on every document update
    operations.append(
        UpdateOne({ "_id": doc["_id"] },{ "$set": { "random": random.randint(0,10) } })
    )

    # Send once every 1000 in batch
    if ( len(operations) == 1000 ):
        collection.bulk_write(operations,ordered=False)
        operations = []

if ( len(operations) > 0 ):
    collection.bulk_write(operations,ordered=False)
Run Code Online (Sandbox Code Playgroud)

返回的结果BulkWriteResult将包含匹配和更新文档的计数器以及_id发生的任何"upserts" 的返回值.

关于批量操作数组的大小存在一些误解.发送到服务器的实际请求不能超过16MB BSON限制,因为该限制也适用于发送到使用BSON格式的服务器的"请求".

但是,这不会影响您可以构建的请求数组的大小,因为实际操作只会以1000的批量发送和处理.唯一真正的限制是这1000条操作指令本身实际上并不创建大于16MB的BSON文档.这确实是一个非常高的订单.

批量方法的一般概念是"较少流量",这是因为一次发送许多内容并且只处理一个服务器响应.减少每个更新请求附带的开销可以节省大量时间.

  • @Budulianin因为它正在迭代一个游标而且非常反复模式将整个集合加载到内存中.这基本上就是为什么它们以合理的尺寸进行批量处理.此外,这需要重新审视,特别是考虑到异步环境(基本技术保持不变),您基本上可以通过线路发送数据并在等待确认时构建另一批.如果你认为通过制造"一大批"而不是"几个合理的批次"来获得某些东西,那么在大多数情况下我会说这不太可能 (3认同)
  • 对不起,但我不明白,如果您知道您的驱动程序是自己完成的,为什么您要在循环中划分批次。我的意思是在你的例子中, `UpdateOne({ "_id": doc["_id"] },{ "$set": { "random": random.randint(0,10) } })` 的 1000 个文档较少超过 16 Mb,很明显,pymongo 可以分割你的数据并且请求将会成功。为什么要亲手分割数据? (2认同)

小智 6

如果你有很多数据,并且你想用“_id”来判断数据是否存在,

你可以试试...

import pymongo
from pymongo import UpdateOne
client = pymongo.MongoClient('localhost', 27017)
db=client['sampleDB']

collectionInfo = db.sample

#sample data
datas=[
    {"_id":123456,"name":"aaa","N":1,"comment":"first sample","lat":22,"lng":33},
    {"_id":234567,"name":"aaa","N":1,"comment":"second sample","lat":22,"lng":33},
    {"_id":345678,"name":"aaa","N":1,"comment":"xxx sample","lat":22,"lng":33},
    {"_id":456789,"name":"aaa","N":1,"comment":"yyy sample","lat":22,"lng":33},
    {"_id":123456,"name":"aaaaaaaaaaaaaaaaaa","N":1,"comment":"zzz sample","lat":22,"lng":33},
    {"_id":11111111,"name":"aaa","N":1,"comment":"zzz sample","lat":22,"lng":33}
]

#you should split judge item and other data 
ids=[data.pop("_id") for data in datas]

operations=[UpdateOne({"_id":idn},{'$set':data},upsert=True) for idn ,data in zip(ids,datas)]

collectionInfo.bulk_write(operations)
Run Code Online (Sandbox Code Playgroud)

我的英语很差,如果你听不懂我说的话,对不起


小智 5

答案仍然是相同的:不支持批量upserts.

  • 这已不再是这种情况.见凯文的回答. (6认同)
  • 这很难过. (2认同)

Ber*_*ett 2

您可以使用 multi=True 更新与查询规范匹配的所有文档。

这里有一个关于按照您想要的方式执行一批命令的错误。