如何在 Airflow-MacOS 中写入本地文件路径

Cod*_*123 5 operating-system python-3.x boto3 airflow airflow-scheduler

我正在编写一个 Airflow 管道,其中涉及将结果写入本地文件系统上的 csv 文件。

我使用的是 MacOS,文件路径类似于 /User/name/file_path/file_name.csv)

这是我的代码:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.models import Variable
import os
from airflow.operators.python_operator import PythonOperator
#Import boto3 module
import boto3
import logging
from botocore.exceptions import ClientError
import csv
import numpy as np
import pandas as pd

bucket='my_bucket_name'

s3 = boto3.resource('s3',
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY
    )


def load_into_csv(years):
    df = pd.DataFrame()
    for year in years:
        for buckett in s3.buckets.all():
            for aobj in buckett.objects.filter(Bucket=bucket,Prefix=PREFIX):
                if year in aobj.key:
                    bucket_name= "'{}'  ".format(buckett.name)
                    the_key= "'{}'  ".format(aobj.key)
                    last_mod= "'{}'  ".format(aobj.last_modified)
                    stor_class= "'{}'  ".format(aobj.storage_class)
                    size_1= "'{}'  ".format(aobj.size)
                    dd = {'bucket_name':[bucket_name], 'S3_key_path':[the_key], 'last_modified_date':[last_mod], 'storage_class':[stor_class], 'size':[size_1] }
                    df_2 = pd.DataFrame(data=dd)
                    df = df.append(df_2, ignore_index=True)

                    #Get local directory 
                    path=os.getcwd()

                    export_csv = df.to_csv (r'{}/results.csv'.format(path) ,index = None, header=True)


load_into_csv(years)


#######################################################################################################################

default_args = {
    'owner': 'name',
    'depends_on_past': False,
    'start_date': datetime(2020,1,1),
    'email': ['email@aol.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=1)
}


dag = DAG('bo_v1',
          description = 'this is a test script',
          default_args=default_args,
          schedule_interval= '@once',
          catchup = False )


years=['2017','2018','2019']

for year in years:
    t1 = PythonOperator(
        task_id='load 2017',
        python_callable= load_into_csv,
        provide_context=False,
        dag = dag)
Run Code Online (Sandbox Code Playgroud)

如果您查看路径变量,我尝试收集本地操作系统路径,然后将其设置为导出 csv 变量中的输出文件,但无济于事。

有没有办法将本地MacOS文件路径(/Users/name/path/file_name.csv)设置为export_to_csv变量中的文件路径?我是 Airflow 的新手,所以任何想法或建议都会有帮助!

小智 0

我已经在我的 Mac 上尝试了路径动态方式,就像您对path=os.getcwd(). 我将它放在任务或全局命名空间中,但它没有返回任何可行的路径。解决此问题的一种方法是将路径作为变量放入 Airflow 变量中,然后在需要时从那里获取它。