气流:执行日期为执行日期=触发器日期=固定时间表

pol*_*loC 6 airflow

在气流方面,我想每个星期一的上午8点运行一次dag(当然,execution_date应该是“当日星期一,上午8点”)。为此工作流设置的相关参数是:

  • start_date :“ 2018-03-19”
  • schedule_interval :“ 0 8 * * MON”

我希望每个星期一早上8点见到一个小跑。第一个运行时间为2018年3月19日上午8点与execution_date= 2018-03-19-08-00-00,以此类推。

但是,这不会发生:dag不在上午8点19/03/18开始。实际行为在这里举例说明:https : //stackoverflow.com/a/39620901/1510109/sf/answers/3374977511/ 行为是:在间隔的每个结束时(在我的每周情况下,运行dag的时间为:execution_date =时间间隔的开始(即前一周)。这种行为显然是由“ ETL思维方式”推动的(请参见上面的链接)。但这绝对不是我想要的。

怎么样,我可以实现在上午08:00与运行我的DAG每个星期一execution_date = trigger_date=现在(=电流星期一)上午8时?

谢谢

dla*_*lin 6

以一个快速浏览一下我的回答与开始时间和execution_date例子。

您想在每个星期一的上午8点运行。

因此,这部分将保持不变:

schedule_interval: '0 8 * * MON',
Run Code Online (Sandbox Code Playgroud)

您希望它运行于2018年3月19日,因为第一次运行发生在开始日期之后的第一个完整计划周期的末尾,因此您应该将开始日期更改为:

start_date: datetime(2018,03,12),
Run Code Online (Sandbox Code Playgroud)

您将不得不面对这样一个事实,即Airflow会在每个周期的开始命名您的DagRun,并根据execution_date设置将其传递给间隔周期的开始。相应地调整逻辑。

您的第一次运行将在2018-03-19T08:00:00.0Z和之后开始,并且execution_date依赖于它的所有其他宏都将开始,并且DagRun的名称将为2018-03-12T08:00:00.0Z

只要您了解了DAG的期望,execution_date并且您不尝试将自己的时间作为datetime.now()DAG的基础,那么DAG就能在操作中发挥作用。可以my_execution_date = execution_date + datetime.timedelta(7)在任何PythonOperator或自定义运算符中随意创建一个新变量(您可以从任务的上下文中获取execute_date),可以使用模板语句,例如{{ (execution_date + macros.timedelta(7)).strftime('%Y%m%d') }}{{ macros.ds_add(ds, 7) }},或使用next_execution_date

您甚至可以添加dag级别(user_defined_macros{'dt':lambda d: d+datetime.timedelta(days=7)}启用){{ dt(execution_date) }}。并且最近user_defined_filters添加了{'dt':lambda d: d+datetime.timedelta(days=7)}启用功能{{ execution_date | dt }}。在next_dsnext_execution_date将是你的目的更容易。

在考虑模板时,您还可以阅读其中的内置内容:http : //jinja.pocoo.org/docs/2.10/templates/#builtin-filters


归档时间:

查看次数:

4152 次

最近记录:

7 年,1 月 前