Sul*_*ing 1 python csv amazon-s3 boto3 airflow
我在 S3 中有一个目录,假设s3://test-bucket/test-folder/2020-08-28/
其中有这样的文件:
2020-08-28 03:29:13 29397684 data_0_0_0.csv.gz
2020-08-28 03:29:13 29000150 data_0_1_0.csv.gz
2020-08-28 03:29:13 38999956 data_0_2_0.csv.gz
2020-08-28 03:29:13 32079942 data_0_3_0.csv.gz
2020-08-28 03:29:13 34154791 data_0_4_0.csv.gz
2020-08-28 03:29:13 45348128 data_0_5_0.csv.gz
2020-08-28 03:29:13 60904419 data_0_6_0.csv.gz
Run Code Online (Sandbox Code Playgroud)
我正在尝试使用 S3 钩子 ( https://airflow.readthedocs.io/en/stable/_modules/airflow/hooks/S3_hook.html )创建 Airflow 运算符,它将在某处转储这些文件的内容。我试过:
contents = s3.read_key(key=s3://test-bucket/test-folder/2020-08-28/),
contents = s3.read_key(key=s3://test-bucket/test-folder/2020-08-28/data_0_0_0.csv)
contents = s3.read_key(key=s3://test-bucket/test-folder/2020-08-28/data_0_0_0.csv.gz)
Run Code Online (Sandbox Code Playgroud)
这些似乎都不起作用。我注意到有s3.select_key
但似乎没有正确的参数,只有输入和输出序列化。有什么方法可以使用 S3 hook 导入这些数据而不对文件本身做任何事情?
我的下一个问题是文件夹中有一堆文件s3://test-bucket/test-folder/2020-08-28/
。我尝试使用,list_keys
但它不喜欢存储桶名称:
keys = s3.list_keys('s3://test-bucket/test-folder/2020-08-28/')
Run Code Online (Sandbox Code Playgroud)
给
Invalid bucket name "s3://test-bucket/test-folder/2020-08-28/": Bucket name must match the regex "^[a-zA-Z0-9.\-_]{1,255}$"
Run Code Online (Sandbox Code Playgroud)
我也尝试过同样的事情,但删除了“s3://”。它在任何时候都不会给我一个身份验证错误。当我输入上面.csv.gz
的read_key
电话时,它告诉我
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x8b in position 1: invalid start byte
Run Code Online (Sandbox Code Playgroud)
我假设这与它被 gzip 压缩的事实有关?
那么我如何才能 1. 从 S3 中读取压缩 csv 文件的密钥,以及 2. 如何一次读取给定目录中的所有 csv 文件?
假设您正在从s3://your_bucket/your_directory/YEAR-MONTH-DAY/
. 那么你可以做两件事:
读取数据路径。读取.csv.gz
每个子目录中文件的路径
加载数据。在此示例中,我们将它们加载为pandas.DataFrame
,但您也可以将其保留为 gzip 对象。
# Initialize the s3 hook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
s3_hook = S3Hook()
# Read the keys from s3 bucket
paths = s3_hook.list_keys(bucket_name='your_bucket_name', prefix='your_directory')
Run Code Online (Sandbox Code Playgroud)
其中,列出它在后面使用分页器的键。这就是我们在路径列表中读取的第三种形式。
例如,在分页器的情况下,如果您想列出s3_//your_bucket/your_directory/item.csv.gz
, ... 等中的对象。然后,分页器将像(示例取自docs)一样工作
client = boto3.client('s3', region_name='us-west-2')
paginator = client.get_paginator('list_objects')
operation_parameters = {'Bucket': 'your_bucket',
'Prefix': 'your_directory'}
page_iterator = paginator.paginate(**operation_parameters)
for page in page_iterator:
print(page['Contents'])
Run Code Online (Sandbox Code Playgroud)
这将输出一个字典列表,您可以从中过滤Key
每个字典的 以获取要读取的路径列表,即分页器将抛出类似
[{'Key': 'your_directoyr/file_1.csv.gz
....},
...,
{'Key': 'your_directoyr/file_n.csv.gz
....}
Run Code Online (Sandbox Code Playgroud)
现在我们使用第三种形式来做到这一点,它与之前的形式相似。
要读取路径,请考虑以下函数
import boto3
s3_client = boto3.client('s3')
def get_all_s3_objects(s3_client, **base_kwargs):
continuation_token = None
while True:
list_kwargs = dict(MaxKeys=1000, **base_kwargs)
if continuation_token:
list_kwargs['ContinuationToken'] = continuation_token
response = s3_client.list_objects_v2(**list_kwargs)
yield from response.get('Contents', [])
if not response.get('IsTruncated'): # At the end of the list?
break
continuation_token = response.get('NextContinuationToken')
Run Code Online (Sandbox Code Playgroud)
例如,当您使用后缀 Key 和您的存储桶名称调用此函数时
files = get_all_s3_objects(s3_client, Bucket='your_bucket_name', Prefix=f'your_directory/YEAR-MONTH-DAY')
paths = [f['Key'] for f in files]
Run Code Online (Sandbox Code Playgroud)
通过调用路径,您将获得一个包含.csv.gz
文件的列表。在你的情况下,这将是
[data_0_0_0.csv.gz,
data_0_1_0.csv.gz,
data_0_2_0.csv.gz]
Run Code Online (Sandbox Code Playgroud)
然后,您可以将其作为以下函数的输入,以将您的数据读取为 Pandas 数据帧,例如。
考虑函数
from io import BytesIO
import pandas as pd
def load_csv_gzip(s3_client, bucket, key):
with BytesIO() as f:
s3_files = s3_client.download_fileobj(Bucket=bucket,
Key=key,
Fileobj=f)
f.seek(0)
gzip_fd = gzip.GzipFile(fileobj=f)
return pd.read_csv(gzip_fd)
Run Code Online (Sandbox Code Playgroud)
最后,您将提供一个包含.csv.gz
文件的列表,您可以迭代加载每个路径并将结果连接到一个 Pandas 数据帧,或者您可以只加载一个.csv.gz
文件。例如,
data = pd.concat([load_csv_gzip(s3_client, 'your_bucket', path) for p in paths])
Run Code Online (Sandbox Code Playgroud)
其中路径的每个元素都类似于your_subdirectory/2020-08-28/your_file.csv.gz
.