小编mic*_*234的帖子

有没有办法为 Prefect 中的新 Flow 回填历史数据(一次)?

我刚开始阅读有关 Prefect 的文章(并且有一点使用 Airflow 的经验)。

我的目标是设置一个每天在 Prefect 中运行的任务,并将数据收集到一个文件夹中(我想这肯定是 Prefect 可以帮助我做的)。我的目标也是填充历史数据(就像我及时运行这项工作一样)。

在 Airflow 中有 start_date 的概念,当它在过去设置时,DAG 将从该日期开始运行并在每个时间间隔填充。

例如,如果我有一个任务需要一个日期并返回该日期的数据,例如:

# Pseudo code
def get_values_from_somewhere(date: datetime) -> dict:
    return fetched_values_in_json(date)
Run Code Online (Sandbox Code Playgroud)

在 Prefect 中是否有一种本地方式可以做到这一点?我在这里或文档中的任何地方都找不到这个答案,尽管这里提到回填。任何帮助/指导都将非常有用。

我试过的:

当我设置schedule为:

from datetime import datetime, timedelta

from prefect.schedules import Schedule

schedule = Schedule(clocks=[IntervalClock(interval=timedelta(hours=24), start_date=datetime(2019, 1, 1))])
Run Code Online (Sandbox Code Playgroud)

然后我做flow.run()我只是得到:

INFO:prefect.My-Task:Waiting for next scheduled run at 2020-09-24T00:00:00+00:00
Run Code Online (Sandbox Code Playgroud)

我期望的是从start_date我提供的开始运行,然后暂停直到它到达当前时间并等待下一个时间表。

etl prefect

4
推荐指数
1
解决办法
668
查看次数

标签 统计

etl ×1

prefect ×1