Airflow s3Hook - 使用 pandas read_csv 读取 s3 中的文件

Kri*_*una 5 python amazon-s3 pandas airflow

我正在尝试使用 pandas 读取一些文件来s3Hook获取密钥。我能够获取密钥,但是我不确定如何让 pandas 找到文件,当我运行以下命令时,我得到:

没有这样的文件或目录:

这是我的代码:

def transform_pages(company, **context):
    ds = context.get("execution_date").strftime('%Y-%m-%d')

    s3 = S3Hook('aws_default')
    s3_conn = s3.get_conn()
    keys = s3.list_keys(bucket_name=Variable.get('s3_bucket'),
                        prefix=f'S/{company}/pages/date={ds}/',
                        delimiter="/")

    prefix = f'S/{company}/pages/date={ds}/'
    logging.info(f'keys from function: {keys}')

    """ transforming pages and loading data back to S3 """
    for file in keys:
        df = pd.read_csv(file, sep='\t', skiprows=1, header=None)
Run Code Online (Sandbox Code Playgroud)

Fel*_*nza 2

您正在寻找的格式如下:

filepath = f"s3://{bucket_name}/{key}"
Run Code Online (Sandbox Code Playgroud)

因此,在您的具体情况下,类似:

for file in keys:
    filepath = f"s3://s3_bucket/{file}"
    df = pd.read_csv(filepath, sep='\t', skiprows=1, header=None)
Run Code Online (Sandbox Code Playgroud)

只要确保您已s3fs安装即可(pip install s3fs)。