a_g*_*est 5 python concurrency multithreading concurrent.futures
我正在使用一个Backend
类,该类生成一个子进程来执行 CPU 密集型工作。我无法控制该类,基本上唯一的交互方式是创建一个实例backend = Backend()
并通过提交工作backend.run(data)
(这又将工作提交给子流程并阻塞直到完成)。因为这些计算需要相当长的时间,所以我想并行执行它们。由于该类Backend
已经生成了自己的子进程来执行实际工作,因此这似乎是 IO 绑定的情况。
所以我考虑使用多个线程,每个线程都使用自己的Backend
实例。我可以手动创建这些线程并通过队列连接它们。以下是带有一些Backend
模拟类的示例实现:
import os
import pty
from queue import Queue
from subprocess import PIPE, Popen
from threading import Thread
class Backend:
def __init__(self):
f, g = pty.openpty()
self.process = Popen(
['bash'], # example program
text=True, bufsize=1, stdin=PIPE, stdout=g)
self.write = self.process.stdin.write
self.read = os.fdopen(f).readline
def __enter__(self):
self.write('sleep 2\n') # startup work
return self
def __exit__(self, *exc):
self.process.stdin.close()
self.process.kill()
def run(self, x):
self.write(f'sleep {x} && echo "ok"\n') # perform work
return self.read().strip()
class Worker(Thread):
def __init__(self, inq, outq, **kwargs):
super().__init__(**kwargs)
self.inq = inq
self.outq = outq
def run(self):
with Backend() as backend:
while True:
data = self.inq.get()
result = backend.run(data)
self.outq.put((data, result))
task_queue = Queue()
result_queue = Queue()
n_workers = 3
threads = [Worker(task_queue, result_queue, daemon=True) for _ in range(n_workers)]
for thread in threads:
thread.start()
data = [2]*7
for x in data:
task_queue.put(x)
for _ in data:
print(f'Result ready: {result_queue.get()}')
Run Code Online (Sandbox Code Playgroud)
由于Backend
需要在启动时执行一些工作,我不想为每个任务创建一个新实例。因此,每个实例都会为其整个生命周期Worker
创建一个实例。Backend
同样重要的是,每个工作人员都有自己的后端,这样他们就不会互相干扰。
现在的问题是:我也可以用它来concurrent.futures.ThreadPoolExecutor
完成这个任务吗?看起来该Executor.map
方法是正确的候选者,但我不知道如何确保每个工作人员收到自己的实例Backend
(需要在任务之间保持不变)。
工作线程的状态可以保存在全局命名空间中,例如作为字典。然后threading.current_thread
可用于保存/加载每个工作人员的状态。contextlib.ExitStack
可以用作Backend
上下文管理器来进行适当的处理。
from concurrent.futures import ThreadPoolExecutor
from contextlib import ExitStack
import os
import pty
from subprocess import PIPE, Popen
import threading
class Backend:
...
backends = {}
exit_stack = ExitStack()
def init_backend():
backends[threading.current_thread()] = exit_stack.enter_context(Backend())
def compute(data):
return data, backends[threading.current_thread()].run(data)
with exit_stack:
with ThreadPoolExecutor(max_workers=3, initializer=init_backend) as executor:
for result in executor.map(compute, [2]*7):
print(f'Result ready: {result}')
Run Code Online (Sandbox Code Playgroud)