如何使用AirFlow运行python任务列表?

Ten*_*sor 21 python airflow

我在一系列python文件中有一系列Python任务:file1.py,.....放在一个文件夹里面.

我阅读了Airflow的文档,但没看到如何在DAG中指定python文件的文件夹和文件名?

我想执行那些python文件(不是Python函数的Python函数).

Task1:执行file1.py(带一些导入包)Task2:执行file2.py(带一些其他导入包)

这会有所帮助.感谢和问候

Rom*_*man 16

要整体执行python文件,请使用BashOperator(在liferacer的答案中):

from airflow.operators.bash_operator import BashOperator

bash_task = BashOperator(
    task_id='bash_task',
    bash_command='python file1.py',
    dag=dag
)
Run Code Online (Sandbox Code Playgroud)

然后,使用PythonOperator调用您的main函数来完成它.你应该已经有了一个__main__块,所以把那里发生的事情放到一个main函数中,这样file1.py看起来像这样:

def main():
    """This gets executed if `python file1` gets called."""
    # my code

if __name__ == '__main__':
    main() 
Run Code Online (Sandbox Code Playgroud)

然后你的dag定义:

from airflow.operators.python_operator import PythonOperator

import file1

python_task = PythonOperator(
    task_id='python_task',
    python_callable=file1.main,
    dag=dag
)
Run Code Online (Sandbox Code Playgroud)


dla*_*lin 14

我知道你要求你"想要执行那些python文件(而不是Python操作员的Python函数)." 但我认为这可能比使用Airflow更有效.我也看到了之前写的答案中的混乱,所以这是你想要的方式,以及我建议的方式来完成任务:

假设:

dags/
    my_dag_for_task_1_and_2.py
    tasks/
         file1.py
         file2.py
Run Code Online (Sandbox Code Playgroud)

您的要求避免PythonOperator:

#  my_dag_for_task_1_and_2.py
import datetime as dt
from airflow import DAG
from airflow.operators import BashOperator

with DAG(
    'my_dag_for_task_1_and_2',
    default_args={
        'owner': 'me',
        'start_date': datetime(…),
        …,
    }, 
    schedule_interval='8 * * * *',
) as dag:
    task_1 = BashOperator(
        task_id='task_1', 
        bash_command='/path/to/python /path/to/dags/tasks/file1.py',
    )
    task_2 = BashOperator(
        task_id='task_2', 
        bash_command='/path/to/python /path/to/dags/tasks/file2.py',
    )
    task_1 >> task_2
Run Code Online (Sandbox Code Playgroud)

您没有从头开始为Airflow编写Python,但是PythonOperator:

#  my_dag_for_task_1_and_2.py
import datetime as dt
from airflow import DAG
from airflow.operators import PythonOperator
import tasks.file1
import tasks.file2

with DAG(
    'my_dag_for_task_1_and_2',
    default_args={
        'owner': 'me',
        'start_date': datetime(…),
        …,
    }, 
    schedule_interval='8 * * * *',
) as dag:
    task_1 = PythonOperator(
        task_id='task_1', 
        python_callable=file1.function_in_file1,
    )
    task_2 = PythonOperator(
        task_id='task_2', 
        python_callable=file2.function_in_file2,  # maybe main?
    )
    task_1 >> task_2
Run Code Online (Sandbox Code Playgroud)

  • 我喜欢您使用带有DAG(...)的dag:...`。改善环境。 (2认同)
  • @Wordsmyth社区正在努力更新所有示例,以显示在DAG和任务依赖项中指定任务的所有不同方式的混合.所以这很快就会成为一个不那么"隐藏"的功能. (2认同)
  • @AshishKumar,您可能需要将 `__init__.py` 文件放入脚本文件夹中。它可以是空的。参考https://docs.python.org/3/tutorial/modules.html (2认同)

lif*_*cer 10

您可以使用BashOperator将python文件作为任务执行

    from airflow import DAG
    from airflow.operators import BashOperator,PythonOperator
    from datetime import datetime, timedelta

    seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                      datetime.min.time())

    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': seven_days_ago,
        'email': ['airflow@airflow.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
      )

    dag = DAG('simple', default_args=default_args)
t1 = BashOperator(
    task_id='testairflow',
    bash_command='python /home/airflow/airflow/dags/scripts/file1.py',
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

  • 为什么要在这里导入PythonOperator?你似乎没有使用它. (8认同)