我正在做一些需要异步方法的繁重处理。我的一个方法返回一个字典列表,在将其添加到另一个可等待对象之前需要对其进行大量处理。IE。
def cpu_bound_task_here(record):
```some complicated preprocessing of record```
return record
Run Code Online (Sandbox Code Playgroud)
在好心人给出以下答案后,我的代码现在被卡住了。
async def fun():
print("Socket open")
record_count = 0
symbol = obj.symbol.replace("-", "").replace("/", "")
loop = asyncio.get_running_loop()
await obj.send()
while True:
try:
records = await obj.receive()
if not records:
continue
record_count += len(records)
Run Code Online (Sandbox Code Playgroud)
因此,上述函数的作用是异步流式传输值,并在无限期地推送到 Redis 之前进行一些繁重的处理。我做了必要的改变,但现在我陷入了困境。
正如该输出告诉您的那样,run_in_executor返回一个Future. 您需要等待它才能得到结果。
record = await loop.run_in_executor(
None, something_cpu_bound_task_here, record
)
Run Code Online (Sandbox Code Playgroud)
请注意,任何参数都something_cpu_bound_task_here需要传递给run_in_executor.
此外,正如您所提到的,这是一个 CPU 密集型任务,您需要确保您使用的是concurrent.futures.ProcessPoolExecutor. 除非您在某个地方调用过loop.set_default_executor,否则默认值是 的实例ThreadPoolExecutor。
with ProcessPoolExecutor() as executor:
for record in records:
record = await loop.run_in_executor(
executor, something_cpu_bound_task_here, record
)
Run Code Online (Sandbox Code Playgroud)
最后,您的 while 循环有效地同步运行。您需要等待 future,然后等待 for,然后obj.add才能继续处理 中的下一个项目records。您可能需要稍微重组您的代码并使用类似的东西gather来允许一些并发性。
async def process_record(record, obj, loop, executor):
record = await loop.run_in_executor(
executor, something_cpu_bound_task_here, record
)
await obj.add(record)
async def fun():
loop = asyncio.get_running_loop()
records = await receive()
with ProcessPoolExecutor() as executor:
await asyncio.gather(
*[process_record(record, obj, loop, executor) for record in records]
)
Run Code Online (Sandbox Code Playgroud)
我不确定如何处理,obj因为您的示例中没有定义这一点,但我相信您可以弄清楚。
| 归档时间: |
|
| 查看次数: |
4163 次 |
| 最近记录: |