Python: how can concurrent.futures be made cancelable?

Ket*_*tzu 10 python multiprocess concurrent.futures

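Python's concurrent.futures and ProcessPoolExecutor provide a neat interface for scheduling and monitoring tasks. Futures even provide a .cancel() method:

cancel(): Attempt to cancel the call. If the call is currently being executed and cannot be cancelled then the method will return False, otherwise the call will be cancelled and the method will return True.

Unfortunately, in a similar question (concerning asyncio), the answer claims that running tasks cannot be cancelled, citing this snippet of the documentation. But the docs do not say that; they only say so for calls that are running and cannot be cancelled.

Submitting multiprocessing.Events to the processes is not possible either (passing them as arguments, as one would with multiprocessing.Process, raises a RuntimeError).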
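One possible workaround for the Event problem, sketched here under assumptions rather than taken from the question: an Event created through multiprocessing.Manager() is a picklable proxy, so it can be passed to pool.submit() as an ordinary argument. The task then has to poll it cooperatively. The `target`, `stop_event` and `check_every` parameters and the `main()` helper are illustrative names, and the partition sizes are kept small so the sketch finishes quickly.

```python
from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import Manager


def m_run(partition, target, stop_event, check_every=10_000):
    """Scan partition for target; give up early once stop_event is set."""
    for n, elem in enumerate(partition):
        # Polling a manager proxy costs an IPC round trip, so do it only now and then.
        if n % check_every == 0 and stop_event.is_set():
            return None
        if elem == target:
            return elem
    return None


def main():
    steps = 1_000_000  # hypothetical partition size
    # The pool is closed before the manager, so tasks can still reach the event.
    with Manager() as manager, ProcessPoolExecutor(max_workers=4) as pool:
        stop_event = manager.Event()  # picklable proxy, unlike multiprocessing.Event()
        futures = [
            pool.submit(m_run, range(i * steps, (i + 1) * steps), 1_351_355, stop_event)
            for i in range(4)
        ]
        for future in as_completed(futures):
            if future.result() is not None:
                print("found:", future.result())
                stop_event.set()  # ask the remaining tasks to return early
                break
```

Call `main()` from under an `if __name__ == "__main__":` guard so that it also works with the spawn start method. The tasks are not killed; they notice the event at their next check and return None, so the delay before they stop depends on `check_every`.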

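What am I trying to do? I would like to partition a search space and run a task for each partition. A single solution is enough, though, and the search is CPU intensive. So is there a comfortable way to achieve this that does not offset the gains from using ProcessPool in the first place?

Example: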

from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait

# function that profits from partitioned search space
def m_run(partition):
    for elem in partition:
        if elem == 135135515:
            return elem
    return False

futures = []
# used to create the partitions
steps = 100000000
with ProcessPoolExecutor(max_workers=4) as pool:
    for i in range(4):
        # run 4 tasks with a partition, but only *one* solution is needed
        partition = range(i*steps,(i+1)*steps)
        futures.append(pool.submit(m_run, partition))

    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    for d in done:
        print(d.result())

    print("---")
    for d in not_done:
        # will return false for Cancel and Result for all futures
        print("Cancel: "+str(d.cancel()))
        print("Result: "+str(d.result()))

ost*_*ach 5

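I don't know why concurrent.futures.Future does not have a .kill() method, but you can achieve what you want by shutting down the process pool with pool.shutdown(wait=False) and killing the remaining child processes by hand.

Create a function for killing child processes: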

import signal, psutil

def kill_child_processes(parent_pid, sig=signal.SIGTERM):
    try:
        parent = psutil.Process(parent_pid)
    except psutil.NoSuchProcess:
        return
    children = parent.children(recursive=True)
    for process in children:
        process.send_signal(sig)

Run your code until the first result arrives, then kill all remaining child processes:

import os  # needed for os.getpid() below
from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait

# function that profits from partitioned search space
def m_run(partition):
    for elem in partition:
        if elem == 135135515:
            return elem
    return False

futures = []
# used to create the partitions
steps = 100000000
pool = ProcessPoolExecutor(max_workers=4)
for i in range(4):
    # run 4 tasks with a partition, but only *one* solution is needed
    partition = range(i*steps,(i+1)*steps)
    futures.append(pool.submit(m_run, partition))

done, not_done = wait(futures, timeout=3600, return_when=FIRST_COMPLETED)

# Shut down pool
pool.shutdown(wait=False)

# Kill remaining child processes
kill_child_processes(os.getpid())
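As a side note on the shutdown step: on Python 3.9 and later, Executor.shutdown() also accepts cancel_futures=True, which cancels futures that are still queued. It does not stop calls that are already running, so the kill step above is still needed for those. The sketch below uses a ThreadPoolExecutor with one worker purely to make the states deterministic (ProcessPoolExecutor takes the same keyword); `blocker`, `started` and `release` are illustrative names.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()   # set once the first task is actually running
release = threading.Event()   # lets the running task finish


def blocker():
    started.set()
    release.wait()
    return "finished"


pool = ThreadPoolExecutor(max_workers=1)
running = pool.submit(blocker)                      # occupies the only worker
queued = [pool.submit(blocker) for _ in range(3)]   # these wait in the queue
started.wait()

# Cancels the queued futures; the running call is left alone.
pool.shutdown(wait=False, cancel_futures=True)
release.set()

print([f.cancelled() for f in queued], running.result())
```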


nox*_*fox 5

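Unfortunately, running Futures cannot be cancelled. I believe the core reason is to guarantee the same API across different implementations (it is not possible to interrupt running threads or coroutines).

The Pebble library was designed to overcome this and other limitations.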

from pebble import ProcessPool

def function(foo, bar=0):
    return foo + bar

with ProcessPool() as pool:
    future = pool.schedule(function, args=[1])

    # if running, the container process will be terminated 
    # a new process will be started consuming the next task
    future.cancel()  

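  • I found it useful to know that `pebble` futures inherit from `concurrent.futures` futures. As a result, many of the functions provided by `concurrent.futures` can also be applied to `pebble` futures, even though `pebble` does not implement them itself. This holds, for example, for the `as_completed` function of `concurrent.futures`. Switching to pebble can therefore be as simple as adding an import and renaming `ProcessPoolExecutor` and `pool.submit`. (3 upvotes)
  • This may be obvious, but I just want to point out that with ProcessPool you are no longer using multiple threads but multiple processes. Many people will not care about the difference, but it is worth at least knowing what you are doing. (3 upvotes)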

Sun*_*ear 1

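I found your question interesting, so here are my findings.

I found that the .cancel() method behaves as described in the Python documentation. As for your running concurrent functions, unfortunately they could not be cancelled even after being told to. If my finding is correct, then I think Python does need a more effective .cancel() method.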
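The documented return values can be seen in isolation with a minimal sketch. It assumes a ThreadPoolExecutor with a single worker as a stand-in, only because that makes the "running" and "pending" states deterministic; both executors hand out the same concurrent.futures.Future class. The names `blocker`, `started` and `release` are illustrative.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()   # set once the first task is actually running
release = threading.Event()   # lets the running task finish


def blocker():
    started.set()
    release.wait()
    return "finished"


with ThreadPoolExecutor(max_workers=1) as pool:
    running = pool.submit(blocker)   # picked up by the only worker
    pending = pool.submit(blocker)   # stays queued behind it
    started.wait()
    cancel_running = running.cancel()   # call is executing: cannot be cancelled
    cancel_pending = pending.cancel()   # call has not started: is cancelled
    release.set()

print(cancel_running, cancel_pending, running.result(), pending.cancelled())
```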

Run the ProcessPoolExecutor code below to check my findings.

from concurrent.futures import ProcessPoolExecutor, as_completed
from time import time 

# function that profits from partitioned search space
def m_run(partition):
    for elem in partition:
        if elem == 3351355150:
            return elem  # returning already ends the loop, so no break is needed
    return False

start = time()
futures = []
# used to create the partitions
steps = 1000000000
with ProcessPoolExecutor(max_workers=4) as pool:
    for i in range(4):
        # run 4 tasks with a partition, but only *one* solution is needed
        partition = range(i*steps,(i+1)*steps)
        futures.append(pool.submit(m_run, partition))

    ### New Code: Start ### 
    for f in as_completed(futures):
        print(f.result())
        if f.result():
            print('break')
            break

    for f in futures:
        print(f, 'running?',f.running())
        if f.running():
            f.cancel()
            print('Cancelled? ',f.cancelled())

    print('New Instruction Ended at = ', time()-start )
print('Total Compute Time = ', time()-start )

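Update: the concurrent processes can be forcefully terminated via bash, but as a consequence the main Python program terminates as well. If that is not a problem for you, try the code below.

To see this for yourself, add the following code between the last two print statements. Note: this code only works if you are not running any other python3 program.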

import subprocess, os, signal 
result = subprocess.run(['ps', '-C', 'python3', '-o', 'pid='],
                        stdout=subprocess.PIPE).stdout.decode('utf-8').split()
print ('result =', result)
for i in result:
    print('PID = ', i)
    if i != result[0]:
        os.kill(int(i), signal.SIGKILL)
        try:
            # Signal 0 only probes whether the process still exists.
            os.kill(int(i), 0)
            raise Exception("""wasn't able to kill the process
                              HINT:use signal.SIGKILL or signal.SIGABRT""")
        except OSError:
            continue