asyncio: running task only if all other tasks are awaiting

Zak*_*cke 10 python concurrency python-asyncio

I am currently running some endless tasks using asyncio.wait.

I need a special function to run only when all the others are awaiting.

import asyncio

async def special_function():
    while True:
        # Does some work, then passes control back to the event loop
        # so that main_tasks can run if they are no longer waiting.
        await asyncio.sleep(0)

async def handler():
    # main_tasks is the list of endless coroutine functions (defined elsewhere).
    # asyncio.wait needs Task objects; bare coroutines are rejected since Python 3.11.
    tasks = [asyncio.create_task(task()) for task in main_tasks]

    # Adding the task that I want to run when all main_tasks are awaiting:
    tasks.append(asyncio.create_task(special_function()))

    await asyncio.wait(tasks)

asyncio.run(handler())

How can I get the special_function to only be run when all main_tasks are on await?


Edit:

What I mean by "all main_tasks are on await": none of the main_tasks is ready to continue, e.g. they are in asyncio.sleep(100) or are I/O bound and still waiting for data.

Therefore the main_tasks cannot continue and the event loop runs the special_function while the tasks are in this state, NOT every iteration of the event loop.


Edit 2:

My use case:

The main_tasks are updating a data structure with new data from web-sockets.

The special_function transfers that data to another process upon an update signal from that process. (multiprocessing with shared variables and data structures)

The data must be as up to date as possible when it is transferred; there cannot be pending updates from main_tasks.

This is why I only want to run special_function when no main_tasks have new data available to process (i.e. all of them are waiting on an await).

VPf*_*PfB 6

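I tried to write a test for the "task is not ready to run" condition. I don't think asyncio exposes details of the scheduler. The developers have clearly stated that they want to keep the freedom to change asyncio internals without breaking backward compatibility.

In asyncio.Task there is this comment (note: _step() runs the task coroutine until the next await):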

# An important invariant maintained while a Task not done:
#   
# - Either _fut_waiter is None, and _step() is scheduled;
# - or _fut_waiter is some Future, and _step() is *not* scheduled.

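But that internal variable is, of course, not part of the API.

You can get limited access to _fut_waiter by reading the output of repr(task), but that format does not seem reliable either, so I would not depend on something like this: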

PENDINGMSG = 'wait_for=<Future pending '

if all(PENDINGMSG in repr(t) for t in monitored_tasks):
    do_something()

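Anyway, I think you are trying to be too perfect. You want to know whether there is new data in the other tasks. What if the data is in asyncio buffers? Kernel buffers? The network card's receive buffer? ... You can never know whether new data will arrive in the next millisecond.

My suggestion: write all updates to a single queue. Check that queue as the only source of updates. If the queue is empty, publish the last state.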
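
The single-queue suggestion could look roughly like the sketch below. It is a minimal illustration, not the asker's actual code: `feed`, `drain_into` and `demo` are hypothetical names, `feed` stands in for a web-socket reader, and the "update signal" from the other process is reduced to a plain function call.

```python
import asyncio


async def feed(queue, key, values):
    """Hypothetical stand-in for a web-socket reader: queue each update."""
    for value in values:
        await asyncio.sleep(0)  # pretend to wait for network data
        queue.put_nowait((key, value))


def drain_into(queue, state):
    """Apply every update that is already queued, without awaiting."""
    while True:
        try:
            key, value = queue.get_nowait()
        except asyncio.QueueEmpty:
            return
        state[key] = value


async def demo():
    queue = asyncio.Queue()
    state = {}
    # Two readers push their updates into the same queue.
    await asyncio.gather(
        feed(queue, "a", [1, 2, 3]),
        feed(queue, "b", [10, 20]),
    )
    # On an update signal: drain first, then publish a snapshot.
    drain_into(queue, state)
    return dict(state)


snapshot = asyncio.run(demo())
```

Because `drain_into` never awaits, no other task can run between emptying the queue and taking the snapshot, so the published state includes every update that had reached the queue at that moment. Data still sitting in kernel or network buffers is not covered, which is the limit described above.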