Ben*_*Ben 8 concurrency subprocess python-3.x python-asyncio
我的 Python 脚本包含一个循环,用于subprocess在脚本外部运行命令。每个子进程都是独立的。我监听返回的消息,以防出现错误;我不能忽略子进程的结果。这是没有 asyncio 的脚本(我用 替换了计算量大的调用sleep):
from subprocess import PIPE # https://docs.python.org/3/library/subprocess.html
import subprocess
def go_do_something(index: int) -> None:
"""
This function takes a long time
Nothing is returned
Each instance is independent
"""
process = subprocess.run(["sleep","2"],stdout=PIPE,stderr=PIPE,timeout=20)
stdout = process.stdout.decode("utf-8")
stderr = process.stderr.decode("utf-8")
if "error" in stderr:
print("error for "+str(index))
return
def my_long_func(val: int) -> None:
"""
This function contains a loop
Each iteration of the loop calls a function
Nothing is returned
"""
for index in range(val):
print("index = "+str(index))
go_do_something(index)
# run the script
my_long_func(3) # launch three tasks
Run Code Online (Sandbox Code Playgroud)
我想我可以用来asyncio加速这个活动,因为 Python 脚本正在等待外部subprocess完成。我认为threading或multiprocessing不是必要的,尽管它们也可以导致更快的执行。使用任务队列(例如,Celery)是另一种选择。
我尝试实现该asyncio方法,但缺少一些内容,因为以下尝试不会改变总体执行时间:
import asyncio
from subprocess import PIPE # https://docs.python.org/3/library/subprocess.html
import subprocess
async def go_do_something(index: int) -> None:
"""
This function takes a long time
Nothing is returned
Each instance is independent
"""
process = subprocess.run(["sleep","2"],stdout=PIPE,stderr=PIPE,timeout=20)
stdout = process.stdout.decode("utf-8")
stderr = process.stderr.decode("utf-8")
if "error" in stderr:
print("error for "+str(index))
return
def my_long_func(val: int) -> None:
"""
This function contains a loop
Each iteration of the loop calls a function
Nothing is returned
"""
# https://docs.python.org/3/library/asyncio-eventloop.html
loop = asyncio.get_event_loop()
tasks = []
for index in range(val):
task = go_do_something(index)
tasks.append(task)
# https://docs.python.org/3/library/asyncio-task.html
tasks = asyncio.gather(*tasks)
loop.run_until_complete(tasks)
loop.close()
return
my_long_func(3) # launch three tasks
Run Code Online (Sandbox Code Playgroud)
如果我想监视每个的输出subprocess但不想在每个subprocess运行时等待,我可以从中受益吗asyncio?或者这种情况需要类似multiprocessingCelery 的东西吗?
asyncio尝试使用而不是执行命令subprocess。
定义一个run()函数:
import asyncio
async def run(cmd: str):
proc = await asyncio.create_subprocess_shell(
cmd,
stderr=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate()
print(f'[{cmd!r} exited with {proc.returncode}]')
if stdout:
print(f'[stdout]\n{stdout.decode()}')
if stderr:
print(f'[stderr]\n{stderr.decode()}')
Run Code Online (Sandbox Code Playgroud)
然后你可以像调用任何async函数一样调用它:
asyncio.run(run('sleep 2'))
#=>
['sleep 2' exited with 0]
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
16149 次 |
| 最近记录: |