Airflow BashOperator收集返回代码

Ace*_*rey 9 python airflow

我刚刚开始使用Airflow,请耐心等待,我要做的是从BashOperator任务中收集返回代码并将其保存到本地变量,然后根据返回代码分支到另一个任务.我的问题是弄清楚如何让BashOperator返回一些东西.以下是我的代码段:

dag = DAG(dag_id='dag_1',
      default_args=default_args,
      schedule_interval='0 2 * * *',
      user_defined_macros=user_def_macros,
      dagrun_timeout=timedelta(minutes=60)
      )
oodas = BashOperator(task_id='oodas', xcom_push=True, bash_command="hive -hiveconf SCHEMA={{ schema }} -hiveconf DAY={{ yesterday_ds }} -f {{ script_path }}", dag=dag)
t2 = BashOperator(task_id='t2', bash_command='echo "{{ ti.xcom_pull("oodas") }}"', dag=dag)
t2.set_upstream(oodas)
Run Code Online (Sandbox Code Playgroud)

我正在尝试xcom_push但老实说不知道它是如何工作的..这是收集结果的正确方法吗?在日志中,最后一行是:命令退出,返回码为0.

K-Y*_*-Yo 7

根据BashOperator文档

如果xcom_push为True,则在bash命令完成时,写入stdout的最后一行也将被推送到XCom。

知道这一点,您只需要使bash脚本最后打印错误代码,因此将以下内容附加到您的bash_command

<your code> ; echo $?
Run Code Online (Sandbox Code Playgroud)

您的情况是:

oodas = BashOperator(task_id='oodas', xcom_push=True, bash_command="hive -hiveconf SCHEMA={{ schema }} -hiveconf DAY={{ yesterday_ds }} -f {{ script_path }}; echo $?", dag=dag)
Run Code Online (Sandbox Code Playgroud)


Pri*_*hta 3

你能发布整个 DAG 吗?我认为您在解释 Airflow 的工作原理时遇到了问题

从 Task1 (如果它是 bash 操作符)你可以执行以下操作:

t1 = BashOperator(task_id='t1', bash_command='echo "{{ ti.xcom_push("t1") }}"', dag=dag)
Run Code Online (Sandbox Code Playgroud)

在任务2中:

t2 = BashOperator(task_id='t2', bash_command='echo "{{ ti.xcom_pull("t1") }}"', dag=dag)
Run Code Online (Sandbox Code Playgroud)

其中 ti 是 task_instance 变量,{{}} 表示法用于访问变量部分