小编Lok*_*oki的帖子

Airflow:高效地执行等待(睡眠)任务

我需要在Airflow中实现等待任务。等待时间大约需要几个小时。

首先,TimeDeltaSensor 无法正常工作。

SLEEP_MINUTES_1ST = 11
sleep_task_1 = TimeDeltaSensor(
            task_id="sleep_for_11_min",
            delta=timedelta(minutes=SLEEP_MINUTES_1ST),                    
    )
Run Code Online (Sandbox Code Playgroud)

对于每日时间表,例如:

schedule_interval='30 06 * * *'
Run Code Online (Sandbox Code Playgroud)

只需等待下一个时间表:

[2020-01-15 18:10:21,800] {time_delta_sensor.py:45} INFO - Checking if the time (2020-01-16 06:41:00+00:00) has come
Run Code Online (Sandbox Code Playgroud)

这在代码中非常明显: https://github.com/apache/airflow/blob/master/airflow/sensors/time_delta_sensor.py#L43

(更不用说使用时间表时的已知错误:无或@once)

接下来的尝试是使用 TimeSensor,如下所示:

 SLEEP_MINUTES_1ST = 11
 sleep_task_1 = TimeSensor(
           task_id="sleep_for_11_min",
           provide_context=True,
           target_time=(timezone.utcnow()+timedelta(minutes=SLEEP_MINUTES_1ST)).time(),
           trigger_rule=TriggerRule.NONE_FAILED    
        )
Run Code Online (Sandbox Code Playgroud)

这实际上效果很好,但在Poke模式下,需要一名工作人员来完成整个等待时间。我收到了使用重新安排模式的建议,但只需添加:

mode='reschedule',
Run Code Online (Sandbox Code Playgroud)

每次重新安排检查时都会生成新的安排,并且永远不会像这样完成:

[2020-01-15 15:36:42,818] {time_sensor.py:39} INFO - Checking if the time (14:47:42.707565) has come
[2020-01-15 15:36:42,981] {taskinstance.py:1054} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
....
[2020-01-15 …
Run Code Online (Sandbox Code Playgroud)

python jinja2 airflow

4
推荐指数
1
解决办法
1万
查看次数

标签 统计

airflow ×1

jinja2 ×1

python ×1