将命令行参数传递给气流BashOperator

Shi*_*iva 3 python bash workflow airflow

有没有一种方法可以将命令行参数传递给Airflow BashOperator。目前,我有一个python脚本,它接受date参数并执行一些特定的活动,例如清除早于给定日期的特定文件夹。

在仅执行一项任务的简化代码中,我想做的是

from __future__ import print_function
from airflow.operators import BashOperator
from airflow.models import DAG
from datetime import datetime, timedelta

default_args = {
    'owner'             : 'airflow'
    ,'depends_on_past'  : False
    ,'start_date'       : datetime(2017, 01, 18)
    ,'email'            : ['abc@xyz.com']
    ,'retries'          : 1
    ,'retry_delay'      : timedelta(minutes=5)
}

dag = DAG(
    dag_id='data_dir_cleanup'
    ,default_args=default_args
    ,schedule_interval='0 13 * * *'
    ,dagrun_timeout=timedelta(minutes=10)
    )

cleanup_task = BashOperator(
        task_id='task_1_data_file_cleanup'
        ,bash_command='python cleanup.py --date $DATE 2>&1 >>  /tmp/airflow/data_dir_cleanup.log'
        #--------------------------------------^^^^^^-- (DATE variable which would have been given on command line)
        #,env=env
        ,dag=dag
    )
Run Code Online (Sandbox Code Playgroud)

提前致谢,

Bol*_*uin 6

BashOperator以Jinja2为模板,这意味着您可以传递任意值。在您的情况下,它将类似于:

cleanup_task = BashOperator(
        task_id='task_1_data_file_cleanup'
        ,bash_command="python cleanup.py --date {{ DATE }} 2>&1 >>  /tmp/airflow/data_dir_cleanup.log"
        ,params = {'DATE' : 'this-should-be-a-date'}
        ,dag=dag
    )
Run Code Online (Sandbox Code Playgroud)

另请参见:https : //airflow.incubator.apache.org/tutorial.html#templating-with-jinja以获得更广泛的示例。

  • 我建议使用`{{params.DATE}}`而不是`{{DATE}}`来说明它的来源。甚至更多,我会使用小写的{{params.date}},因为它不是常量。 (5认同)

小智 5

您可以尝试以下方法(对我有用):

cmd_command = "python path_to_task/[task_name.py] '{{ execution_date }}' '{{ prev_execution_date }}'"

t = BashOperator(
     task_id = 'some_id',
     bash_command = cmd_command,
     dag = your_dag_object_name)
Run Code Online (Sandbox Code Playgroud)

当我这样做时,它呈现了变量,并且效果很好。我相信它适用于所有变量(请注意,我在命令开头添加了“python”一词,因为我想运行 .py 脚本。

我的任务已正确编写,以便将这些变量读取为命令行参数(sys.argv 属性)。


wei*_*wei -1

尝试os.system("YOUR COMMAND HERE")