在我的DAG中,我TimeDeltaSensor使用以下方法创建了一个:
from datetime import datetime, timedelta
from airflow.operators.sensors import TimeDeltaSensor
wait = TimeDeltaSensor(
task_id='wait',
delta=timedelta(seconds=300),
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
但是当它运行时我得到了错误
- 子任务:[2018-07-13 09:00:39,663] {models.py:1427}错误-+ =:'NoneType'和'datetime.timedelta'不受支持的操作数类型
气流版本为1.8.1。
该代码基本上是从Example Pipeline定义中提取出来的,因此我对问题可能不满意。有任何想法吗?
我需要在Airflow中实现等待任务。等待时间大约需要几个小时。
首先,TimeDeltaSensor 无法正常工作。
SLEEP_MINUTES_1ST = 11
sleep_task_1 = TimeDeltaSensor(
task_id="sleep_for_11_min",
delta=timedelta(minutes=SLEEP_MINUTES_1ST),
)
Run Code Online (Sandbox Code Playgroud)
对于每日时间表,例如:
schedule_interval='30 06 * * *'
Run Code Online (Sandbox Code Playgroud)
只需等待下一个时间表:
[2020-01-15 18:10:21,800] {time_delta_sensor.py:45} INFO - Checking if the time (2020-01-16 06:41:00+00:00) has come
Run Code Online (Sandbox Code Playgroud)
这在代码中非常明显: https://github.com/apache/airflow/blob/master/airflow/sensors/time_delta_sensor.py#L43
(更不用说使用时间表时的已知错误:无或@once)
接下来的尝试是使用 TimeSensor,如下所示:
SLEEP_MINUTES_1ST = 11
sleep_task_1 = TimeSensor(
task_id="sleep_for_11_min",
provide_context=True,
target_time=(timezone.utcnow()+timedelta(minutes=SLEEP_MINUTES_1ST)).time(),
trigger_rule=TriggerRule.NONE_FAILED
)
Run Code Online (Sandbox Code Playgroud)
这实际上效果很好,但在Poke模式下,需要一名工作人员来完成整个等待时间。我收到了使用重新安排模式的建议,但只需添加:
mode='reschedule',
Run Code Online (Sandbox Code Playgroud)
每次重新安排检查时都会生成新的安排,并且永远不会像这样完成:
[2020-01-15 15:36:42,818] {time_sensor.py:39} INFO - Checking if the time (14:47:42.707565) has come
[2020-01-15 15:36:42,981] {taskinstance.py:1054} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
....
[2020-01-15 …Run Code Online (Sandbox Code Playgroud)