MPA*_*MPA 2 python pymongo pandas
我在 mongo 集合上有(900k, 300)条记录。当我尝试将数据读取到 Pandas 时,内存消耗会急剧增加,直到进程被终止。1.5GB~如果我从 csv 文件中读取数据,我必须提到数据适合内存()。
我的机器是 32GB RAM 和 16 个 CPU 的 Centos 7。
我的简单代码:
client = MongoClient(host,port)
collection = client[db_name][collection_name]
cursor = collection.find()
df = pd.DataFrame(list(cursor))
Run Code Online (Sandbox Code Playgroud)
我的多处理代码:
def read_mongo_parallel(skipses):
print("Starting process")
client = MongoClient(skipses[4],skipses[5])
db = client[skipses[2]]
collection = db[skipses[3]]
print("range of {} to {}".format(skipses[0],skipses[0]+skipses[1]))
cursor = collection.find().skip(skipses[0]).limit(skipses[1])
return list(cursor)
all_lists = []
with concurrent.futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
for rows in executor.map(read_mongo_parallel, skipesess):
all_lists.extend(rows)
df = pd.DataFrame(all_lists)
Run Code Online (Sandbox Code Playgroud)
两种方法的内存增加并杀死内核,
我在做什么?
问题在于list构建DataFrame. 游标一下子被消耗掉,生成了一个包含 90 万个字典的列表,占用了大量内存。
如果您创建一个空DataFrame文件然后分批拉取文档,一次几个文档,并将它们附加到DataFrame.
def batched(cursor, batch_size):
batch = []
for doc in cursor:
batch.append(doc)
if batch and not len(batch) % batch_size:
yield batch
batch = []
if batch: # last documents
yield batch
df = pd.DataFrame()
for batch in batched(cursor, 10000):
df = df.append(batch, ignore_index=True)
Run Code Online (Sandbox Code Playgroud)
10000 似乎是一个合理的批处理大小,但您可能希望根据内存限制对其进行更改:它越高,结束速度越快,但运行时使用的内存也越多。
更新:添加一些基准
请注意,这种方法不一定会使查询持续更长时间,而是相反,因为实际上需要时间的是从 mongodb 中提取文档作为字典并将它们分配到列表中的过程。
以下是一些包含 300K 文档的基准测试,它们展示了这种方法如何使用右侧batch_size实际上比将整个光标拉入列表更快:
%%time
df = pd.DataFrame(list(db.test.find().limit(300000)))
Run Code Online (Sandbox Code Playgroud)
CPU 时间:用户 35.3 秒,系统:2.14 秒,总计:37.5 秒挂墙时间:37.7 秒
batch_size=10000<-最快%%time
df = pd.DataFrame()
for batch in batched(db.test.find().limit(300000), 10000):
df = df.append(batch, ignore_index=True)
Run Code Online (Sandbox Code Playgroud)
CPU 时间:用户 29.5 秒,系统:1.23 秒,总计:30.7 秒挂墙时间:30.8 秒
batch_size=1000%%time
df = pd.DataFrame()
for batch in batched(db.test.find().limit(300000), 1000):
df = df.append(batch, ignore_index=True)
Run Code Online (Sandbox Code Playgroud)
CPU 时间:用户 44.8 秒,系统:2.09 秒,总计:46.9 秒挂墙时间:46.9 秒
batch_size=100000%%time
df = pd.DataFrame()
for batch in batched(db.test.find().limit(300000), 100000):
df = df.append(batch, ignore_index=True)
Run Code Online (Sandbox Code Playgroud)
CPU 时间:用户 34.6 秒,系统:1.15 秒,总计:35.8 秒挂墙时间:36 秒
| 归档时间: |
|
| 查看次数: |
1039 次 |
| 最近记录: |