如何防止气流回填dag运行?

m0m*_*eni 44 python scheduled-tasks airflow

假设你有一个气流DAG是没有意义回填,这意味着,它的运行一次后,运行它随后的时间很快就完全没有意义的.

例如,如果您从一些仅每小时更新一次的数据库加载数据到数据库中,那么快速连续发生的回填只会一次又一次地导入相同的数据.

当您实例化一个新的每小时任务时,这尤其令人讨厌,并且它会在N您指定的时间间隔内开始运行之前,每小时都会错过一次,执行冗余工作.

我能想到的唯一解决方案是他们在文档的常见问题解答中特别建议

我们建议不要将动态值用作start_date,尤其是datetime.now()因为它可能非常混乱.

有没有办法禁用DAG的回填,或者我应该怎么做?

sag*_*e88 39

升级到airflow版本1.8并在airflow.cfg中使用catchup_by_default = False或对每个dag应用catchup = False.

https://github.com/apache/incubator-airflow/blob/master/UPDATING.md#catchup_by_default

  • @Nick默认的args对象由应用于在DAG**下运行的**任务**的参数组成,而不是由DAG**本身组成.我最初也对此感到困惑. (6认同)
  • 我设置了catchup_by_default = False,但Airflow仍然回填了这些工作.知道为什么吗?我正在运行1.8版本 (5认同)
  • 同样,在Airflow 1.10.1上.我在所有的dags上设置```catchup = False```,我仍然得到回填. (4认同)
  • 我正在使用Airflow v1.10.0,我仍然看到这个问题 (3认同)
  • @Nick我实际上无法使默认设置工作,所以我最终只在我的所有DAG上添加`catchup = False`,如`DAG('example',default_args = default_args,schedule_interval ='0 5***',catchup = False)` (2认同)

Zig*_*ien 16

这似乎是一个未解决的Airflow问题.我知道我真的希望拥有完全相同的功能.就我而言,这就是我的意思; 它可能对其他人有用.

UI功能(至少在1.7.1.3中)可以帮助解决这个问题.如果您转到树视图并单击特定任务(方框),则会出现一个对话框按钮,其中包含"标记成功"按钮.单击"过去",然后单击"标记成功"将在DAG中将该任务的所有实例标记为成功,并且不会运行它们.顶级DAG(顶部的圆圈)也可以以类似的方式标记为成功,但似乎没有标记多个DAG实例的方法.

我还没有深入研究它,但可以使用'trigger_dag'子命令来标记DAG的状态.见这里:https://github.com/apache/incubator-airflow/pull/644/commits/4d30d4d79f1a18b071b585500474248e5f46d67d

用于标记DAG的CLI功能正在开发中: http ://mail-archives.apache.org/mod_mbox/airflow-commits/201606.mbox/%3CJIRA.12973462.1464369259000.37918.1465189859133@Atlassian.JIRA%3E https:// github的.com /阿帕奇/培养箱-气流/拉/ 1590

更新(2016年9月28日):添加了一个新的运营商'LatestOnlyOperator'(https://github.com/apache/incubator-airflow/pull/1752),它只运行最新版本的下游任务.听起来非常有用,希望它能很快进入发布

更新2:从气流1.8开始,LatestOnlyOperator已经释放.

  • 请注意,LatestOnlyOperator 将下游任务设置为“跳过”状态。根据文档,跳过的状态会传播,从而也跳过所有直接上游的任务。当您(ew)希望上游作业使用过期数据成功运行时,这使得该方法不合适。在这种情况下,最好的解决方案是在您的代码中添加一个早期运算符,如果任务运行得特别晚,则该运算符会成功。 (2认同)

小智 6

在dag声明中设置catchup = False将提供此确切功能。

我没有“声誉”来评论,但是我想说catchup = False是(正是由我自己)设计的目的。另外,我可以验证在实例化中明确设置时在1.10.1中它是否可以正常工作。但是,当放置在默认参数中时,我看不到它起作用。不过,我离开Airflow已经18个月了,因此我要花点时间了解一下为什么默认args不能用于追赶。

dag = DAG('example_dag',
        max_active_runs=3,
        catchup=False,
        schedule_interval=timedelta(minutes=5),
        default_args=default_args)
Run Code Online (Sandbox Code Playgroud)

  • 我正在运行“airflow 1.10.14”,但这不起作用,至少在使用 DebugExecutor 时不起作用 (2认同)