使用 AIOfile 读取异步文件

Ale*_*oVK 4 python asynchronous python-asyncio

我正在尝试使用 asyncio 读取多个文件(CSV),但我不想在执行此操作时阻止主事件循环。

所以我检查了 AIOfile,它似乎承诺读取不会阻塞。虽然这可能是真的,但以下代码片段需要花费大量时间才能完成,但它与此处的示例基本相同https://github.com/mosquito/aiofile#read-file-line-by-line

import asyncio
from aiofile import AIOFile, LineReader
from pathlib import Path
import time

counter = 0

async def main():
    path = 'test_data'
    global counter
    data_dir = Path(path)
    files_in_basepath = (entry for entry in data_dir.iterdir() if entry.is_file())
    list_of_files = [(path + '/' + file.name, file) for file in files_in_basepath]
    for file in list_of_files:
        line_count = 0
        async with AIOFile(file[0]) as afp:
            await afp.fsync()
            async for line in LineReader(afp):
                #print(line)
                values = ''
                line_values = line.split(',')
                for item in line_values:
                    values = values + item + ' '
                # print(values)
                line_count += 1
        print(f'Processed {line_count} lines in file {file[1].name}.')
        counter += 1

start_time = time.time()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
duration = time.time() - start_time
print(f"Processed {counter} data files in {duration} seconds")
Run Code Online (Sandbox Code Playgroud)

这会带来糟糕的性能,100 个文件需要:

在 196.8809883594513 秒内处理了 100 个数据文件

与这些文件的顺序处理相比,这真是令人难以置信......

在 0.9933180809020996 秒内处理 100 个数据文件

所以我想知道这里发生了什么,而且我在几个地方看到建议在执行器中运行 IO 操作,这样事件循环就不会被阻塞。

顺便提一下,我还有一些其他代码可以在线程上运行它,并且执行效果几乎与顺序执行一样好:

import concurrent.futures
import csv
import threading
import time
from pathlib import Path

c_lock = threading.Lock()
counter = 0

def read_data_file(files):
    # Get the info from second item from tuple
    info = files[1].stat()
    global c_lock
    global counter
    c_lock.acquire()
    print(info.st_mtime)
    print(f'File name is {files[1].name} with size {round(info.st_size / float(1 << 10), 2)} KB')
    with open(files[0]) as csv_file:
        csv_reader = csv.reader(csv_file, delimiter=',')
        line_count = 0
        for row in csv_reader:
            # Just assume we do something very interesting with these values...
            values = ''
            for item in row:
                values = values + item + ' '
            #print(values)
            line_count += 1
        print(f'Processed {line_count} lines in file {files[1].name}.')
    counter += 1
    c_lock.release()

def read_data_files(path):
    # List all files in data folder
    data_dir = Path(path)
    files_in_basepath = (entry for entry in data_dir.iterdir() if entry.is_file())
    list_of_files = []
    for file in files_in_basepath:
        list_of_files.append((path + '/' + file.name, file))
    with concurrent.futures.ThreadPoolExecutor(max_workers=12) as executor:
        executor.map(read_data_file, list_of_files)


if __name__ == "__main__":
    data_files = 'test_data'
    start_time = time.time()
    read_data_files(data_files)
    duration = time.time() - start_time
    print(f"Processed {counter} data files in {duration} seconds")
Run Code Online (Sandbox Code Playgroud)

这给出了以下内容:

在 1.0079402923583984 秒内处理 100 个数据文件

想知道我是否在使用 asyncio 时做错了什么,或者我应该完全跳过它...我只是在尝试什么是处理所有这些文件的最有效方法,顺序、线程(包括 asyncio)或多处理)

use*_*342 6

您的多线程代码read_data_file使用一个巨大的锁锁定所有内容,强制其按顺序执行,导致线程版本的性能并不比顺序版本好。

asyncio 版本也按顺序运行,因为代码未使用asyncio.gather或类似于并行化它。至于为什么它比常规顺序版本慢 200 倍,这可能是一个向 aiofiles 开发人员询问的好问题。我怀疑每个行读取操作都单独传递给内部线程,由于如此热循环中的大量簿记而减慢了速度。

总之:

  • 如果你的瓶颈是 IO 速度,那么只要注意不要因不必要的锁定而使事情按顺序进行,你可能会通过使用多个线程获得一些好处。(GIL 不会成为问题,因为它会在 IO 操作周围自动释放。)

  • 如果你的瓶颈是 CPU 的速度,你可能想要研究多处理,因为由于 GIL,多线程不会有帮助。例如,在读取 CSV 文件时,解析文件内容并将其转换为数字所需的时间可能会使从磁盘读取文件所需的时间相形见绌,特别是当文件被操作系统缓存在内存中时。

  • asyncio 和 aiofiles 很可能不会帮助您提高处理 CSV 文件的速度。aiofiles 在集成读取可能“卡住”的文件时最有用(例如,因为它们可能正在从不再存在的网络驱动器读取)。在当前的实现中,它对于读取需要高吞吐量的文件没有用。

TL;DR 尝试正确使用线程来提高速度,如果这不起作用,请使用多处理。