ham*_*x0r 16 python ipython ipython-parallel
我在具有不同参数的循环中运行相同的模拟.每个模拟都使用一个data只读取的pandas DataFrame(),从不修改.使用ipyparallel(IPython parallel),我可以在模拟开始之前将此DataFrame放入我视图中每个引擎的全局变量空间:
view['data'] = data
Run Code Online (Sandbox Code Playgroud)
然后,引擎可以访问DataFrame以获取在其上运行的所有模拟.复制数据的过程(如果是腌制的,data是40MB)只需几秒钟.但是,似乎如果模拟的数量增加,则内存使用量会变得非常大.我想这个共享数据是为每个任务而不是仅为每个引擎复制的.从具有引擎的客户端共享静态只读数据的最佳实践是什么?每个引擎复制一次是可以接受的,但理想情况下每个主机只需要复制一次(我在host1上有4个引擎,在host2上有8个引擎).
这是我的代码:
from ipyparallel import Client
import pandas as pd
rc = Client()
view = rc[:] # use all engines
view.scatter('id', rc.ids, flatten=True) # So we can track which engine performed what task
def do_simulation(tweaks):
""" Run simulation with specified tweaks """
# Do sim stuff using the global data DataFrame
return results, id, tweaks
if __name__ == '__main__':
data = pd.read_sql("SELECT * FROM my_table", engine)
threads = [] # store list of tweaks dicts
for i in range(4):
for j in range(5):
for k in range(6):
threads.append(dict(i=i, j=j, k=k)
# Set up globals for each engine. This is the read-only DataFrame
view['data'] = data
ar = view.map_async(do_simulation, threads)
# Our async results should pop up over time. Let's measure our progress:
for idx, (results, id, tweaks) in enumerate(ar):
print 'Progress: {}%: Simulation {} finished on engine {}'.format(100.0 * ar.progress / len(ar), idx, id)
# Store results as a pickle for the future
pfile = '{}_{}_{}.pickle'.format(tweaks['i'], tweaks['j'], tweaks['j'])
# Save our results to a pickle file
pd.to_pickle(results, out_file_path + pfile)
print 'Total execution time: {} (serial time: {})'.format(ar.wall_time, ar.serial_time)
Run Code Online (Sandbox Code Playgroud)
如果模拟计数很小(~50),那么开始需要一段时间,但我开始看到进度打印语句.奇怪的是,多个任务将被分配到同一个引擎,并且在为该引擎完成所有分配的任务之前我看不到响应.我希望enumerate(ar)每次单个模拟任务完成时都会看到响应.
如果模拟计数非常大(〜1000),才能开始很长一段时间,我看到的CPU扼杀了所有引擎,但没有取得任何进展的打印语句被视为直到很长一段时间(〜40分钟),当我不看进展,似乎一个大块(> 100)的任务进入同一个引擎,等待从一个引擎完成,然后再提供一些进度.当那个引擎完成时,我看到ar对象提供了4秒的新响应 - 这可能是编写输出pickle文件的时间延迟.
最后,host1还运行ipycontroller任务,它的内存使用量就像疯了一样(Python任务显示使用> 6GB RAM,内核任务显示使用3GB).host2引擎根本没有真正显示大量内存使用情况.什么会导致内存中的这个峰值?
几年前我在代码中使用了这个逻辑,并且我使用了它.我的代码是这样的:
shared_dict = {
# big dict with ~10k keys, each with a list of dicts
}
balancer = engines.load_balanced_view()
with engines[:].sync_imports(): # your 'view' variable
import pandas as pd
import ujson as json
engines[:].push(shared_dict)
results = balancer.map(lambda i: (i, my_func(i)), id)
results_data = results.get()
Run Code Online (Sandbox Code Playgroud)
如果模拟计数很小(~50),那么开始需要一段时间,但我开始看到进度打印语句.奇怪的是,多个任务将被分配到同一个引擎,并且在为该引擎完成所有分配的任务之前我看不到响应.我希望每次单个模拟任务完成时都会看到枚举(ar)的响应.
在我的情况下,my_func()是一个复杂的方法,我把大量的日志消息写入文件,所以我有我的打印语句.
关于任务分配,正如我所使用的load_balanced_view(),我离开了图书馆找到它的方式,它做得很好.
如果模拟计数很大(~1000),开始需要很长时间,我看到CPU在所有引擎上都加油,但是很长时间(~40分钟)看不到进度打印语句,当我看到进展,似乎一个大块(> 100)的任务进入同一个引擎,等待从一个引擎完成,然后再提供一些进度.当那个引擎完成时,我看到ar对象提供了4秒的新响应 - 这可能是编写输出pickle文件的时间延迟.
很长一段时间,我没有经历过,所以我不能说什么.
我希望这可能会给你的问题带来一些启示.
PS:正如我在评论中所说,你可以尝试多处理.Pool.我想我还没有尝试使用它来共享一个大的只读数据作为全局变量.我会尝试一下,因为它似乎有效.
| 归档时间: |
|
| 查看次数: |
2033 次 |
| 最近记录: |