我需要在Airflow中实现等待任务。等待时间大约需要几个小时。
首先,TimeDeltaSensor 无法正常工作。
SLEEP_MINUTES_1ST = 11
sleep_task_1 = TimeDeltaSensor(
task_id="sleep_for_11_min",
delta=timedelta(minutes=SLEEP_MINUTES_1ST),
)
Run Code Online (Sandbox Code Playgroud)
对于每日时间表,例如:
schedule_interval='30 06 * * *'
Run Code Online (Sandbox Code Playgroud)
只需等待下一个时间表:
[2020-01-15 18:10:21,800] {time_delta_sensor.py:45} INFO - Checking if the time (2020-01-16 06:41:00+00:00) has come
Run Code Online (Sandbox Code Playgroud)
这在代码中非常明显: https://github.com/apache/airflow/blob/master/airflow/sensors/time_delta_sensor.py#L43
(更不用说使用时间表时的已知错误:无或@once)
接下来的尝试是使用 TimeSensor,如下所示:
SLEEP_MINUTES_1ST = 11
sleep_task_1 = TimeSensor(
task_id="sleep_for_11_min",
provide_context=True,
target_time=(timezone.utcnow()+timedelta(minutes=SLEEP_MINUTES_1ST)).time(),
trigger_rule=TriggerRule.NONE_FAILED
)
Run Code Online (Sandbox Code Playgroud)
这实际上效果很好,但在Poke模式下,需要一名工作人员来完成整个等待时间。我收到了使用重新安排模式的建议,但只需添加:
mode='reschedule',
Run Code Online (Sandbox Code Playgroud)
每次重新安排检查时都会生成新的安排,并且永远不会像这样完成:
[2020-01-15 15:36:42,818] {time_sensor.py:39} INFO - Checking if the time (14:47:42.707565) has come
[2020-01-15 15:36:42,981] {taskinstance.py:1054} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
....
[2020-01-15 …Run Code Online (Sandbox Code Playgroud)