如何读取一个目录中的多个文件,所有这些文件都是带有Airflow S3 Hook或boto3的csv.gzip?

Sul*_*ing 1 python csv amazon-s3 boto3 airflow

我在 S3 中有一个目录,假设s3://test-bucket/test-folder/2020-08-28/其中有这样的文件:

2020-08-28 03:29:13   29397684 data_0_0_0.csv.gz
2020-08-28 03:29:13   29000150 data_0_1_0.csv.gz
2020-08-28 03:29:13   38999956 data_0_2_0.csv.gz
2020-08-28 03:29:13   32079942 data_0_3_0.csv.gz
2020-08-28 03:29:13   34154791 data_0_4_0.csv.gz
2020-08-28 03:29:13   45348128 data_0_5_0.csv.gz
2020-08-28 03:29:13   60904419 data_0_6_0.csv.gz
Run Code Online (Sandbox Code Playgroud)

我正在尝试使用 S3 钩子 ( https://airflow.readthedocs.io/en/stable/_modules/airflow/hooks/S3_hook.html )创建 Airflow 运算符,它将在某处转储这些文件的内容。我试过:

contents = s3.read_key(key=s3://test-bucket/test-folder/2020-08-28/),
contents = s3.read_key(key=s3://test-bucket/test-folder/2020-08-28/data_0_0_0.csv)
contents = s3.read_key(key=s3://test-bucket/test-folder/2020-08-28/data_0_0_0.csv.gz)
Run Code Online (Sandbox Code Playgroud)

这些似乎都不起作用。我注意到有s3.select_key但似乎没有正确的参数,只有输入和输出序列化。有什么方法可以使用 S3 hook 导入这些数据而不对文件本身做任何事情?

我的下一个问题是文件夹中有一堆文件s3://test-bucket/test-folder/2020-08-28/。我尝试使用,list_keys但它不喜欢存储桶名称:

keys = s3.list_keys('s3://test-bucket/test-folder/2020-08-28/')
Run Code Online (Sandbox Code Playgroud)

Invalid bucket name "s3://test-bucket/test-folder/2020-08-28/": Bucket name must match the regex "^[a-zA-Z0-9.\-_]{1,255}$"
Run Code Online (Sandbox Code Playgroud)

我也尝试过同样的事情,但删除了“s3://”。它在任何时候都不会给我一个身份验证错误。当我输入上面.csv.gzread_key电话时,它告诉我

UnicodeDecodeError: 'utf-8' codec can't decode byte 0x8b in position 1: invalid start byte
Run Code Online (Sandbox Code Playgroud)

我假设这与它被 gzip 压缩的事实有关?

那么我如何才能 1. 从 S3 中读取压缩 csv 文件的密钥,以及 2. 如何一次读取给定目录中的所有 csv 文件?

Mig*_*ejo 6

假设您正在从s3://your_bucket/your_directory/YEAR-MONTH-DAY/. 那么你可以做两件事:

  • 读取数据路径。读取.csv.gz每个子目录中文件的路径

  • 加载数据。在此示例中,我们将它们加载为pandas.DataFrame,但您也可以将其保留为 gzip 对象。

1.A 使用 Airflow S3 Hook 读取路径

# Initialize the s3 hook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
s3_hook = S3Hook()

# Read the keys from s3 bucket
paths = s3_hook.list_keys(bucket_name='your_bucket_name', prefix='your_directory')
Run Code Online (Sandbox Code Playgroud)

其中,列出它在后面使用分页器的键。这就是我们在路径列表中读取的第三种形式。

1.B 用分页器读取路径

例如,在分页器的情况下,如果您想列出s3_//your_bucket/your_directory/item.csv.gz, ... 等中的对象。然后,分页器将像(示例取自docs)一样工作

client = boto3.client('s3', region_name='us-west-2')
paginator = client.get_paginator('list_objects')
operation_parameters = {'Bucket': 'your_bucket',
                        'Prefix': 'your_directory'}
page_iterator = paginator.paginate(**operation_parameters)
for page in page_iterator:
    print(page['Contents'])
Run Code Online (Sandbox Code Playgroud)

这将输出一个字典列表,您可以从中过滤Key每个字典的 以获取要读取的路径列表,即分页器将抛出类似

[{'Key': 'your_directoyr/file_1.csv.gz
....},
..., 
{'Key': 'your_directoyr/file_n.csv.gz
....}
Run Code Online (Sandbox Code Playgroud)

现在我们使用第三种形式来做到这一点,它与之前的形式相似。

1.C 使用 Boto 3 客户端读取路径

要读取路径,请考虑以下函数

import boto3 

s3_client = boto3.client('s3')

def get_all_s3_objects(s3_client, **base_kwargs):
    continuation_token = None
    while True:
        list_kwargs = dict(MaxKeys=1000, **base_kwargs)
        if continuation_token:
            list_kwargs['ContinuationToken'] = continuation_token
        response = s3_client.list_objects_v2(**list_kwargs)
        yield from response.get('Contents', [])
        if not response.get('IsTruncated'):  # At the end of the list?
            break
        continuation_token = response.get('NextContinuationToken')
Run Code Online (Sandbox Code Playgroud)

例如,当您使用后缀 Key 和您的存储桶名称调用此函数时

files = get_all_s3_objects(s3_client, Bucket='your_bucket_name', Prefix=f'your_directory/YEAR-MONTH-DAY')
paths = [f['Key'] for f in files]
Run Code Online (Sandbox Code Playgroud)

通过调用路径,您将获得一个包含.csv.gz文件的列表。在你的情况下,这将是

[data_0_0_0.csv.gz,
data_0_1_0.csv.gz,
data_0_2_0.csv.gz]
Run Code Online (Sandbox Code Playgroud)

然后,您可以将其作为以下函数的输入,以将您的数据读取为 Pandas 数据帧,例如。

2.加载数据

考虑函数

from io import BytesIO
import pandas as pd

def load_csv_gzip(s3_client, bucket, key):
    with BytesIO() as f:
        s3_files = s3_client.download_fileobj(Bucket=bucket,
                           Key=key,
                           Fileobj=f)
        f.seek(0)
        gzip_fd = gzip.GzipFile(fileobj=f)
        return pd.read_csv(gzip_fd)
Run Code Online (Sandbox Code Playgroud)

最后,您将提供一个包含.csv.gz文件的列表,您可以迭代加载每个路径并将结果连接到一个 Pandas 数据帧,或者您可以只加载一个.csv.gz文件。例如,

data = pd.concat([load_csv_gzip(s3_client, 'your_bucket', path) for p in paths])
Run Code Online (Sandbox Code Playgroud)

其中路径的每个元素都类似于your_subdirectory/2020-08-28/your_file.csv.gz.