在气流方面,我想每个星期一的上午8点运行一次dag(当然,execution_date应该是“当日星期一,上午8点”)。为此工作流设置的相关参数是:
start_date :“ 2018-03-19”schedule_interval :“ 0 8 * * MON”我希望每个星期一早上8点见到一个小跑。第一个运行时间为2018年3月19日上午8点与execution_date= 2018-03-19-08-00-00,以此类推。
但是,这不会发生:dag不在上午8点19/03/18开始。实际行为在这里举例说明:https : //stackoverflow.com/a/39620901/1510109或/sf/answers/3374977511/ 行为是:在间隔的每个结束时(在我的每周情况下,运行dag的时间为:execution_date =时间间隔的开始(即前一周)。这种行为显然是由“ ETL思维方式”推动的(请参见上面的链接)。但这绝对不是我想要的。
怎么样,我可以实现在上午08:00与运行我的DAG每个星期一execution_date = trigger_date=现在(=电流星期一)上午8时?
谢谢
以一个快速浏览一下我的回答与开始时间和execution_date例子。
您想在每个星期一的上午8点运行。
因此,这部分将保持不变:
schedule_interval: '0 8 * * MON',
Run Code Online (Sandbox Code Playgroud)
您希望它运行于2018年3月19日,因为第一次运行发生在开始日期之后的第一个完整计划周期的末尾,因此您应该将开始日期更改为:
start_date: datetime(2018,03,12),
Run Code Online (Sandbox Code Playgroud)
您将不得不面对这样一个事实,即Airflow会在每个周期的开始命名您的DagRun,并根据execution_date设置将其传递给间隔周期的开始。相应地调整逻辑。
您的第一次运行将在2018-03-19T08:00:00.0Z和之后开始,并且execution_date依赖于它的所有其他宏都将开始,并且DagRun的名称将为2018-03-12T08:00:00.0Z
只要您了解了DAG的期望,execution_date并且您不尝试将自己的时间作为datetime.now()DAG的基础,那么DAG就能在操作中发挥作用。可以my_execution_date = execution_date + datetime.timedelta(7)在任何PythonOperator或自定义运算符中随意创建一个新变量(您可以从任务的上下文中获取execute_date),可以使用模板语句,例如{{ (execution_date + macros.timedelta(7)).strftime('%Y%m%d') }}或{{ macros.ds_add(ds, 7) }},或使用next_execution_date。
您甚至可以添加dag级别(user_defined_macros如 {'dt':lambda d: d+datetime.timedelta(days=7)}启用){{ dt(execution_date) }}。并且最近user_defined_filters添加了{'dt':lambda d: d+datetime.timedelta(days=7)}启用功能{{ execution_date | dt }}。在next_ds和next_execution_date将是你的目的更容易。
在考虑模板时,您还可以阅读其中的内置内容:http : //jinja.pocoo.org/docs/2.10/templates/#builtin-filters
| 归档时间: |
|
| 查看次数: |
4152 次 |
| 最近记录: |