pymongo MongoClient无法在多进程中工作?

rog*_*ger 6 python multiprocessing mongodb pymongo

我正在使用pymongo 3.2,我想在multiporcess中使用它:

client = MongoClient(JD_SEARCH_MONGO_URI, connect=False)
db = client.jd_search

with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
    for jd in db['sample_data'].find():
        jdId = jd["jdId"]
        for cv in db["sample_data"].find():
            itemId = cv["itemId"]
            executor.submit(intersect_compute, jdId, itemId)
            #  print "done {} => {}".format(jdId, itemId)
Run Code Online (Sandbox Code Playgroud)

但我得到错误:

UserWarning: MongoClient opened before fork. Create MongoClient with connect=False, or create client after forking. See PyMongo's documentation for details: http://api.mongodb.org/python/current/faq.html#using-pymongo-with-multiprocessing>
Run Code Online (Sandbox Code Playgroud)

根据该文件,我已经设置connectFalse,你可以看到

wow*_*in2 3

您所做的与文档中的操作完全相同(URL 除外),但在 部分中Never do this
ps 我在评论末尾更新了您的代码示例。

在每个进程内创建与数据库的连接:

# Each process creates its own instance of MongoClient.
def func():
    db = pymongo.MongoClient().mydb
    # Do something with db.

proc = multiprocessing.Process(target=func)
proc.start()
Run Code Online (Sandbox Code Playgroud)

切勿这样做:

client = pymongo.MongoClient()

# Each child process attempts to copy a global MongoClient
# created in the parent process. Never do this.
def func():
  db = client.mydb
  # Do something with db.

proc = multiprocessing.Process(target=func)
proc.start()
Run Code Online (Sandbox Code Playgroud)

您需要更改的是将数据库连接初始化移至每个进程的分支。因为他们每个人都有自己独立的联系。

您的样本已更新:

with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
    client = MongoClient(JD_SEARCH_MONGO_URI, connect=False)
    db = client.jd_search

    for jd in db['sample_data'].find():
        jdId = jd["jdId"]
        for cv in db["sample_data"].find():
            itemId = cv["itemId"]
            executor.submit(intersect_compute, jdId, itemId)
            #  print "done {} => {}".format(jdId, itemId)
Run Code Online (Sandbox Code Playgroud)