具有有状态工作线程的 ThreadPoolExecutor

a_g*_*est 5 python concurrency multithreading concurrent.futures

我正在使用一个Backend类,该类生成一个子进程来执行 CPU 密集型工作。我无法控制该类,基本上唯一的交互方式是创建一个实例backend = Backend()并通过提交工作backend.run(data)(这又将工作提交给子流程并阻塞直到完成)。因为这些计算需要相当长的时间,所以我想并行执行它们。由于该类Backend已经生成了自己的子进程来执行实际工作,因此这似乎是 IO 绑定的情况。

所以我考虑使用多个线程,每个线程都使用自己的Backend实例。我可以手动创建这些线程并通过队列连接它们。以下是带有一些Backend模拟类的示例实现:

import os
import pty
from queue import Queue
from subprocess import PIPE, Popen
from threading import Thread


class Backend:
    def __init__(self):
        f, g = pty.openpty()
        self.process = Popen(
            ['bash'],  # example program
            text=True, bufsize=1, stdin=PIPE, stdout=g)
        self.write = self.process.stdin.write
        self.read = os.fdopen(f).readline

    def __enter__(self):
        self.write('sleep 2\n')  # startup work
        return self

    def __exit__(self, *exc):
        self.process.stdin.close()
        self.process.kill()

    def run(self, x):
        self.write(f'sleep {x} && echo "ok"\n')  # perform work
        return self.read().strip()


class Worker(Thread):
    def __init__(self, inq, outq, **kwargs):
        super().__init__(**kwargs)
        self.inq = inq
        self.outq = outq

    def run(self):
        with Backend() as backend:
            while True:
                data = self.inq.get()
                result = backend.run(data)
                self.outq.put((data, result))


task_queue = Queue()
result_queue = Queue()

n_workers = 3
threads = [Worker(task_queue, result_queue, daemon=True) for _ in range(n_workers)]
for thread in threads:
    thread.start()

data = [2]*7
for x in data:
    task_queue.put(x)

for _ in data:
    print(f'Result ready: {result_queue.get()}')
Run Code Online (Sandbox Code Playgroud)

由于Backend需要在启动时执行一些工作,我不想为每个任务创建一个新实例。因此,每个实例都会为其整个生命周期Worker创建一个实例。Backend同样重要的是,每个工作人员都有自己的后端,这样他们就不会互相干扰。

现在的问题是:我也可以用它来concurrent.futures.ThreadPoolExecutor完成这个任务吗?看起来该Executor.map方法是正确的候选者,但我不知道如何确保每个工作人员收到自己的实例Backend(需要在任务之间保持不变)。

a_g*_*est 3

工作线程的状态可以保存在全局命名空间中,例如作为字典。然后threading.current_thread可用于保存/加载每个工作人员的状态。contextlib.ExitStack可以用作Backend上下文管理器来进行适当的处​​理。

from concurrent.futures import ThreadPoolExecutor
from contextlib import ExitStack
import os
import pty
from subprocess import PIPE, Popen
import threading


class Backend:
    ...


backends = {}
exit_stack = ExitStack()


def init_backend():
    backends[threading.current_thread()] = exit_stack.enter_context(Backend())


def compute(data):
    return data, backends[threading.current_thread()].run(data)


with exit_stack:
    with ThreadPoolExecutor(max_workers=3, initializer=init_backend) as executor:
        for result in executor.map(compute, [2]*7):
            print(f'Result ready: {result}')
Run Code Online (Sandbox Code Playgroud)