KZi*_*vas 5 python threadpoolexecutor python-asyncio concurrent.futures python-multiprocessing
好吧,这有点复杂,但我有一个带有很多异步代码的异步类。
我希望在该类中并行化一个任务,并且我想生成多个进程来运行阻塞任务,并且在每个进程中我想创建一个asyncio循环来处理各种子任务。
所以我无法使用 ThreadPollExecutor 来做到这一点,但是当我尝试使用 ProcessPoolExecutor 时,我收到一个 Can't pickle local object 错误。
这是我的代码的简化版本,与 ThreadPoolExecutor 一起运行。如何与 ProcessPoolExecutor 并行?
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
class MyClass:
def __init__(self) -> None:
self.event_loop = None
# self.pool_executor = ProcessPoolExecutor(max_workers=8)
self.pool_executor = ThreadPoolExecutor(max_workers=8)
self.words = ["one", "two", "three", "four", "five"]
self.multiplier = int(2)
async def subtask(self, letter: str):
await asyncio.sleep(1)
return letter * self.multiplier
async def task_gatherer(self, subtasks: list):
return await asyncio.gather(*subtasks)
def blocking_task(self, word: str):
time.sleep(1)
subtasks = [self.subtask(letter) for letter in word]
result = asyncio.run(self.task_gatherer(subtasks))
return result
async def master_method(self):
self.event_loop = asyncio.get_running_loop()
master_tasks = [
self.event_loop.run_in_executor(
self.pool_executor,
self.blocking_task,
word,
)
for word in self.words
]
results = await asyncio.gather(*master_tasks)
print(results)
if __name__ == "__main__":
my_class = MyClass()
asyncio.run(my_class.master_method())
Run Code Online (Sandbox Code Playgroud)
这个问题问得好。问题和解决方案都非常有趣。
问题
多线程和多处理之间的区别之一是内存的处理方式。线程共享内存空间。进程则不然(一般来说,见下文)。
对象仅通过引用传递给 ThreadPoolExecutor。无需创建新对象。
但是 ProcessPoolExecutor 驻留在单独的内存空间中。为了将对象传递给它,该实现会腌制对象并在另一侧再次取消腌制它们。这个细节通常很重要。
blocking_task仔细查看原始问题中的论点。我的意思不是word- 我的意思是第一个参数:self。那个永远存在的人。我们已经看过它一百万次,但几乎没有想到它。要执行该函数blocking_task,名为“self”的参数需要一个值。要在 ProcessPoolExecutor 中运行此函数,“self”必须进行 pickled 和 unpickled。现在看看“self”的一些成员对象:有一个事件循环,还有执行器本身。两者都不可腌制。那就是问题所在。
我们无法在另一个进程中按原样运行该函数。
不可否认,回溯消息“无法腌制本地对象”还有很多不足之处。文档也是如此。但实际上,该程序适用于 ThreadPool,但不适用于 ProcessPool,这是完全有道理的。
注意:存在在进程之间共享 ctypes 对象的机制。然而,据我所知,没有办法直接共享Python对象。这就是使用 pickle/unpickle 机制的原因。
解决方案
重构 MyClass 以将数据与多处理框架分开。我创建了第二个类 MyTask,它可以进行 pickle 和 unpickled。我将 MyClass 中的一些函数移入其中。原始列表没有任何重要的修改 - 只是重新排列。
该脚本使用 ProcessPoolExecutor 和 ThreadPoolExecutor 均成功运行。
import asyncio
import time
# from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor
# Refactored MyClass to break out MyTask
class MyTask:
def __init__(self):
self.multiplier = 2
async def subtask(self, letter: str):
await asyncio.sleep(1)
return letter * self.multiplier
async def task_gatherer(self, subtasks: list):
return await asyncio.gather(*subtasks)
def blocking_task(self, word: str):
time.sleep(1)
subtasks = [self.subtask(letter) for letter in word]
result = asyncio.run(self.task_gatherer(subtasks))
return result
class MyClass:
def __init__(self):
self.task = MyTask()
self.event_loop: asyncio.AbstractEventLoop = None
self.pool_executor = ProcessPoolExecutor(max_workers=8)
# self.pool_executor = ThreadPoolExecutor(max_workers=8)
self.words = ["one", "two", "three", "four", "five"]
async def master_method(self):
self.event_loop = asyncio.get_running_loop()
master_tasks = [
self.event_loop.run_in_executor(
self.pool_executor,
self.task.blocking_task,
word,
)
for word in self.words
]
results = await asyncio.gather(*master_tasks)
print(results)
if __name__ == "__main__":
my_class = MyClass()
asyncio.run(my_class.master_method())
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3423 次 |
| 最近记录: |