从 Airflow 中已清除的任务中获取 XCom

Dea*_*ean 3 airflow

当任务失败时,我有松弛警报,但我也希望有恢复消息。

\n\n

当任务最初失败时,on_failure_callback它会执行一个xcom_push. 我在此处保存的内容可在下一次 DAG 运行中使用:

\n\n
context[\'ti\'].xcom_pull(key=\'my_task_state\',\n                        task_ids=context[\'task\'].task_id,\n                        include_prior_dates=True)\n
Run Code Online (Sandbox Code Playgroud)\n\n

但是,如果我清除失败的任务以便它重新运行,则在其on_failure_callback/中on_success_callback我尝试获取我在初始尝试中保存的值:

\n\n
context[\'ti\'].xcom_pull(key=\'my_task_state\',\n                        task_ids=context[\'task\'].task_id,\n                        include_prior_dates=False)\n
Run Code Online (Sandbox Code Playgroud)\n\n

这样就返回了None。如果我设置它\xe2\x80\x99,将返回上一次include_prior_dates=TrueDAG 运行的值,但不会返回清除任务的当前值。

\n\n

我是否做错了什么,或者是否有一个解决方法可以用来获取我正在寻找的 XCom 值\xe2\x80\x99m ?

\n

Dea*_*ean 8

王勇的回答很好地解释了为什么我无法获得我想要的值。不过,我能够想出一个解决方法。

xcom_push两者xcom_pull都只是调用 XCom 上的类方法。您可以直接调用这些。事实证明,您可以使用一个组合的任务 ID,它会将其保存到xcom该 ID 下的表中。由于它不是真正的任务,因此当任务(或 DAG)被清除时,它不会被删除。

from airflow.models import XCom

def set_xcom(context, value):
    XCom.set(key='my_key',
             value=value
             task_id='{}_SOME_SUFFIX'.format(context['ti'].task_id),
             dag_id=context['ti'].dag_id,
             execution_date=context['ti'].execution_date)

def get_xcom(context):
    return XCom.get_one(context['ti'].execution_date,
                        key='my_key',
                        task_id='{}_SOME_SUFFIX'.format(context['ti'].task_id),
                        dag_id=context['ti'].dag_id,
                        include_prior_dates=False)
Run Code Online (Sandbox Code Playgroud)

这不是使用 XCom 的标准方式,所以我以后在升级到新版本的 Airflow 时必须小心。


小智 6

以我的理解:

Xcom 旨在用于任务之间交换消息。Xcom 状态取决于任务实例。如果任务实例被清除(删除),属于该实例的xcom历史信息也将被删除。

这就是为什么你得到

  • 当include_prior_dates=False时无,(任务实例已删除,没有该xcom记录)

  • 当 include_prior_dates=True 时,最后一个 dag 信息(任务实例被删除,但另一个 dags 任务实例 xcome 被弹出(最近)。

这是默认情况下 example_xcom 的展示案例:

    1. 所有成功状态:
      • 1.1 达格 在此输入图像描述
      • 1.2 xcom列表 在此输入图像描述
    1. 清除一项任务状态:
      • 2.1 清除 在此输入图像描述
      • 2.2删除xcom列表中的 xcome应该被删除,但是我无法截屏。

    这就是为什么你的程序没有得到任何结果。

    • 2.3 重做xcom列表(但时间戳不同,新记录) 在此输入图像描述

如果您发现答案有帮助,请投票。谢谢