使用asyncio逐行读取文件

jos*_*inb 20 python python-asyncio

我希望在编写时读取几个日志文件并使用asyncio处理它们的输入.代码必须在Windows上运行.根据我从stackoverflow和web搜索的理解,异步文件I/O在大多数操作系统上都很棘手(select例如,不会按预期工作).虽然我确信我可以用其他方法(例如线程)做到这一点,但我会尝试使用asyncio来查看它是什么样的.最有用的答案可能是描述这个问题的解决方案的"架构"应该是什么样的,即应该如何调用或调度不同的函数和协程.

下面给我一个生成器,逐行读取文件(通过轮询,这是可以接受的):

import time

def line_reader(f):
    while True:
        line = f.readline()
        if not line:
            time.sleep(POLL_INTERVAL)
            continue
        process_line(line)
Run Code Online (Sandbox Code Playgroud)

有几个要监视和处理的文件,这种代码需要线程.我稍微修改了它以便更适用于asyncio:

import asyncio

def line_reader(f):
    while True:
        line = f.readline()
        if not line:
            yield from asyncio.sleep(POLL_INTERVAL)
            continue
        process_line(line)
Run Code Online (Sandbox Code Playgroud)

当我通过asyncio事件循环安排它时,这种方法有效,但如果是process_data块,那么这当然不好.在开始时,我想象解决方案看起来像

def process_data():
    ...
    while True:
        ...
        line = yield from line_reader()
        ...
Run Code Online (Sandbox Code Playgroud)

但我无法弄清楚如何使这项工作(至少没有process_data管理相当多的状态).

关于如何构建这种代码的任何想法?

pyl*_*ver 25

使用aiofiles:

async with aiofiles.open('filename', mode='r') as f:
    async for line in f:
        print(line)
Run Code Online (Sandbox Code Playgroud)

编辑1

正如@Jashandeep所提到的,你应该关心阻塞操作:

另一种方法是select和或epoll:

from select import select

files_to_read, files_to_write, exceptions = select([f1, f2], [f1, f2], [f1, f2], timeout=.1)
Run Code Online (Sandbox Code Playgroud)

这里的timeout参数很重要.

请参阅:https://docs.python.org/3/library/select.html#select.select

编辑2

您可以使用以下命令注册文件以进行读/写:loop.add_reader()

它在循环内部使用内部EPOLL处理程序.

编辑3

但请记住,Epoll不适用于常规文件.


Jas*_*ohi 16

根据我在stackoverflow和web上搜索的理解,异步文件I/O在大多数操作系统上都很棘手(例如,select不会按预期工作).虽然我确信我可以用其他方法(例如线程)做到这一点,但我会尝试使用asyncio来查看它是什么样的.

asyncio select基于*nix系统,因此如果不使用线程,您将无法进行非阻塞文件I/O. 在Windows上,asyncio可以使用IOCP,它支持非阻塞文件I/O,但不支持此操作asyncio.

你的代码很好,除了你应该在线程中阻止I/O调用,这样你就不会在I/O很慢时阻塞事件循环.幸运的是,使用该loop.run_in_executor函数将工作卸载到线程非常简单.

首先,为您的I/O设置专用线程池:

from concurrent.futures import ThreadPoolExecutor
io_pool_exc = ThreadPoolExecutor()
Run Code Online (Sandbox Code Playgroud)

然后只需卸载对执行程序的任何阻塞I/O调用:

...
line = yield from loop.run_in_executor(io_pool_exc, f.readline)
...
Run Code Online (Sandbox Code Playgroud)