Show the progress of a Python multiprocessing pool map call?

Mid*_*ing 82 python multiprocessing

I have a script that successfully runs a multiprocessing Pool set of tasks with an imap_unordered() call:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion

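However, my num_tasks is around 250,000, so the join() locks the main thread for 10 seconds or so, and I'd like to be able to echo to the command line incrementally to show that the main process isn't locked. Something like: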

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
  if (remaining == 0): break # Jump out of while loop
  print "Waiting for", remaining, "tasks to complete..."
  time.sleep(2)

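Is there a method on the result object or on the pool itself that indicates the number of tasks remaining? I tried using a multiprocessing.Value object as a counter (do_work calls counter.value += 1 after finishing its task), but the counter only gets to ~85% of the total before it stops incrementing.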
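One likely cause of the ~85% plateau is that `counter.value += 1` is a read-modify-write that is not atomic, so two workers can read the same value and one increment is lost. The `multiprocessing` docs suggest holding the Value's `get_lock()` around such updates. A minimal sketch, assuming the counter is handed to workers through the `Pool` initializer; `init_worker` and `run` are hypothetical helpers, and squaring stands in for the real `do_work`:

```python
import multiprocessing

counter = None  # shared Value, installed in each worker by init_worker()


def init_worker(shared_counter):
    # Hypothetical initializer: synchronized Values are shared by inheritance
    # at process start, so stash the counter in a worker-global variable
    global counter
    counter = shared_counter


def do_work(x):
    result = x * x  # stand-in for the real work
    # `+=` reads then writes; holding the lock keeps increments from being lost
    with counter.get_lock():
        counter.value += 1
    return result


def run(num_tasks):
    done = multiprocessing.Value("i", 0)
    with multiprocessing.Pool(initializer=init_worker, initargs=(done,)) as pool:
        rs = pool.map_async(do_work, range(num_tasks))
        while not rs.ready():
            print("Waiting for", num_tasks - done.value, "tasks to complete...")
            rs.wait(0.5)  # wait up to 0.5 s, returning early once everything is done
        results = rs.get()
    return done.value, results


if __name__ == "__main__":
    completed, results = run(10000)
    print(completed, "tasks completed")
```

With the lock held, the counter should end exactly at `num_tasks`.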

Tim*_*Tim 83

My personal favorite: it gives you a nice little progress bar and a completion ETA while things run and commit in parallel.

from multiprocessing import Pool
import tqdm

pool = Pool(processes=8)
for _ in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)):
    pass

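  • What if the pool returns a value? (44 upvotes)
  • I created an empty list called result before the loop, then inside the loop I just do result.append(x). I tried this with 2 processes, using imap instead of map, and everything worked the way I wanted @nickpick (9 upvotes)
  • Don't forget to wrap this code in `if __name__ == "__main__":`, otherwise it may mysteriously not work (4 upvotes)
  • @bs7280 By result.append(x), do you mean result.append(_)? What is x? (3 upvotes)
  • My progress bar prints each update on a new line instead of updating in place. Any idea why? (2 upvotes)
  • Don't forget to `pip install tqdm` (2 upvotes)
  • @Nickpick Simple: results = []; for result in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)): results.append(result) (2 upvotes)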
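A note on the return-value question: `imap_unordered` yields results in completion order, so appending them to a list does not preserve the order of `tasks`. If order matters, you can switch to `imap` (the bar may then advance in bursts, because a slow early task holds back later results), or tag each result with its input index as in this sketch. `do_work` and `indexed_work` are hypothetical stand-ins, and tqdm is left out so the sketch runs on the standard library alone; wrapping the loop iterable in `tqdm.tqdm(..., total=len(tasks))` gives you the bar back.

```python
import sys
from multiprocessing import Pool


def do_work(x):
    return x * 10  # stand-in for the real work


def indexed_work(pair):
    # Carry the input position alongside the result
    i, x = pair
    return i, do_work(x)


def run(tasks):
    results = [None] * len(tasks)
    with Pool(processes=4) as pool:
        it = pool.imap_unordered(indexed_work, enumerate(tasks))
        for done, (i, value) in enumerate(it, 1):
            results[i] = value  # put each result back in input order
            sys.stderr.write(f"\r{done}/{len(tasks)} done")
            sys.stderr.flush()
    sys.stderr.write("\n")
    return results


if __name__ == "__main__":
    print(run(list(range(20))))
```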

jfs*_*jfs 73

No need to access private attributes of the result set:

from __future__ import division
import sys

for i, _ in enumerate(p.imap_unordered(do_work, xrange(num_tasks)), 1):
    sys.stderr.write('\rdone {0:%}'.format(i/num_tasks))

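  • I had the same problem as @HananShteingart, because I was trying to use `Pool.map()`. I didn't realize that _only_ `imap()` and `imap_unordered()` work this way. The docs just say "a lazier version of map()" but actually mean "the underlying iterator returns results as they come in". (11 upvotes)
  • I only see it printed after the code exits (not on every iteration). Do you have any suggestions? (7 upvotes)
  • That's possible too! I mainly wanted to document a silly assumption I made, in case anyone else reading this makes it too. (2 upvotes)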
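For Python 3, a minimal port of the snippet above might look like this: `xrange` becomes `range`, `/` is already true division so the `__future__` import can go, and an explicit `flush()` makes each `\r` update show up immediately, since a write without a newline can otherwise sit in the stream buffer. As the comments point out, this only reports incrementally with `imap`/`imap_unordered`; `Pool.map()` returns only once everything is finished. `do_work` and `run` are placeholders.

```python
import sys
from multiprocessing import Pool


def do_work(x):
    return x  # stand-in for the real work


def run(num_tasks):
    done = 0
    with Pool() as pool:
        for done, _ in enumerate(pool.imap_unordered(do_work, range(num_tasks)), 1):
            # '\r' rewrites the same line; flush so it appears right away
            sys.stderr.write("\rdone {0:.1%}".format(done / num_tasks))
            sys.stderr.flush()
    sys.stderr.write("\n")
    return done


if __name__ == "__main__":
    run(1000)
```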

reu*_*ano 23

I found that the work was already done by the time I tried to check its progress. This is what worked for me using tqdm.

pip install tqdm

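from multiprocessing import Pool
from tqdm import tqdm


def do_work(x):
    # do something with x
    return x


if __name__ == "__main__":
    tasks = range(5)
    pool = Pool()
    pbar = tqdm(total=len(tasks))
    # Update the bar in the parent process as results come back; calling
    # pbar.update() inside do_work would only update each worker's own copy
    for _ in pool.imap_unordered(do_work, tasks):
        pbar.update(1)
    pool.close()
    pool.join()
    pbar.close()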

This should work for all kinds of multiprocessing, whether or not they block.

  • I think this creates a bunch of threads, each counting independently (3 upvotes)

Mid*_*ing 20

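Found the answer myself with some more digging: looking at the __dict__ of the imap_unordered result object, I found it has an _index attribute that increments with each task completion. So this works for logging, wrapped in the while loop: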

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  completed = rs._index
  if (completed == num_tasks): break
  print "Waiting for", num_tasks-completed, "tasks to complete..."
  time.sleep(2)

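However, I did find that swapping the imap_unordered for a map_async made execution much faster, though the result object is a bit different. The result object from map_async instead has a _number_left attribute and a ready() method: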

p = multiprocessing.Pool()
rs = p.map_async(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  if (rs.ready()): break
  remaining = rs._number_left
  print "Waiting for", remaining, "tasks to complete..."
  time.sleep(0.5)

  • I tested this on Python 2.7.6, and rs._number_left appears to be the number of chunks remaining. So if rs._chunksize isn't 1, rs._number_left won't be the number of list items remaining. (3 upvotes)
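A plausible explanation for the speed difference: when no `chunksize` is given, `map_async` computes one from the input length and pool size, while `imap_unordered` defaults to `chunksize=1` and pays an inter-process round trip per task. Passing an explicit `chunksize` to `imap_unordered` keeps most of that batching while still letting the parent count finished items through the public iterator, instead of `_index` or `_number_left`. A sketch, where `do_work`, `run` and the chunk size of 100 are arbitrary placeholders; expect the count to jump in steps of up to `chunksize`:

```python
import time
from multiprocessing import Pool


def do_work(x):
    return x * x  # stand-in for the real work


def run(num_tasks, chunksize=100, report_every=1.0):
    results = []
    last_report = time.monotonic()
    with Pool() as pool:
        # Tasks travel to workers in batches of `chunksize`, but the iterator
        # still yields one result at a time, so len(results) counts items
        for value in pool.imap_unordered(do_work, range(num_tasks), chunksize=chunksize):
            results.append(value)
            now = time.monotonic()
            if now - last_report >= report_every:
                print("Waiting for", num_tasks - len(results), "tasks to complete...")
                last_report = now
    return results


if __name__ == "__main__":
    print(len(run(250000)), "results")
```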

mra*_*acz 17

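Following Tim's suggestion, you can use tqdm and imap to solve this problem. I just stumbled upon this question and adapted the imap_unordered solution so that I can access the results of the mapping. Here's how it works: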

from multiprocessing import Pool
import tqdm

pool = Pool(processes=4)
mapped_values = list(tqdm.tqdm(pool.imap_unordered(do_work, range(num_tasks)), total=num_tasks))

If you don't care about the values returned from your jobs, you don't need to assign the list to any variable.


小智 9

Quick start

Using tqdm with multiprocessing.Pool

Install

pip install tqdm

Example

import time
import threading
from multiprocessing import Pool

from tqdm import tqdm


def do_work(x):
    time.sleep(x)
    return x


def progress():
    time.sleep(3)  # Check progress after 3 seconds
    print(f'total: {pbar.total} finish:{pbar.n}')


if __name__ == '__main__':
    tasks = range(10)
    # Created inside the main guard so that spawned worker processes, which
    # re-import this module, do not each create their own progress bar
    pbar = tqdm(total=len(tasks))
    thread = threading.Thread(target=progress)
    thread.start()
    results = []
    with Pool(processes=5) as pool:
        for result in pool.imap_unordered(do_work, tasks):
            results.append(result)
            pbar.update(1)
    pbar.close()
    print(results)

Flask

Install

pip install flask

main.py

import time
from multiprocessing import Pool

from tqdm import tqdm
from flask import Flask, make_response, jsonify

app = Flask(__name__)


def do_work(x):
    time.sleep(x)
    return x


total = 5  # num of tasks
tasks = range(total)
pbar = tqdm(total=len(tasks))


@app.route('/run/')
def run():
    results = []
    with Pool(processes=2) as pool:
        for _result in pool.imap_unordered(do_work, tasks):
            results.append(_result)
            if pbar.n >= total:
                pbar.n = 0  # reset
            pbar.update(1)
    response = make_response(jsonify(dict(results=results)))
    response.headers.add('Access-Control-Allow-Origin', '*')
    response.headers.add('Access-Control-Allow-Headers', '*')
    response.headers.add('Access-Control-Allow-Methods', '*')
    return response


@app.route('/progress/')
def progress():
    response = make_response(jsonify(dict(n=pbar.n, total=pbar.total)))
    response.headers.add('Access-Control-Allow-Origin', '*')
    response.headers.add('Access-Control-Allow-Headers', '*')
    response.headers.add('Access-Control-Allow-Methods', '*')
    return response

Run (using Windows as an example)

set FLASK_APP=main
flask run

API list

http://127.0.0.1:5000/run/
http://127.0.0.1:5000/progress/

test.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Progress Bar</title>
    <script src="https://cdn.bootcss.com/jquery/3.0.0/jquery.min.js"></script>
    <script src="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/js/bootstrap.min.js"></script>
    <link href="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/css/bootstrap.min.css" rel="stylesheet">
</head>
<body>
<button id="run">Run the task</button>
<br><br>
<div class="progress">
    <div class="progress-bar" role="progressbar" aria-valuenow="1" aria-valuemin="0" aria-valuemax="100"
         style="width: 10%">0.00%
    </div>
</div>
</body>
<script>
    function set_progress_rate(n, total) {
        //Set the rate of progress bar
        var rate = (n / total * 100).toFixed(2);
        if (n > 0) {
            $(".progress-bar").attr("aria-valuenow", n);
            $(".progress-bar").attr("aria-valuemax", total);
            $(".progress-bar").text(rate + "%");
            $(".progress-bar").css("width", rate + "%");
        }
    }

    $("#run").click(function () {
        //Run the task
        $.ajax({
            url: "http://127.0.0.1:5000/run/",
            type: "GET",
            success: function (response) {
                set_progress_rate(100, 100);
                console.log('Results: ' + response['results']);
            }
        });
    });
    setInterval(function () {
        //Show progress every 1 second
        $.ajax({
            url: "http://127.0.0.1:5000/progress/",
            type: "GET",
            success: function (response) {
                console.log(response);
                var n = response["n"];
                var total = response["total"];
                set_progress_rate(n, total);
            }
        });
    }, 1000);
</script>
</html>


Jul*_*lle 8

I know this is a rather old question, but here is what I do when I want to track the progress of a pool of tasks in Python.

from progressbar import ProgressBar, SimpleProgress
import multiprocessing as mp
from time import sleep

def my_function(letter):
    sleep(2)
    return letter+letter

dummy_args = ["A", "B", "C", "D"]
pool = mp.Pool(processes=2)

results = []

pbar = ProgressBar(widgets=[SimpleProgress()], maxval=len(dummy_args)).start()

r = [pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]

while len(results) != len(dummy_args):
    pbar.update(len(results))
    sleep(0.5)
pbar.finish()

print results

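Basically, you use apply_async with a callback (in this case, one that appends the returned value to a list), so you don't have to wait for the other operations. Then, in a while loop, you check the progress of the work. Here I added a widget to make it look nicer.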

Output:

4 of 4                                                                         
['AA', 'BB', 'CC', 'DD']

Hope it helps.
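The same polling idea in Python 3 with only the standard library, as a sketch: the callbacks run in a result-handler thread of the parent process, so the main thread can poll the lists' lengths (in CPython, `list.append` from that thread is safe to read this way). It also passes `error_callback`, because with `callback` alone a task that raises is never counted and the `while` loop would wait forever. `my_function` mirrors the example above; `run` and the `failures` list are hypothetical additions.

```python
import sys
import time
from multiprocessing import Pool


def my_function(letter):
    time.sleep(0.2)
    return letter + letter


def run(args):
    results, failures = [], []
    with Pool(processes=2) as pool:
        for x in args:
            # Both callbacks execute in the parent's result-handler thread
            pool.apply_async(my_function, (x,),
                             callback=results.append,
                             error_callback=failures.append)
        # Poll until every task has either succeeded or failed
        while len(results) + len(failures) < len(args):
            sys.stderr.write(f"\r{len(results) + len(failures)} of {len(args)}")
            sys.stderr.flush()
            time.sleep(0.1)
    sys.stderr.write(f"\r{len(args)} of {len(args)}\n")
    return results, failures


if __name__ == "__main__":
    results, failures = run(["A", "B", "C", "D"])
    print(sorted(results), failures)
```

Note that `results` arrives in completion order, not submission order.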


zea*_*oas 8

A simple solution with Pool.apply_async():

from multiprocessing import Pool
from tqdm import tqdm
from time import sleep


def work(x):
    sleep(0.2)
    return x**2


if __name__ == "__main__":
    n = 10
    with Pool(4) as p, tqdm(total=n) as pbar:
        res = [p.apply_async(
            work, args=(i,), callback=lambda _: pbar.update(1)) for i in range(n)]
        results = [r.get() for r in res]

  • The Pool and pbar should be closed when done (2 upvotes)