保持气流管道状态

EMi*_*ler 5 airflow

我是气流新手,我觉得我可能缺少一些约定或概念。

上下文:我定期将文件放入 S3 存储桶中。我的管道需要获取新文件并处理它们。

基本上:如何避免重新处理?

管道的某些部分将来可能会发生变化,我重新处理文件。但在日常工作中,我不想重新处理文件。此外,将来可能还会有其他管道需要从头开始并处理所有文件以获得不同的输出。

我有很多保存状态的杂乱方法(本地 json 文件,或检查输出文件的存在) - 但我想知道气流中是否有约定。目前对我来说最有意义的是重新使用气流存在的 postgres(可能是不好的形式?),添加另一个数据库并开始在那里创建表,我在其中列出输入文件(如果它们已针对工作流 X 进行处理) 、工作流程 Y 等

你会怎么做?

jhn*_*lvr 3

以下是我如何通过 4 个任务解决类似问题DAG

编写一个扩展的自定义 S3Sensor BaseSensorOperator。该传感器使用boto3库,监视存储桶中的特定文件夹。如果有任何文件放入此存储桶,它会将所有文件路径发布到Xcom

  1. 该传感器是 dag 中的第一个操作员。

  2. dag 中的下一个运算符是一个 Python 运算符,它从 Xcom 的先前任务中读取列表。它将所有文件移动到同一存储桶中的另一个文件夹,再次列出 Xcom 的新路径。

  3. 下一个操作员处理每个文件。

  4. 下一个操作员再次触发相同的 dag(因此我们从自定义 s3 文件传感器开始,因为该 dag 会重新触发自身)。

dag 不需要有任何schedule_interval,并且需要手动触发一次。然后它会一直监视桶,直到永远,或者直到有东西破裂。