将 Python 文件转换为 Airflow DAG

jac*_*ack 0 python airflow

我有这个 Python 文件:

class Get:

    def __init__(self, i):
        self.i = get_date(i)
        self.df = self.get_file()

    def get_file(self):
        try:
            ...
            return df
        except Exception as e:
            return ...

    def get_date(self,i):
        dt = datetime.now() - timedelta(days=i)
        return dt.strftime("%Y-%m-%d")

    def put(self,df):
        ....


class Fix:
    def __init__(self,df):
        ....

if __name__ == '__main__':
    for i in range(4, 0, -1):
        get = Get(i)
        fix = Fix(get.df)
        get.put(fix.df)
Run Code Online (Sandbox Code Playgroud)

基本上这段代码生成 4 个最后的日期并在这些日期上运行函数(更新统计信息等...)

起初我想将每个函数转换为 PythonOperator 然后安排它,但我认为这行不通。我不知道如何转换 Classes 和它们之间传输的参数。

如果我在 2018 年 6 月 12 日及以下运行它,代码会执行以下操作: 在此处输入图片说明

是否有我可以使用的模板或任何建议?

小智 5

您可以使用 BashOperator 执行脚本,而无需对脚本进行任何更改:

dag = DAG('{NAME_OF_THE_DAG}', schedule_interval='daily', 
default_args=default_args)

t1 = BashOperator(
    task_id = '{NAME_OF_TASK}',
    dag = dag,
    bash_command = python {NAME_OF_THE_FILE_TO_EXECUTE}.py')
Run Code Online (Sandbox Code Playgroud)

或使用 PythonOperator:

  1. 更新您的代码以在脚本中创建main函数:

    def main():
        for i in range(4, 0, -1):
        get = Get(i)
        fix = Fix(get.df)
        get.put(fix.df)
    
    Run Code Online (Sandbox Code Playgroud)
  2. 定义并执行 dag:

    dag = DAG('{NAME_OF_THE_TASK}', schedule_interval = 'daily', 
    default_args=default_args)
    
    t1 = PythonOperator(
        task_id = '{NAME_OF_TASK}',
        dag = dag,
        python_callable = main)
    
    Run Code Online (Sandbox Code Playgroud)