Ket*_*tzu 10 python multiprocess concurrent.futures
Python concurrent.futures和ProcessPoolExecutor提供了一个简洁的界面来安排和监视任务.期货甚至提供 .cancel()方法:
cancel():尝试取消通话.如果当前正在执行调用且无法取消,则该方法将返回False,否则将取消调用并且该方法将返回True.
不幸的是,在一个类似的问题(关于asyncio)中,回答声称运行任务是不可取消的,使用这个剪切的文档,但文档不要说,只有它们运行和不可解决.
向进程提交multiprocessing.Events也是不可能的(通过参数执行此操作,如在multiprocess.Process中返回RuntimeError)
我想做什么?我想分区搜索空间并为每个分区运行任务.但它足以拥有一个解决方案,而且这个过程是CPU密集型的.那么有一种实际的舒适方式可以通过使用ProcessPool开始来抵消收益吗?
例:
from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait
# function that profits from partitioned search space
def m_run(partition):
for elem in partition:
if elem == 135135515:
return elem
return False
futures = []
# used to create the partitions
steps = 100000000
with ProcessPoolExecutor(max_workers=4) as pool:
for i in range(4):
# run 4 tasks with a partition, but only *one* solution is needed
partition = range(i*steps,(i+1)*steps)
futures.append(pool.submit(m_run, partition))
done, not_done = wait(futures, return_when=FIRST_COMPLETED)
for d in done:
print(d.result())
print("---")
for d in not_done:
# will return false for Cancel and Result for all futures
print("Cancel: "+str(d.cancel()))
print("Result: "+str(d.result()))
Run Code Online (Sandbox Code Playgroud)
我不知道为什么concurrent.futures.Future没有.kill()方法,但是可以通过关闭进程池pool.shutdown(wait=False)并手动杀死其余的子进程来完成所需的操作。
创建一个杀死子进程的函数:
import signal, psutil
def kill_child_processes(parent_pid, sig=signal.SIGTERM):
try:
parent = psutil.Process(parent_pid)
except psutil.NoSuchProcess:
return
children = parent.children(recursive=True)
for process in children:
process.send_signal(sig)
Run Code Online (Sandbox Code Playgroud)
运行代码,直到获得第一个结果,然后杀死所有剩余的子进程:
from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait
# function that profits from partitioned search space
def m_run(partition):
for elem in partition:
if elem == 135135515:
return elem
return False
futures = []
# used to create the partitions
steps = 100000000
pool = ProcessPoolExecutor(max_workers=4)
for i in range(4):
# run 4 tasks with a partition, but only *one* solution is needed
partition = range(i*steps,(i+1)*steps)
futures.append(pool.submit(m_run, partition))
done, not_done = wait(futures, timeout=3600, return_when=FIRST_COMPLETED)
# Shut down pool
pool.shutdown(wait=False)
# Kill remaining child processes
kill_child_processes(os.getpid())
Run Code Online (Sandbox Code Playgroud)
不幸的是,跑步Futures不能被取消。我相信核心原因是要确保在不同的实现上使用相同的API(不可能中断正在运行的线程或协程)。
该卵石库的目的是要克服这种和其他限制。
from pebble import ProcessPool
def function(foo, bar=0):
return foo + bar
with ProcessPool() as pool:
future = pool.schedule(function, args=[1])
# if running, the container process will be terminated
# a new process will be started consuming the next task
future.cancel()
Run Code Online (Sandbox Code Playgroud)
我发现你的问题很有趣,所以这是我的发现。
我发现方法的行为.cancel()如 python 文档中所述。至于您正在运行的并发函数,不幸的是,即使被告知这样做,它们也无法取消。如果我的发现是正确的,那么我认为 Python 确实需要更有效的 .cancel() 方法。
运行下面的代码来检查我的发现。
from concurrent.futures import ProcessPoolExecutor, as_completed
from time import time
# function that profits from partitioned search space
def m_run(partition):
for elem in partition:
if elem == 3351355150:
return elem
break #Added to terminate loop once found
return False
start = time()
futures = []
# used to create the partitions
steps = 1000000000
with ProcessPoolExecutor(max_workers=4) as pool:
for i in range(4):
# run 4 tasks with a partition, but only *one* solution is needed
partition = range(i*steps,(i+1)*steps)
futures.append(pool.submit(m_run, partition))
### New Code: Start ###
for f in as_completed(futures):
print(f.result())
if f.result():
print('break')
break
for f in futures:
print(f, 'running?',f.running())
if f.running():
f.cancel()
print('Cancelled? ',f.cancelled())
print('New Instruction Ended at = ', time()-start )
print('Total Compute Time = ', time()-start )
Run Code Online (Sandbox Code Playgroud)
更新: 可以通过 bash 强制终止并发进程,但结果是主 python 程序也会终止。如果这对您来说不是问题,请尝试以下代码。
您必须在最后 2 个打印语句之间添加以下代码才能亲自查看。注意:此代码仅在您未运行任何其他 python3 程序时才有效。
import subprocess, os, signal
result = subprocess.run(['ps', '-C', 'python3', '-o', 'pid='],
stdout=subprocess.PIPE).stdout.decode('utf-8').split()
print ('result =', result)
for i in result:
print('PID = ', i)
if i != result[0]:
os.kill(int(i), signal.SIGKILL)
try:
os.kill(int(i), 0)
raise Exception("""wasn't able to kill the process
HINT:use signal.SIGKILL or signal.SIGABORT""")
except OSError as ex:
continue
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
5818 次 |
| 最近记录: |