多处理。进程和异步循环通信

Mer*_*ado 1 python multiprocessing python-asyncio

import asyncio
from multiprocessing import Queue, Process
import time

task_queue = Queue()

# This is simulating the task
async def do_task(task_number):
  for progress in range(task_number):
    print(f'{progress}/{task_number} doing')
    await asyncio.sleep(10)

# This is the loop that accepts and runs tasks
async def accept_tasks():
  event_loop = asyncio.get_event_loop()
  while True:
    task_number = task_queue.get() <-- this blocks event loop from running do_task()
    event_loop.create_task(do_task(task_number))

# This is the starting point of the process,
# the event loop runs here
def worker():
  event_loop = asyncio.get_event_loop()
  event_loop.run_until_complete(accept_tasks())

# Run a new process
Process(target=worker).start()

# Simulate adding tasks every 1 second
for _ in range(1,50):
  task_queue.put(_)
  print('added to queue', _)
  time.sleep(1)
Run Code Online (Sandbox Code Playgroud)

我正在尝试运行一个单独的进程来运行事件循环来执行 I/O 操作。现在,我正在尝试从父进程“排队”任务。问题是 do_task() 不运行。唯一有效的解决方案是轮询(即检查是否为空,然后休眠 X 秒)。

经过一些研究,问题似乎是task_queue.get()没有进行事件循环友好的 IO。

aiopipe提供了一个解决方案,但假设两个进程都在事件循环中运行。

我尝试创建这个。但消费者并没有消费任何东西......

read_fd, write_fd = os.pipe()
consumer = AioPipeReader(read_fd)
producer = os.fdopen(write_fd, 'w')
Run Code Online (Sandbox Code Playgroud)

use*_*342 6

对于这种情况,一个简单的解决方法是更改task_number = task_queue.get()​​为task_number = await event_loop.run_in_executor(None, task_queue.get). 这样,阻塞Queue.get()函数将被卸载到线程池,并且当前的协程将被挂起,作为一个好的异步公民。同样,一旦线程池完成该函数,协程将恢复执行。

这种方法是一种解决方法,因为它无法扩展到大量并发任务:每个阻塞调用“变成异步”都会占用线程池中的一个插槽,而那些超过池最大工作线程数的线程甚至不会在 Threed 释放之前开始执行。例如,重写所有 asyncio 来调用阻塞函数run_in_executor只会导致线程系统编写得很糟糕。但是,如果您知道您的子进程数量很少,那么使用run_in_executor是正确的并且可以非常有效地解决问题。