如何在python 2.7中使用pymongo多处理池

Ron*_*sky 2 python multiprocessing mongodb pymongo python-requests

我正在使用 Pymongo 和多处理池来运行 10 个进程并从 API 获取数据并将输出插入到 mongodb 中。

我想我写代码的方式做错了,因为 python 显示打开的双连接比通常情况下;例如:如果我运行 10 个进程 Mongodb 将输出 20 个或更多已建立的连接,并且我将在启动时收到以下警告:

UserWarning: MongoClient 在 fork 之前打开。用connect=False创建MongoClient,或者fork后创建客户端。有关详细信息,请参阅 PyMongo 的文档:http ://api.mongodb.org/python/current/faq.html#using-pymongo-with-multiprocessing >

即使我在 mongodb 的连接器客户端输入 connect=False 。这是一个示例代码,用于了解我如何使用 pymongo 和请求 API 在池中发送请求:

# -*- coding: utf-8 -*-
#!/usr/bin/python

import json # to decode and encode json
import requests # web POST and GET requests. 
from pymongo import MongoClient # the mongo driver / connector
from bson import ObjectId # to generate bson object for MongoDB
from multiprocessing import Pool # for the multithreading

# Create the mongoDB Database object, declare collections
client = MongoClient('mongodb://192.168.0.1:27017,192.168.0.2:27017./?replicaSet=rs0', maxPoolSize=20, connect=False)
index = client.database.index
users = client.database.users

def get_user(userid):

    params = {"userid":userid}
    r = requests.get("https://exampleapi.com/getUser",params=params)
    j = json.loads(r.content)
    return j

def process(index_line):

    user = get_user(index_line["userid"])
    if(user):
        users.insert(user)

def main():

    # limit to 100,000 lines of data each loop
    limited = 100
    # skip number of lines for the loop (getting updated)
    skipped = 0
    while True:
        # get cursor with data from index collection
        cursor = index.find({},no_cursor_timeout=True).skip(skipped).limit(limited)
        # prepare the pool with threads
        p = Pool(10)
        # start multiprocessing the pool with the dataset
        p.map(process, cursor)
        # after pool finished, kill it with fire
        p.close()
        p.terminate()
        p.join()
        # after finishing the 100k lines, go for another round, inifnite.
        skipped = skipped + limited
        print "[-] Skipping %s " % skipped

if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

我的代码算法有问题吗?有什么方法可以提高效率,更好地工作并更好地控制我的游泳池?

我已经研究了很长时间,但找不到一种方法来以更好的方式做我想做的事情,很想得到一些帮助。

谢谢你。

Olu*_*ule 5

建议MongoClient为每个进程创建一个,不要为每个进程共享同一个客户端。

这是因为MongoClient还使用连接池处理来自进程的多个连接,并且不是 fork-safe

首先,您要确保当要处理的集合中的每个文档都用完时,while 循环会中断。虽然,这并不是一个太细化的条件,但如果skipped大于文档数,您可以中断循环。

其次,Pool在循环外初始化进程并在循环内映射进程。 multiprocessing.Pool.map等待子进程完成并返回,因此加入池将导致异常。multiprocessing.Pool.async_map如果您想异步运行子进程,您可以考虑使用。

您可以使用 a multiprocessing.Queue、生产者和消费者进程以更好的方式明确地实现这一点。生产者进程会将任务添加到队列中以供消费者进程执行。以这种方式实现解决方案的好处并不是很明显,因为多处理库也使用了队列。

import requests # web POST and GET requests. 
from pymongo import MongoClient # the mongo driver / connector
from bson import ObjectId # to generate bson object for MongoDB
from multiprocessing import Pool # for the multithreading


def get_user(userid):
    params = {"userid": userid}
    rv = requests.get("https://exampleapi.com/getUser", params=params)
    json = rv.json()
    return json['content']


def create_connect():
    return MongoClient(
       'mongodb://192.168.0.1:27017,192.168.0.2:27017/?replicaSet=rs0', maxPoolSize=20
    )

def consumer(index_line):
    client = create_connect()
    users = client.database.users

    user = get_user(index_line["_id"])
    if user:
        users.insert(user)

def main():

    # limit to 100,000 lines of data each loop
    limited = 100
    # skip number of lines for the loop (getting updated)
    skipped = 0
    client = create_connect()
    index = client.database.index
    pool = Pool(10)

    count = index.count()

    while True:

        if skipped > count:
            break

        cursor = index.find({}).skip(skipped).limit(limited)

        pool.map(consumer, cursor)

        skipped = skipped + limited
        print("[-] Skipping {}".format(skipped))

if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)