Mid*_*ing 82 python multiprocessing
I have a script that successfully runs a multiprocessing Pool set of tasks with an imap_unordered() call:
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion
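However, my num_tasks is around 250,000, so the join() blocks the main thread for 10 seconds or so, and I'd like to be able to echo progress to the command line incrementally to show that the main process isn't locked. Something like: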
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
    remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
    if (remaining == 0): break # Jump out of while loop
    print "Waiting for", remaining, "tasks to complete..."
    time.sleep(2)
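Is there a method on the result object or the pool itself that indicates the number of remaining tasks? I tried using a multiprocessing.Value object as a counter (do_work performs a counter.value += 1 after finishing its task), but the counter only gets to ~85% of the total before it stops incrementing.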
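One likely reason the counter stalls is that `counter.value += 1` is a read-modify-write, not an atomic operation, so two workers can read the same old value and one increment is lost; the `multiprocessing` documentation recommends doing the increment while holding the value's own lock. The sketch below assumes the shared `Value` is handed to each worker through a pool initializer; `init_worker`, `run` and the body of `do_work` are hypothetical, and progress is polled through the public `AsyncResult.ready()` and `wait()` methods rather than any private attribute.

```python
import multiprocessing
import time

counter = None  # replaced in every worker process by init_worker()


def init_worker(shared_counter):
    # Store the shared Value as a module global inside each worker process.
    global counter
    counter = shared_counter


def do_work(x):
    # Hypothetical stand-in for the real task.
    time.sleep(0.001)
    # "+=" reads, adds and writes; holding the lock stops concurrent
    # increments from different workers overwriting each other.
    with counter.get_lock():
        counter.value += 1
    return x * x


def run(num_tasks, processes=4):
    shared = multiprocessing.Value('i', 0)  # 'i' = C int, starts at 0
    with multiprocessing.Pool(processes, initializer=init_worker,
                              initargs=(shared,)) as pool:
        async_result = pool.map_async(do_work, range(num_tasks))
        # Poll until the whole map has finished, reporting the shared count.
        while not async_result.ready():
            print("Waiting for", num_tasks - shared.value, "tasks to complete...")
            async_result.wait(0.5)  # returns early if the map finishes
        results = async_result.get()
    return shared.value, results


if __name__ == '__main__':
    done, results = run(1000)
    print("completed:", done)
```

With the lock in place the final count should equal the number of tasks, because every increment now happens one at a time.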
Tim*_*Tim 83
My personal favorite: it gives you a nice little progress bar and a completion ETA while things run and commit in parallel.
from multiprocessing import Pool
import tqdm

pool = Pool(processes=8)
for _ in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)):
    pass
jfs*_*jfs 73
There is no need to access private attributes of the result set:
from __future__ import division
import sys

for i, _ in enumerate(p.imap_unordered(do_work, xrange(num_tasks)), 1):
    sys.stderr.write('\rdone {0:%}'.format(i/num_tasks))
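The snippet above is Python 2 (`xrange`, `from __future__ import division`). A Python 3 sketch of the same idea, packaged as a reusable generator so the reporting can wrap any result iterator; `with_progress` and `do_work` are hypothetical names, and the format spec `.1%` is used instead of `%` so the percentage prints with one decimal place instead of six.

```python
import sys
from multiprocessing import Pool


def with_progress(iterable, total, stream=None):
    """Yield each item of iterable, rewriting a single progress line on stream."""
    stream = sys.stderr if stream is None else stream
    for done, item in enumerate(iterable, 1):
        # '\r' moves back to the start of the line; '.1%' shows 0.25 as 25.0%
        stream.write('\rdone {0:.1%}'.format(done / total))
        stream.flush()
        yield item
    stream.write('\n')


def do_work(x):
    # Hypothetical task.
    return x * x


if __name__ == '__main__':
    num_tasks = 1000
    with Pool() as p:
        squares = list(with_progress(p.imap_unordered(do_work, range(num_tasks)), num_tasks))
```

Because the generator only counts items as the caller consumes them, it reports results actually received by the main process, just like the `enumerate` loop above.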
reu*_*ano 23
I found that the work was already done by the time I tried to check its progress. This is what worked for me, using tqdm.
pip install tqdm
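from multiprocessing import Pool
from tqdm import tqdm


def do_work(x):
    # do something with x
    return x


if __name__ == '__main__':
    tasks = range(5)
    pool = Pool()
    pbar = tqdm(total=len(tasks))
    # The callback runs in the main process, so it advances the real bar;
    # calling pbar.update() inside do_work would only touch a worker's copy.
    for x in tasks:
        pool.apply_async(do_work, (x,), callback=lambda _: pbar.update(1))
    pool.close()
    pool.join()
    pbar.close()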
This should work for all flavors of multiprocessing, whether they block or not.
Mid*_*ing 20
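Found the answer myself with some more digging: looking at the __dict__ of the imap_unordered result object, I found it has an _index attribute that increments with each task completion. So this works for logging, wrapped in a while loop: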
import multiprocessing
import time

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
    completed = rs._index
    if (completed == num_tasks): break
    print "Waiting for", num_tasks-completed, "tasks to complete..."
    time.sleep(2)
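However, I did find that swapping the imap_unordered for a map_async resulted in faster execution, though the result object is a bit different. The result object from map_async instead has a _number_left attribute and a ready() method: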
import multiprocessing
import time

p = multiprocessing.Pool()
rs = p.map_async(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
    if (rs.ready()): break
    remaining = rs._number_left
    print "Waiting for", remaining, "tasks to complete..."
    time.sleep(0.5)
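Two details about the map_async variant are worth knowing. In CPython, when no chunksize is given, map_async splits the input into roughly 4 × (number of workers) chunks, and `_number_left` starts at the number of chunks and drops by one per finished chunk, so it counts chunks rather than tasks. That batching is also the probable reason it ran faster: imap_unordered defaults to chunksize=1, i.e. one round trip per task. Since `_index` and `_number_left` are private and may change between versions, a public-API sketch is to pass an explicit chunksize to imap_unordered and count results as they arrive. The value 100 and the names `run` and `do_work` are assumptions for illustration; tune the chunk size for your workload.

```python
import time
from multiprocessing import Pool


def do_work(x):
    # Hypothetical cheap task, where per-task overhead dominates at chunksize=1.
    return x * x


def run(num_tasks, chunksize=100, report_every=1.0):
    results = []
    last_report = time.monotonic()
    with Pool() as pool:
        # Tasks travel to workers in batches of `chunksize`, but the iterator
        # still yields one result per task, so len(results) counts tasks.
        for result in pool.imap_unordered(do_work, range(num_tasks), chunksize=chunksize):
            results.append(result)
            now = time.monotonic()
            if now - last_report >= report_every:
                print("Waiting for", num_tasks - len(results), "tasks to complete...")
                last_report = now
    return results


if __name__ == '__main__':
    results = run(250_000)
    print("done:", len(results))
```

The trade-off is granularity: results come back a chunk at a time, so the count advances in steps of up to `chunksize`.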
mra*_*acz 17
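As suggested by Tim, you can use tqdm and imap to solve this problem. I just stumbled upon this question and adapted the imap_unordered solution so that I can access the results of the mapping. Here's how it works: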
from multiprocessing import Pool
import tqdm

pool = Pool(processes=4)
mapped_values = list(tqdm.tqdm(pool.imap_unordered(do_work, range(num_tasks)), total=num_tasks))
If you don't care about the values returned from your jobs, you don't need to assign the list to any variable.
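Because imap_unordered yields results in completion order, `mapped_values` will generally not line up with the inputs. `pool.imap` keeps input order; another option, sketched here with hypothetical helper names (`indexed_work`, `ordered_results`), keeps imap_unordered and sends each input's position along with it so every result can be put back in its slot while progress is printed.

```python
from multiprocessing import Pool


def do_work(x):
    # Hypothetical task.
    return x * x


def indexed_work(pair):
    # Carry the input position through the worker alongside the result.
    i, x = pair
    return i, do_work(x)


def ordered_results(values, processes=4):
    out = [None] * len(values)
    with Pool(processes) as pool:
        pairs = pool.imap_unordered(indexed_work, enumerate(values))
        for done, (i, result) in enumerate(pairs, 1):
            out[i] = result  # put each result back at its input position
            print(f"\r{done}/{len(values)}", end="", flush=True)
    print()
    return out


if __name__ == '__main__':
    print(ordered_results(list(range(10))))
```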
Anonymous 9
Installation
pip install tqdm
Example
import time
import threading
from multiprocessing import Pool

from tqdm import tqdm


def do_work(x):
    time.sleep(x)
    return x


def progress():
    time.sleep(3)  # Check progress after 3 seconds
    print(f'total: {pbar.total} finish:{pbar.n}')


if __name__ == '__main__':
    # Create the bar only in the main process; at module level it would
    # also be created in every spawned worker (e.g. on Windows).
    tasks = range(10)
    pbar = tqdm(total=len(tasks))
    thread = threading.Thread(target=progress)
    thread.start()
    results = []
    with Pool(processes=5) as pool:
        for result in pool.imap_unordered(do_work, tasks):
            results.append(result)
            pbar.update(1)
    pbar.close()
    print(results)
Installation (Flask version)
pip install flask
main.py
import time
from multiprocessing import Pool

from tqdm import tqdm
from flask import Flask, make_response, jsonify

app = Flask(__name__)


def do_work(x):
    time.sleep(x)
    return x


total = 5  # num of tasks
tasks = range(total)
pbar = tqdm(total=len(tasks))


@app.route('/run/')
def run():
    results = []
    with Pool(processes=2) as pool:
        for _result in pool.imap_unordered(do_work, tasks):
            results.append(_result)
            if pbar.n >= total:
                pbar.n = 0  # reset
            pbar.update(1)
    response = make_response(jsonify(dict(results=results)))
    response.headers.add('Access-Control-Allow-Origin', '*')
    response.headers.add('Access-Control-Allow-Headers', '*')
    response.headers.add('Access-Control-Allow-Methods', '*')
    return response


@app.route('/progress/')
def progress():
    response = make_response(jsonify(dict(n=pbar.n, total=pbar.total)))
    response.headers.add('Access-Control-Allow-Origin', '*')
    response.headers.add('Access-Control-Allow-Headers', '*')
    response.headers.add('Access-Control-Allow-Methods', '*')
    return response
Run (using Windows as an example)
set FLASK_APP=main
flask run
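API list
http://127.0.0.1:5000/run/ : runs the tasks and returns their results
http://127.0.0.1:5000/progress/ : returns the current progress as n and total
test.html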
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Progress Bar</title>
    <script src="https://cdn.bootcss.com/jquery/3.0.0/jquery.min.js"></script>
    <script src="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/js/bootstrap.min.js"></script>
    <link href="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/css/bootstrap.min.css" rel="stylesheet">
</head>
<body>
<button id="run">Run the task</button>
<br><br>
<div class="progress">
    <div class="progress-bar" role="progressbar" aria-valuenow="1" aria-valuemin="0" aria-valuemax="100"
         style="width: 10%">0.00%
    </div>
</div>
</body>
<script>
    function set_progress_rate(n, total) {
        //Set the rate of progress bar
        var rate = (n / total * 100).toFixed(2);
        if (n > 0) {
            $(".progress-bar").attr("aria-valuenow", n);
            $(".progress-bar").attr("aria-valuemax", total);
            $(".progress-bar").text(rate + "%");
            $(".progress-bar").css("width", rate + "%");
        }
    }

    $("#run").click(function () {
        //Run the task
        $.ajax({
            url: "http://127.0.0.1:5000/run/",
            type: "GET",
            success: function (response) {
                set_progress_rate(100, 100);
                console.log('Results: ' + response['results']);
            }
        });
    });
    setInterval(function () {
        //Show progress every 1 second
        $.ajax({
            url: "http://127.0.0.1:5000/progress/",
            type: "GET",
            success: function (response) {
                console.log(response);
                var n = response["n"];
                var total = response["total"];
                set_progress_rate(n, total);
            }
        });
    }, 1000);
</script>
</html>
I know this is a rather old question, but here is what I do when I want to track the progress of a pool of tasks in Python.
from progressbar import ProgressBar, SimpleProgress
import multiprocessing as mp
from time import sleep

def my_function(letter):
    sleep(2)
    return letter+letter

dummy_args = ["A", "B", "C", "D"]
pool = mp.Pool(processes=2)
results = []

pbar = ProgressBar(widgets=[SimpleProgress()], maxval=len(dummy_args)).start()

r = [pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]

while len(results) != len(dummy_args):
    pbar.update(len(results))
    sleep(0.5)
pbar.finish()

print results
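Basically, you use apply_async with a callback (in this case, appending the returned value to a list), so you don't have to wait before doing something else. Then, within a while loop, you check the progress of the work. In this case, I added a widget to make it look nicer.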
Output:
4 of 4
['AA', 'BB', 'CC', 'DD']
Hope this helps.
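The callback passed to apply_async runs in a result-handler thread of the main process, which is why appending to `results` there is visible to the while loop. One caveat: if the task raises, its callback is never called, so `len(results)` never reaches `len(dummy_args)` and the loop spins forever. A standard-library-only sketch (no progressbar package) that also counts failures via `error_callback`; `my_task` and `run_with_progress` are hypothetical names, and the failure on "C" is contrived to show the error path.

```python
import time
from multiprocessing import Pool


def my_task(letter):
    # Hypothetical task; deliberately fails on one input.
    if letter == "C":
        raise ValueError("bad input: " + letter)
    time.sleep(0.1)
    return letter + letter


def run_with_progress(args, processes=2):
    results, errors = [], []
    with Pool(processes) as pool:
        for x in args:
            # Both callbacks run in the main process's result-handler thread.
            pool.apply_async(my_task, (x,),
                             callback=results.append,
                             error_callback=errors.append)
        pool.close()
        # Count finished = succeeded + failed, so the loop ends even if a task raises.
        while len(results) + len(errors) < len(args):
            print(f"\r{len(results) + len(errors)} of {len(args)}", end="", flush=True)
            time.sleep(0.1)
        pool.join()
    print(f"\r{len(args)} of {len(args)}")
    return results, errors


if __name__ == '__main__':
    ok, failed = run_with_progress(["A", "B", "C", "D"])
    print(sorted(ok), failed)
```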
A simple solution with Pool.apply_async():
from multiprocessing import Pool
from tqdm import tqdm
from time import sleep


def work(x):
    sleep(0.2)
    return x**2


if __name__ == '__main__':
    # The guard keeps spawned workers (Windows, macOS) from re-running this block.
    n = 10
    with Pool(4) as p, tqdm(total=n) as pbar:
        res = [p.apply_async(
            work, args=(i,), callback=lambda _: pbar.update(1)) for i in range(n)]
        results = [r.get() for r in res]