Using Concurrent Futures without running out of RAM

GIS*_*han 4 python parallel-processing memory-management python-3.x

I'm doing some file parsing, which is a CPU-bound task. No matter how many files I throw at the process, it uses no more than about 50 MB of RAM. The task is parallelisable, and I've set it up to use concurrent futures below to parse each file as a separate process:

    from concurrent import futures
    with futures.ProcessPoolExecutor(max_workers=6) as executor:
        # A dictionary that will hold each future as the key, and its filename as the value
        jobs = {}

        # Loop through the files and submit the parse function for each one, passing it the file name.
        # The results can come back in any order.
        for this_file in files_list:
            job = executor.submit(parse_function, this_file, **parser_variables)
            jobs[job] = this_file

        # Get the completed jobs whenever they are done
        for job in futures.as_completed(jobs):

            # Get the result of the job (job.result()) and the file the job was based on (jobs[job])
            results_list = job.result()
            this_file = jobs[job]

            # Delete the finished job from the dict, as we don't need to keep a reference to it.
            del jobs[job]

            # post-processing (putting the results into a database)
            post_process(this_file, results_list)

The problem is that when I run this with futures, RAM usage rockets, and before long I've run out and Python has crashed. This is probably in large part because the results from parse_function are several MB in size. Once the results have been through post_process, the application no longer needs them. As you can see, I'm trying del jobs[job] to clear items out of jobs, but this has made no difference: memory usage remains unchanged, and seems to increase at the same rate.

I've also confirmed that it's not because it's waiting on the post_process function, by using only a single process and throwing in a time.sleep(1).
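Roughly, a hypothetical reconstruction of that check (parse_function, parser_variables, files_list and post_process are all from the question; the single worker and the sleep are the only changes):

    import time
    from concurrent import futures

    # Single worker plus a deliberately slow consumer, per the question's
    # description: RAM usage still climbed under these conditions.
    with futures.ProcessPoolExecutor(max_workers=1) as executor:
        jobs = {executor.submit(parse_function, f, **parser_variables): f
                for f in files_list}
        for job in futures.as_completed(jobs):
            post_process(jobs.pop(job), job.result())
            time.sleep(1)  # artificial delay on the consuming side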

There's nothing in the futures docs about memory management, and while a brief search indicates it has come up before in real-world applications of futures (Clear Memory in Python Loop, and http://grokbase.com/t/python/python-list/1458ss5etz/real-world-use-of-concurrent-futures), the answers don't translate to my use case (they're all concerned with timeouts and the like).

So, how do you use concurrent futures without running out of RAM? (Python 3.5)

Yoa*_*ner 8

I'll take a shot (might be a wrong guess...).

You might need to submit your work bit by bit, since on every submit you're making a copy of parser_variables, which may end up chewing through your RAM.
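As a quick, hypothetical way to gauge that per-submit cost (assuming parser_variables is picklable, which it must be to cross the process boundary anyway):

    import pickle

    # submit() ships its arguments to a worker process by pickling them,
    # so roughly this many bytes get duplicated for every job submitted:
    print(len(pickle.dumps(parser_variables)))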

Here is the working code, with "<----" marking the interesting parts:

from concurrent import futures

MAX_JOBS_IN_QUEUE = 1000  # arbitrary cap on in-flight jobs; tune it to your memory budget

with futures.ProcessPoolExecutor(max_workers=6) as executor:
    # A dictionary that will hold each future as the key, and its filename as the value
    jobs = {}

    # Loop through the files and submit the parse function for each one, passing it the file name.
    # The results can come back in any order.
    files_left = len(files_list) #<----
    files_iter = iter(files_list) #<------

    while files_left:
        for this_file in files_iter:
            job = executor.submit(parse_function, this_file, **parser_variables)
            jobs[job] = this_file
            if len(jobs) > MAX_JOBS_IN_QUEUE:
                break  # limit the number of submitted jobs for now

        # Get the completed jobs whenever they are done
        for job in futures.as_completed(jobs):

            files_left -= 1 #one down - many to go...   <---

            # Get the result of the job (job.result()) and the file the job was based on (jobs[job])
            results_list = job.result()
            this_file = jobs[job]

            # Delete the finished job from the dict, as we don't need to keep a reference to it.
            del jobs[job]

            # post-processing (putting the results into a database)
            post_process(this_file, results_list)
            break  # give a chance to add more jobs <-----
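This pattern keeps at most MAX_JOBS_IN_QUEUE futures alive at any one time: the inner for loop submits jobs until the cap is reached, the break at the bottom of the as_completed loop hands control back to the submission loop each time a job finishes, and files_left ends the outer while once every result has been consumed.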

  • @GIS-Jonathan - Also, [`futures.as_completed()`](https://github.com/python/cpython/blob/3.7/Lib/concurrent/futures/_base.py#L196) internally makes a copy of the futures it operates on. If `parse_function` can take and return the filename, you can get rid of `jobs` and delete your own reference right after calling `as_completed`; garbage collection can then reclaim each future as soon as `as_completed` and its helpers have dereferenced it. That's how it looks to me; I'm not sure it amounts to any real improvement, but perhaps just keep the future and its (file)name together throughout. (2)
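A minimal sketch of what that comment suggests, assuming parse_function can be changed to return its own filename alongside its results (the tuple return is the hypothetical change; everything else is from the question):

    from concurrent import futures

    with futures.ProcessPoolExecutor(max_workers=6) as executor:
        jobs = [executor.submit(parse_function, f, **parser_variables)
                for f in files_list]
        done_iter = futures.as_completed(jobs)
        del jobs  # as_completed() made its own copy; drop ours so finished futures can be collected
        for job in done_iter:
            this_file, results_list = job.result()  # parse_function now returns (filename, results)
            post_process(this_file, results_list)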

小智 7

Try adding del to your code like this:

for job in futures.as_completed(jobs):
    del jobs[job]  # or `val = jobs.pop(job)`
    # del job  # or `job._result = None`
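The reason the del helps is that a Future keeps its result alive internally (hence the `job._result = None` alternative) for as long as anything still references the future; removing it from jobs, and optionally dropping the loop variable too, lets the garbage collector reclaim each multi-megabyte result as soon as it has been consumed.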