Airflow BashOperator parameter from an XCom value

Inf*_*892 3 airflow

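I am having some trouble assigning an XCom value to a BashOperator.

All parameters except tmp_dir are retrieved correctly; tmp_dir is an XCom value generated during init_dag. I can retrieve the value in my custom operator, but not in the BashOperator. I have included the output of the three different approaches I tried.

I thought one approach might be to store the value in a Variable, but I could not figure out how to do that either.

Any help would be greatly appreciated.

Here is my DAG code: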

import airflow
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.models import Variable
from utility import util
import os

from airflow.operators.bash_operator import BashOperator

from operators.mmm_operator import MMMOperator  #it is a custom operator
from operators.iftp_operator import IFTPOperator #it is another custom operator

AF_DATAMONTH = util.get_date_by_format(deltaMth=2,deltaDay=0,ft='%b_%Y').lower() #it gives a date in required format
AF_FILENM_1 = 'SOME_FILE_' + AF_DATAMONTH + '.zip'  #required filename for ftp

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(0),
 }

dag = DAG(dag_id='my_dag', default_args=default_args, schedule_interval=None)


init_dag = MMMOperator(
    task_id='init_dag',
    provide_context=True,
    mmm_oracle_conn_id=Variable.get('SOME_VARIABLE'),
    mmm_view="{0}.{1}".format(Variable.get('ANOTHER_VARIABLE'), AF_DAG_MMM_VIEW_NM),
    mmm_view_filter=None,
    mmm_kv_type=True,
    mmm_af_env_view="{0}.{1}".format(Variable.get('ANOTHER_VARIABLE_1'),Variable.get('ANOTHER_VARIABLE_2')),
    dag=dag
)  #local_tmp_folder is generated here and pushed via xcom


download_ftp_files = IFTPOperator(task_id='download_ftp_files',
                    ftp_conn_id=util.getFromConfig("nt_conn_id"), #value properly retrieved by xcom_pull  
                    operation='GET',
                    source_path=util.getFromConfig("nt_remote_folder"), #value properly retrieved by xcom_pull
                    dest_path=util.getFromConfig("local_tmp_folder"), #value properly retrieved by xcom_pull
                    filenames=AF_FILENM_1,
                    dag=dag
)



bash_cmd_template = "cd /vagrant/ && python3 hello_print.py {{params.client}} {{params.task}} {{params.environment}} {{params.tmp_dir}} {{params.af_file_nm}}"

#try 1 output value for params.tmp_dir: {{ ti.xcom_pull(task_ids="init_dag")["local_tmp_folder"] }} - instead of the actual tmp folder location
#try 2 and try 3 output: Broken DAG: [/home/vagrant/airflow/dags/my_dag.py] name 'ti' is not defined - message in UI

execute_main_py_script = BashOperator(
    task_id='execute_main_py_script',
    bash_command=bash_cmd_template,
    params={'client' : 'some_client',
            'task' : 'load_some_task',
            'environment' : 'environment_name',
            #'tmp_dir' : util.getFromConfig("local_tmp_folder"),   #try 1
            #'tmp_dir' : {{ ti.xcom_pull(task_ids="init_dag")["local_tmp_folder"] }} #try 2
            #'tmp_dir' : ti.xcom_pull(task_ids="init_dag")["local_tmp_folder"] #try 3
             'af_file_nm' : AF_FILENM_1
            },
    provide_context=True,        
    dag=dag
)


init_dag >> download_ftp_files >> execute_main_py_script       

kax*_*xil 5

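The params argument of BashOperator is not Jinja-templated, so any value you pass in params is rendered "as is". That is why try 1 printed the raw {{ ... }} expression instead of the folder path.

You should put the tmp_dir expression directly in bash_cmd_template, like this: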

bash_cmd_template = """
cd /vagrant/ && python3 hello_print.py {{params.client}} {{params.task}} {{params.environment}} {{ ti.xcom_pull(task_ids="init_dag")["local_tmp_folder"] }} {{params.af_file_nm}}
"""


execute_main_py_script = BashOperator(
    task_id='execute_main_py_script',
    bash_command=bash_cmd_template,
    params={'client' : 'some_client',
            'task' : 'load_some_task',
            'environment' : 'environment_name',
            'af_file_nm' : AF_FILENM_1
            },
    provide_context=True,        
    dag=dag
)
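
To see why the expression has to live in bash_command rather than in params, here is a minimal sketch that uses plain jinja2 outside Airflow. FakeTaskInstance and the path /tmp/example_dir are hypothetical stand-ins for the real task instance and XCom value, not Airflow's API. It only illustrates that Jinja renders a template in a single pass, so an expression stored as a string inside params comes out as literal text:

```python
from jinja2 import Template


class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's task instance (assumption, not the real class)."""

    def xcom_pull(self, task_ids):
        # Pretend init_dag pushed this dictionary via XCom.
        return {"local_tmp_folder": "/tmp/example_dir"}


ti = FakeTaskInstance()

# Case 1: the Jinja expression is hidden inside a params value.
# Jinja substitutes the string once and does not render it again.
params = {"tmp_dir": '{{ ti.xcom_pull(task_ids="init_dag")["local_tmp_folder"] }}'}
via_params = Template("echo {{ params.tmp_dir }}").render(params=params, ti=ti)

# Case 2: the expression sits directly in the template, so it is evaluated.
direct = Template(
    'echo {{ ti.xcom_pull(task_ids="init_dag")["local_tmp_folder"] }}'
).render(params=params, ti=ti)

print(via_params)  # the raw expression, unrendered
print(direct)      # echo /tmp/example_dir
```

The first case mirrors the output reported for try 1, and the second mirrors the fix above. Tries 2 and 3 fail earlier for a different reason: ti only exists in the template context at run time, so referencing it in plain Python while the DAG file is being parsed raises a NameError.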