Ear*_*rey 7 python parallel-processing multiprocessing python-3.x python-multiprocessing
我正在尝试从大约 100 个文件中拆分和重新排序数据。这些文件包含有关大约的信息。40 万用户,按时间顺序排列。目的是遍历 100 个文件并为每个用户创建一个单独的文件。
该方法是使用 python 多处理库来创建多个进程: - 一个工作进程处理池,用于加载数据、对其进行排序并将批次附加到队列中。每个批次包含一个用户的数据。- 一个从队列中取出元素并将批次添加到队列中的进程
现在,下面的代码适用于装有 Windows 10、Python 3.7 的笔记本电脑。然而,我试图在带有 ubuntu 16.04 和 python 3.5 的服务器上运行它。虽然在 Windows 上代码运行没有问题,但服务器内存不足。现在笔记本电脑有 8GB 的 RAM,而服务器有 256GB。我在这里缺少什么?
编辑:我已将代码替换为您应该能够运行的简化版本。代码现在创建进程,将 numpy 数组添加到队列中。另一个进程使数组出列。在数组排队期间,打印队列的大小。由于元素出列,队列大小保持较低。但是,内存使用量不断增加。
import numpy as np
import sys
import multiprocessing
import time
def queue_remover(q):
while(1):
if not q.empty():
data = q.get()
def queue_adder(q, unused_number):
for _ in range(5):
q.put(np.zeros((1000,1000)))
print('q size ', q.qsize())
sys.stdout.flush()
time.sleep(1)
if __name__ == "__main__":
list_of_numbers = list(range(500))
m = multiprocessing.Manager()
queue = m.Queue(maxsize=40000)
writer = multiprocessing.Process(target=queue_remover, args=(queue,))
writer.start()
processing_pool = multiprocessing.Pool(processes=3, maxtasksperchild=1)
processing_pool.starmap(queue_adder, [(queue, number) for number in list_of_numbers])
processing_pool.close()
processing_pool.join()
writer.join()
Run Code Online (Sandbox Code Playgroud)
编辑:我刚刚运行内存分析器来分析单个进程,在 Windows 上大约需要 2 分钟,在 ubuntu 上为 8 分钟。请注意,在两个系统上,队列大小的打印仍然接近 0 - 元素肯定会在两者中出列。