带有 Inplace 的 Asyncio Pandas

Ton*_*ony 4 python python-3.x pandas python-asyncio

我刚刚阅读了这个介绍,但是在实现其中一个示例时遇到了麻烦(注释代码是第二个示例):

import asyncio
import pandas as pd
from openpyxl import load_workbook

async def loop_dfs(dfs):
    async def clean_df(df):
        df.drop(["column_1"], axis=1, inplace=True)
        ... a bunch of other inplace=True functions ...
        return "Done"

    # tasks = [clean_df(df) for (table, dfs) in dfs.items()]
    # await asyncio.gather(*tasks)

    tasks = [clean_df(df) for (table, df) in dfs.items()]
    completed, pending = await asyncio.wait(tasks)


def main():
    dfs = {
        sn: pd.read_excel("excel.xlsx", sheet_name=sn)
        for sn in load_workbook("excel.xlsx").sheetnames
    }

    # loop = asyncio.get_event_loop()
    # loop.run_until_complete(loop_dfs(dfs))

    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(loop_dfs(dfs))
    finally:
        loop.close()

main()
Run Code Online (Sandbox Code Playgroud)

我看到了一些关于 Pandas 如何不支持 asyncio 的其他帖子,也许我只是缺少一个更大的图景,但是如果我在进行就地操作,那应该没关系吧? 我看到了对 Dask 的建议,但没有立即支持阅读 excel,我想我会先尝试这个,但我一直在得到

RuntimeError: Event loop already running

use*_*342 9

我看到了一些关于 Pandas 如何不支持 asyncio 的其他帖子,也许我只是缺少一个更大的图景,但是如果我在进行就地操作,那应该没关系吧?

就地操作是那些修改现有数据的操作。这是一个效率问题,而您的目标似乎是并行化,这是一个完全不同的问题。

Pandas 不支持 asyncio,不仅因为这还没有实现,而且因为 Pandas 通常不执行 asyncio 支持的那种操作:网络和子进程 IO。Pandas 函数要么使用 CPU,要么等待磁盘访问,这两种方法都不适合 asyncio。Asyncio 允许使用看起来像普通同步代码的协程来表达网络通信。在协程内部,每个阻塞操作(例如网络读取)都被awaited,如果数据尚不可用,它会自动挂起整个任务。在每次这样的暂停时,系统会切换到下一个任务,从而有效地创建一个协作的多任务系统。

当尝试调用不支持 asyncio 的库(例如 pandas)时,表面上看起来可以工作,但您不会获得任何好处,并且代码将串行运行。例如:

async def loop_dfs(dfs):
    async def clean_df(df):
        ...    
    tasks = [clean_df(df) for (table, df) in dfs.items()]
    completed, pending = await asyncio.wait(tasks)
Run Code Online (Sandbox Code Playgroud)

由于clean_df不包含 的单个实例await,它只是名义上的协程 - 它永远不会真正暂停其执行以允许其他协程运行。因此await asyncio.wait(tasks)将按顺序运行任务,就像你写的一样:

for table, df in dfs.items():
    clean_df(df)
Run Code Online (Sandbox Code Playgroud)

为了让事情并行运行(假设Pandas在其操作期间偶尔会释放GIL),您应该将单个 CPU 绑定函数移交给线程池:

async def loop_dfs(dfs):
    def clean_df(df):  # note: ordinary def
        ...
    loop = asyncio.get_event_loop(0
    tasks = [loop.run_in_executor(clean_df, df)
             for (table, df) in dfs.items()]
    completed, pending = await asyncio.wait(tasks)
Run Code Online (Sandbox Code Playgroud)

如果你走那条路,你首先不需要 asyncio,你可以简单地使用concurrent.futures. 例如:

def loop_dfs(dfs):  # note: ordinary def
    def clean_df(df):
        ...
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(clean_df, df)
                   for (table, df) in dfs.items()]
        concurrent.futures.wait(futures)
Run Code Online (Sandbox Code Playgroud)

我想我会先试试这个,但我一直在 RuntimeError: Event loop already running

该错误通常意味着您在已经为您运行 asyncio 的环境中启动了脚本,例如 jupyter notebook。如果是这种情况,请确保使用 stock 运行脚本python,或查阅笔记本的文档如何更改代码以将协程提交到已运行的事件循环。

  • @EvanCarroll 好点,我现在修改了答案,不要将重点放在“受 CPU 限制”上。剩下的点仍然存在 - 当 Pandas 需要等待 IO 时,它正在执行 asyncio 不支持 ** 的磁盘 IO **(至少有两个外部库可以这样做,都使用线程)。我不是 Pandas 开发人员,但我怀疑这就是本机支持 asyncio 没有多大意义的原因。 (2认同)