我刚刚开始使用Airflow,请耐心等待,我要做的是从BashOperator任务中收集返回代码并将其保存到本地变量,然后根据返回代码分支到另一个任务.我的问题是弄清楚如何让BashOperator返回一些东西.以下是我的代码段:
dag = DAG(dag_id='dag_1',
default_args=default_args,
schedule_interval='0 2 * * *',
user_defined_macros=user_def_macros,
dagrun_timeout=timedelta(minutes=60)
)
oodas = BashOperator(task_id='oodas', xcom_push=True, bash_command="hive -hiveconf SCHEMA={{ schema }} -hiveconf DAY={{ yesterday_ds }} -f {{ script_path }}", dag=dag)
t2 = BashOperator(task_id='t2', bash_command='echo "{{ ti.xcom_pull("oodas") }}"', dag=dag)
t2.set_upstream(oodas)
Run Code Online (Sandbox Code Playgroud)
我正在尝试xcom_push但老实说不知道它是如何工作的..这是收集结果的正确方法吗?在日志中,最后一行是:命令退出,返回码为0.
如果xcom_push为True,则在bash命令完成时,写入stdout的最后一行也将被推送到XCom。
知道这一点,您只需要使bash脚本最后打印错误代码,因此将以下内容附加到您的bash_command:
<your code> ; echo $?
Run Code Online (Sandbox Code Playgroud)
您的情况是:
oodas = BashOperator(task_id='oodas', xcom_push=True, bash_command="hive -hiveconf SCHEMA={{ schema }} -hiveconf DAY={{ yesterday_ds }} -f {{ script_path }}; echo $?", dag=dag)
Run Code Online (Sandbox Code Playgroud)
你能发布整个 DAG 吗?我认为您在解释 Airflow 的工作原理时遇到了问题
从 Task1 (如果它是 bash 操作符)你可以执行以下操作:
t1 = BashOperator(task_id='t1', bash_command='echo "{{ ti.xcom_push("t1") }}"', dag=dag)
Run Code Online (Sandbox Code Playgroud)
在任务2中:
t2 = BashOperator(task_id='t2', bash_command='echo "{{ ti.xcom_pull("t1") }}"', dag=dag)
Run Code Online (Sandbox Code Playgroud)
其中 ti 是 task_instance 变量,{{}} 表示法用于访问变量部分
| 归档时间: |
|
| 查看次数: |
6554 次 |
| 最近记录: |