Nez*_*ezo 6 python workflow etl airflow prefect
在 Prefect 中,假设我有一些管道,它为列表中的每个日期运行 f(date),并将其保存到文件中。这是一个非常常见的 ETL 操作。在气流中,如果我运行一次,它将回填所有历史日期。如果我再次运行它,它会知道任务已经运行,并且只运行任何已经出现的新任务(即最新日期)。
在 Prefect 中,据我所知,它每天都会运行整个管道,即使 99% 的任务在前一天完成。在不切换到 Prefect Cloud 的情况下,有哪些解决方案可以解决这个问题?在退出之前,您是否只是做一些诸如让每个任务缓存它在 redis 中完成的事情?
小智 7
Prefect 有许多一流的缓存处理方式,这取决于您想要多少控制。对于每个任务,您可以指定是否应缓存结果、应将其缓存多长时间以及应如何使缓存失效(年龄、任务的不同输入、流参数值等)。
缓存任务的最简单方法是使用targets,它允许您指定任务具有可模板化的副作用(通常是本地或云存储中的文件,但可以是例如数据库条目、redis 键或其他任何内容)。在任务运行之前,它会检查副作用是否存在,如果存在,则跳过运行。
例如,此任务将其结果写入本地文件,自动以任务名称和当前日期为模板:
@task(result=LocalResult(), target="{task_name}-{today}")
def get_data():
return [1, 2, 3, 4, 5]
Run Code Online (Sandbox Code Playgroud)
只要匹配文件存在,任务就不会重新运行。因为{today}是目标名称的一部分,这将隐式缓存一天的任务值。您还可以使用模板中的参数(如回填日期)来复制 Airflow 的行为。
对于更多的控制,你可以使用知府的全缓存机制的设置cache_for,cache_validator以及cache_key任何任务。如果设置,任务将在一个Cached状态而不是一个Success状态中完成。当与合适的编排后端(如 Prefect Server 或 Prefect Cloud)搭配使用时,Cached可以通过未来运行相同任务(或具有相同 的任何任务cache_key)来查询状态。未来的任务将返回Cached状态作为它自己的结果。
| 归档时间: |
|
| 查看次数: |
565 次 |
| 最近记录: |