如何使 ProcessPoolExecutor 中的任务表现得像守护进程?

Yur*_*nov 4 python python-3.x python-asyncio python-multiprocessing

Python 3.6.6

这是代码:

import asyncio
import time
from concurrent.futures import ProcessPoolExecutor


executor_processes = ProcessPoolExecutor(2)


def calculate():
    while True:
        print("while")
        time.sleep(1)


async def async_method():
    loop_ = asyncio.get_event_loop()
    loop_.run_in_executor(executor_processes, calculate)
    await asyncio.sleep(1)
    print("finish sleep")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(async_method())
    print("main_thread is finished")
Run Code Online (Sandbox Code Playgroud)

输出:

while
finish sleep
main_thread 已完成
while
while
...

我希望子进程将被终止,就像当使用守护进程属性生成进程时一样:

import asyncio
import time
import multiprocessing


def calculate():
    while True:
        print("while")
        time.sleep(1)


async def async_method():
    proc = multiprocessing.Process(target=calculate)
    proc.daemon = True
    proc.start()
    await asyncio.sleep(1)
    print("finish sleep")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(async_method())
    print("main_thread is finished")
Run Code Online (Sandbox Code Playgroud)

输出:

while
finish sleep
main_thread 已完成

问题:如何将loop_.run_in_executor(executor_processes, calculate)行为更改为“类似守护进程”?

shm*_*mee 5

您显示的代码显然只是一个小示例,用于演示您希望实现的目标。我们不知道您的实际任务/问题。但老实说,我不相信你走在正确的道路上。

ProcessPoolExecutor是标准库包的一部分concurrent.futuresFuture它在调用时向调用者返回 a submit()。这Future是尚未完成的计算结果的代理。约定了; 尽管这个术语在技术上在这种情况下并不完全正确。请参阅Wiki 页面了解区别。

这意味着计算预计有限时间内完成并产生结果。
这就是为什么Python 中的ThreadPoolExecutorProcessPoolExecutor实现不允许您生成守护进程工作者。要求一个你实际上并不希望实现的结果的承诺没有多大意义。

你怎么还能实现你的目标?

1 - 子类ProcessPoolExecutor
您可以拦截新进程的创建和启动以潜入p.daemon = Truein _adjust_process_count()。但是,由于concurrent.futures设计时并未考虑到无限期运行的任务,因此这不会有太大帮助。与 不同的是multiprocessingconcurrent.futures.process定义了一个不考虑守护进程的退出处理程序。它只是尝试join()一切,这可能需要一些时间来无限循环。

2 - 定义您自己的退出处理程序!
您可以同时执行这两个操作,multiprocessing并且concurrent.futures.process执行以下操作:定义一个退出处理程序,该处理程序在您的 Python 进程即将关闭时进行清理。atexit可以帮助解决这个问题:

import atexit

executor_processes = ProcessPoolExecutor(2)

def calculate():
    while True:
        print("while")
        time.sleep(1)

def end_processes():
    [proc.terminate() for proc in multiprocessing.active_children()]

async def async_method():
    [...]

if __name__ == '__main__':
    atexit.register(end_processes)
    loop = asyncio.get_event_loop()
    [...]
Run Code Online (Sandbox Code Playgroud)

注意:这将终止进程结束时所有活动的子进程。如果您想要正常关闭某些子进程,请保留一个句柄,并在代码中的说明结束之前执行此操作。
还要注意流程可以拒绝兑现terminate()kill()是你最后的手段。