相关疑难解决方法(0)

在气流中,出现故障时,有没有办法重复一组任务?

在我的 DAG 中,我的任务流程如下:

... >> EmrAddStepsOperator >> EmrStepSensor
Run Code Online (Sandbox Code Playgroud)

EmrAddStepsOperator 的成功意味着“我能够告诉 EMR 启动”。EmrStepSensor 失败意味着“EMR 任务出现问题”。我对这些描述可能有点偏差,但这与我想要表达的观点无关:

如果第二个任务失败,我想重新启动第一个任务,而不是第二个任务。当第二个任务失败时,如何让气流重新启动第一个任务?

python airflow

7
推荐指数
2
解决办法
3543
查看次数

将顶级DAG连接在一起

我需要有几个相同(只有在不同参数)的顶级DAG小号那也可以用以下限制/假设一起触发:

  • 各个顶级DAG将会拥有,schedule_interval=None因为它们仅需要偶尔的手动触发
  • 但是,一系列DAG需要每天运行
  • 序列中DAG的顺序数量是固定的(在编写代码之前已知),并且很少更改(几个月内一次)
  • 无论DAG是失败还是成功,触发链都不得中断
  • 当前,它们必须串联运行。将来他们可能需要并行触发

因此,我为dags目录中的每个DAG创建了一个文件,现在必须将它们连接起来以便顺序执行。我确定了两种方法可以完成此操作:

  1. SubDagOperator

  2. TriggerDagRunOperator

    • 可在我的演示中使用,并行运行(不按顺序运行),因为它不等待触发的DAG完成才移至下一个
    • ExternalTaskSensor 可能有助于克服上述限制,但会使事情变得很混乱

我的问题是

  • 如何克服的局限性parent_id前缀dag_idSubDagS'
  • 如何迫使TriggerDagRunOperatorS 等待DAG完成
  • 是否有其他替代/更好的方法可以独立的(顶级)DAG连接在一起?
  • 我为每个顶级DAG 创建单独文件(仅在输入方面有所不同的DAG)的方法是否有解决方法?

我正在使用puckel / …

airflow

6
推荐指数
1
解决办法
1073
查看次数

标签 统计

airflow ×2

python ×1