使用 pymongo 客户端从 MongoDB 向 Pandas 读取数据时出现 OOM

MPA*_*MPA 2 python pymongo pandas

我在 mongo 集合上有(900k, 300)条记录。当我尝试将数据读取到 Pandas 时,内存消耗会急剧增加,直到进程被终止。1.5GB~如果我从 csv 文件中读取数据,我必须提到数据适合内存()。

我的机器是 32GB RAM 和 16 个 CPU 的 Centos 7。

我的简单代码:

client = MongoClient(host,port)
collection = client[db_name][collection_name]
cursor = collection.find()
df = pd.DataFrame(list(cursor))
Run Code Online (Sandbox Code Playgroud)

我的多处理代码:

def read_mongo_parallel(skipses):


    print("Starting process")
    client = MongoClient(skipses[4],skipses[5])
    db = client[skipses[2]]
    collection = db[skipses[3]]
    print("range of {} to {}".format(skipses[0],skipses[0]+skipses[1]))

    cursor = collection.find().skip(skipses[0]).limit(skipses[1])

    return list(cursor)

all_lists = []
with concurrent.futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
        for  rows in  executor.map(read_mongo_parallel, skipesess):
            all_lists.extend(rows)


df = pd.DataFrame(all_lists)   
Run Code Online (Sandbox Code Playgroud)

两种方法的内存增加并杀死内核,

我在做什么?

Car*_*ala 8

问题在于list构建DataFrame. 游标一下子被消耗掉,生成了一个包含 90 万个字典的列表,占用了大量内存。

如果您创建一个空DataFrame文件然后分批拉取文档,一次几个文档,并将它们附加到DataFrame.

def batched(cursor, batch_size):
    batch = []
    for doc in cursor:
        batch.append(doc)
        if batch and not len(batch) % batch_size:
            yield batch
            batch = []

    if batch:   # last documents
        yield batch

df = pd.DataFrame()
for batch in batched(cursor, 10000):
    df = df.append(batch, ignore_index=True)
Run Code Online (Sandbox Code Playgroud)

10000 似乎是一个合理的批处理大小,但您可能希望根据内存限制对其进行更改:它越高,结束速度越快,但运行时使用的内存也越多。

更新:添加一些基准

请注意,这种方法不一定会使查询持续更长时间,而是相反,因为实际上需要时间的是从 mongodb 中提取文档作为字典并将它们分配到列表中的过程。

以下是一些包含 300K 文档的基准测试,它们展示了这种方法如何使用右侧batch_size实际上比将整个光标拉入列表更快:

  • 整个游标变成一个列表
%%time

df = pd.DataFrame(list(db.test.find().limit(300000)))
Run Code Online (Sandbox Code Playgroud)

CPU 时间:用户 35.3 秒,系统:2.14 秒,总计:37.5 秒挂墙时间:37.7 秒

  • batch_size=10000<-最快
%%time

df = pd.DataFrame()
for batch in batched(db.test.find().limit(300000), 10000):
    df = df.append(batch, ignore_index=True)
Run Code Online (Sandbox Code Playgroud)

CPU 时间:用户 29.5 秒,系统:1.23 秒,总计:30.7 秒挂墙时间:30.8 秒

  • batch_size=1000
%%time

df = pd.DataFrame()
for batch in batched(db.test.find().limit(300000), 1000):
    df = df.append(batch, ignore_index=True)
Run Code Online (Sandbox Code Playgroud)

CPU 时间:用户 44.8 秒,系统:2.09 秒,总计:46.9 秒挂墙时间:46.9 秒

  • batch_size=100000
%%time

df = pd.DataFrame()
for batch in batched(db.test.find().limit(300000), 100000):
    df = df.append(batch, ignore_index=True)
Run Code Online (Sandbox Code Playgroud)

CPU 时间:用户 34.6 秒,系统:1.15 秒,总计:35.8 秒挂墙时间:36 秒