Mer*_*ado 1 python multiprocessing python-asyncio
import asyncio
from multiprocessing import Queue, Process
import time
task_queue = Queue()
# This is simulating the task
async def do_task(task_number):
for progress in range(task_number):
print(f'{progress}/{task_number} doing')
await asyncio.sleep(10)
# This is the loop that accepts and runs tasks
async def accept_tasks():
event_loop = asyncio.get_event_loop()
while True:
task_number = task_queue.get() <-- this blocks event loop from running do_task()
event_loop.create_task(do_task(task_number))
# This is the starting point of the process,
# the event loop runs here
def worker():
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(accept_tasks())
# Run a new process
Process(target=worker).start()
# Simulate adding tasks every 1 second
for _ in range(1,50):
task_queue.put(_)
print('added to queue', _)
time.sleep(1)
Run Code Online (Sandbox Code Playgroud)
我正在尝试运行一个单独的进程来运行事件循环来执行 I/O 操作。现在,我正在尝试从父进程“排队”任务。问题是 do_task() 不运行。唯一有效的解决方案是轮询(即检查是否为空,然后休眠 X 秒)。
经过一些研究,问题似乎是task_queue.get()没有进行事件循环友好的 IO。
aiopipe提供了一个解决方案,但假设两个进程都在事件循环中运行。
我尝试创建这个。但消费者并没有消费任何东西......
read_fd, write_fd = os.pipe()
consumer = AioPipeReader(read_fd)
producer = os.fdopen(write_fd, 'w')
Run Code Online (Sandbox Code Playgroud)
对于这种情况,一个简单的解决方法是更改task_number = task_queue.get()为task_number = await event_loop.run_in_executor(None, task_queue.get). 这样,阻塞Queue.get()函数将被卸载到线程池,并且当前的协程将被挂起,作为一个好的异步公民。同样,一旦线程池完成该函数,协程将恢复执行。
这种方法是一种解决方法,因为它无法扩展到大量并发任务:每个阻塞调用“变成异步”都会占用线程池中的一个插槽,而那些超过池最大工作线程数的线程甚至不会在 Threed 释放之前开始执行。例如,重写所有 asyncio 来调用阻塞函数run_in_executor只会导致线程系统编写得很糟糕。但是,如果您知道您的子进程数量很少,那么使用run_in_executor是正确的并且可以非常有效地解决问题。
| 归档时间: |
|
| 查看次数: |
3321 次 |
| 最近记录: |