当任务失败时,我有松弛警报,但我也希望有恢复消息。
\n\n当任务最初失败时,on_failure_callback它会执行一个xcom_push. 我在此处保存的内容可在下一次 DAG 运行中使用:
context[\'ti\'].xcom_pull(key=\'my_task_state\',\n task_ids=context[\'task\'].task_id,\n include_prior_dates=True)\nRun Code Online (Sandbox Code Playgroud)\n\n但是,如果我清除失败的任务以便它重新运行,则在其on_failure_callback/中on_success_callback我尝试获取我在初始尝试中保存的值:
context[\'ti\'].xcom_pull(key=\'my_task_state\',\n task_ids=context[\'task\'].task_id,\n include_prior_dates=False)\nRun Code Online (Sandbox Code Playgroud)\n\n这样就返回了None。如果我设置它\xe2\x80\x99,将返回上一次include_prior_dates=TrueDAG 运行的值,但不会返回清除任务的当前值。
我是否做错了什么,或者是否有一个解决方法可以用来获取我正在寻找的 XCom 值\xe2\x80\x99m ?
\n王勇的回答很好地解释了为什么我无法获得我想要的值。不过,我能够想出一个解决方法。
xcom_push两者xcom_pull都只是调用 XCom 上的类方法。您可以直接调用这些。事实证明,您可以使用一个组合的任务 ID,它会将其保存到xcom该 ID 下的表中。由于它不是真正的任务,因此当任务(或 DAG)被清除时,它不会被删除。
from airflow.models import XCom
def set_xcom(context, value):
XCom.set(key='my_key',
value=value
task_id='{}_SOME_SUFFIX'.format(context['ti'].task_id),
dag_id=context['ti'].dag_id,
execution_date=context['ti'].execution_date)
def get_xcom(context):
return XCom.get_one(context['ti'].execution_date,
key='my_key',
task_id='{}_SOME_SUFFIX'.format(context['ti'].task_id),
dag_id=context['ti'].dag_id,
include_prior_dates=False)
Run Code Online (Sandbox Code Playgroud)
这不是使用 XCom 的标准方式,所以我以后在升级到新版本的 Airflow 时必须小心。
小智 6
以我的理解:
Xcom 旨在用于任务之间交换消息。Xcom 状态取决于任务实例。如果任务实例被清除(删除),属于该实例的xcom历史信息也将被删除。
这就是为什么你得到
当include_prior_dates=False时无,(任务实例已删除,没有该xcom记录)
当 include_prior_dates=True 时,最后一个 dag 信息(任务实例被删除,但另一个 dags 任务实例 xcome 被弹出(最近)。
这是默认情况下 example_xcom 的展示案例:
这就是为什么你的程序没有得到任何结果。
如果您发现答案有帮助,请投票。谢谢
| 归档时间: |
|
| 查看次数: |
3644 次 |
| 最近记录: |