并行读取文件并参数化类参数

Era*_*she 2 python multithreading python-asyncio

假设我有一个类,并且想从磁盘并行读取几个文件,并对类参数进行参数化。什么是最正确的方法(以及如何)?

  • 主线程应等待 load_data() 操作结束,然后再发生其他任何事情。

我考虑过线程,因为它只是 I/O 操作。

非并行实现示例(1-Threading):

import pandas as pd


class DataManager(object):
    def __init__(self):
        self.a = None
        self.b = None
        self.c = None
        self.d = None
        self.e = None
        self.f = None

    def load_data(self):
        self.a = pd.read_csv('a.csv')
        self.b = pd.read_csv('b.csv')
        self.c = pd.read_csv('c.csv')
        self.d = pd.read_csv('d.csv')
        self.e = pd.read_csv('e.csv')
        self.f = pd.read_csv('f.csv')

if __name__ == '__main__':
    dm = DataManager()
    dm.load_data()
    # Main thread is waiting for load_data to finish.
    print("finished loading data")
Run Code Online (Sandbox Code Playgroud)

Ale*_*nko 5

在大多数情况下,I/O 操作不受 CPU 限制,因此使用多个进程是一种矫枉过正。使用多线程可能很好,但pb.read_csv不仅可以读取文件,还可以解析 CPU 受限的内容。我建议您在最初为此目的而制作的 asyncio 中从磁盘读取文件。这是执行此操作的代码:

import asyncio
import aiofiles


async def read_file(file_name):
    async with aiofiles.open(file_name, mode='rb') as f:
        return await f.read()


def read_files_async(file_names: list) -> list:
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(
        asyncio.gather(*[read_file(file_name) for file_name in file_names]))


if __name__ == '__main__':
    contents = read_files_async([f'files/file_{i}.csv' for i in range(10)])
    print(contents)
Run Code Online (Sandbox Code Playgroud)

该函数read_files_async返回文件内容列表(字节缓冲区),您可以将其传递给pd.read_csv.

我认为优化文件读取应该就足够了,但是您可以与多个进程并行解析文件内容(线程和异步不会提高解析过程的性能):

import multiprocessing as mp

NUMBER_OF_CORES = 4
pool = mp.Pool(NUMBER_OF_CORES)
pool.map(pb.read_csv, contents)
Run Code Online (Sandbox Code Playgroud)

您应该NUMBER_OF_CORES根据您的机器规格进行设置。