Gab*_*iel 9 python pool multiprocessing amazon-web-services pandas
我正在运行40GB数据的计算.每个文件都是一个包含json行的压缩gzip文件.每个文件最多有500,000行,或大约500MB.我有一个运行128 cpu和1952 GB内存的亚马逊实例.我要做的是尽快处理每个文件.
我正在使用这样的多处理池:
def initializeLock(l):
global lock
lock = l
if __name__ == '__main__':
directory = '/home/ubuntu/[directory_containing_files]/*.gz'
file_names = glob.glob(directory)
lock = Lock()
pool = Pool(initializer=initializeLock, initargs=(lock,))
pool.map(do_analysis, file_names)
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)
我期望发生的是创建大量进程,每个进程处理一个文件.实际发生的是最初创建了100多个进程.在这一点上,我使用了大约85%的记忆力,这太棒了!然后每个完成.最终运行的进程数量下降到大约10.此时我只使用了5%的内存.定期启动其他进程,但它永远不会恢复运行100左右.所以我拥有这个拥有所有这些空闲内存的大型CPU,但我大部分时间都在运行大多数10个进程.
任何想法,如何让它继续运行100个进程,直到所有文件都完成?
编辑:
我在应用程序中添加了一些日志记录 最初它加载了127个进程,我认为这是因为我有128个CPU,其中一个在加载进程时正在使用.某些过程成功完成,结果已保存.然后在某些时候,只有少数正在运行的进程结束.当我检查完成了多少文件时,127个中的22个完成了.然后它只使用5-10个进程运行,所有这些都成功完成.我想也许它会耗尽内存和崩溃.但为什么?我有很多内存和很多CPU.
编辑2:
所以我发现了这个问题.问题是我在do_analysis方法中设置了一个锁,并且所有进程大约在同一时间完成并等待锁被释放.这些过程没有停止,他们正在睡觉.所以这让我想到另一个问题:我的主要目标是获取具有许多json行的每个文件,从json行获取ID属性,然后将其附加到包含具有相同id的其他行的文件.如果文件不存在,我创建它.我所做的是在访问文件时设置锁定,以避免被另一个进程访问.这是我的代码.
for key, value in dataframe.iteritems():
if os.path.isfile(file_name):
lock.acquire()
value.to_csv(filename), mode='a', header=False, encoding='utf-8')
lock.release()
else:
value.to_csv(filename), header=True, encoding='utf-8')
Run Code Online (Sandbox Code Playgroud)
所以现在我正在尝试一种创造性的方式来附加到文件,但不会阻止其他所有进程.我正在处理大量数据,并且需要同时访问两个文件的可能性很低,但它仍然会发生.所以我需要确保在附加文件时,另一个进程不会尝试打开该文件.
谢谢各位的意见。这是我目前对该问题的解决方案,我计划在接下来的一周内使其更加高效。我采纳了 Martin 的建议,一旦文件全部完成,我就将它们粘合在一起,但是,我想努力实现 daphtdazz 解决方案,在我生成更多文件的同时,让一个流程工作来与队列进行粘合。
def do_analyis(file):
# To keep the file names unique, I append the process id to the end
process_id = multiprocessing.current_process().pid
# doing analysis work...
for key, value in dataframe.iteritems():
if os.path.isfile(filename):
value.to_csv(filename), mode='a', header=False, encoding='utf-8')
else:
value.to_csv(filename), header=True, encoding='utf-8')
def merge_files(base_file_name):
write_directory = 'write_directory'
all_files = glob.glob('{0}*'.format(base_file_name))
is_file_created = False
for file in all_files:
if is_file_created:
print 'File already exists, appending'
dataframe = pandas.read_csv(file, index_col=0)
dataframe.to_csv('{0}{1}.csv'.format(write_directory, os.path.basename(base_file_name)), mode='a', header=False, encoding='utf-8')
else:
print 'File does not exist, creating.'
dataframe = pandas.read_csv(file, index_col=0)
dataframe.to_csv('{0}{1}.csv'.format(write_directory, os.path.basename(base_file_name)), header=True, encoding='utf-8')
is_file_created = True
if __name__ == '__main__':
# Run the code to do analysis and group files by the id in the json lines
directory = 'directory'
file_names = glob.glob(directory)
pool = Pool()
pool.imap_unordered(do_analysis, file_names, 1)
pool.close()
pool.join()
# Merge all of the files together
base_list = get_unique_base_file_names('file_directory')
pool = Pool()
pool.imap_unordered(merge_files, base_list, 100)
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)
这会保存每个文件,并将唯一的进程 ID 附加到文件末尾,然后返回并按 json 文件中的 id 获取所有文件,并将它们全部合并在一起。创建文件时,CPU 使用率在 60-70% 之间。那是不错的。合并文件时,cpu使用率在8%左右。这是因为文件合并得如此之快,以至于我不需要我拥有的所有 CPU 处理能力。这个解决方案有效。但它可能会更有效率。我将努力同时完成这两件事。欢迎任何建议。
| 归档时间: |
|
| 查看次数: |
680 次 |
| 最近记录: |