基于事件触发和运行气流任务将文件放入 S3 存储桶

Kin*_*ngz 6 amazon-s3 airflow airflow-scheduler apache-airflow-xcom airflow-operator

是否可以仅在发生特定事件(例如将文件放入特定 S3 存储桶的事件)时运行气流任务。类似于 AWS Lambda 事件的内容

有,S3KeySensor但我不知道它是否符合我的要求(仅在事件发生时运行 Task)

这是使问题更清楚的示例:

我有一个传感器对象如下

sensor = S3KeySensor(
    task_id='run_on_every_file_drop',
    bucket_key='file-to-watch-*',
    wildcard_match=True,
    bucket_name='my-sensor-bucket',
    timeout=18*60*60,
    poke_interval=120,
    dag=dag
)
Run Code Online (Sandbox Code Playgroud)

使用上述传感器对象,传感器任务的气流行为如下:

  • 如果在气流管理 UI 中my-sensor-bucket切换 DAG 之前已经存在与 S3 存储桶中的通配符匹配的对象名称ON,则运行该任务(由于过去的 s3 对象的存在,我不想运行该任务)
  • 运行一次后,每当有新的 S3 文件对象删除时,传感器任务将不会再次运行(我想每次在存储桶中删除一个新的 S3 文件对象时,都运行 DAG 中的传感器任务和后续任务my-sensor-bucket
  • 如果您配置调度程序,任务将基于调度而不是基于事件运行。所以在这种情况下调度程序似乎不是一个选项

我试图了解气流中的任务是否只能基于调度(如 cron 作业)或传感器(仅基于传感标准一次)运行,还是不能像基于事件的管道(类似于 AWS Lambda 的东西)那样设置

dla*_*lin 4

我应该指出,这个答案是针对 Airflow 的 1.x (1.10.x) 系列。在 Airflow 2.x 中,他们引入了一个称为触发器的概念,我还没有经常使用它。它可以通过根据何时接收到触发器来使传感器变得异步以重做传感器检查来提供帮助。我认为可以根据文件可用性设置触发器,并且 AWS 和 GCP 提供程序模块已经定义了一些触发器。S3 的 EG 有这些文档

原答案如下

气流基本上是围绕基于时间的调度来组织的。

您可以通过以下几种方式来获得您想要的东西:

  1. 假设您在 S3 上有一个 SQS 事件,它会触发一个 AWS Lambda,该 AWS Lambda 调用气流 API 来触发 dag 运行。
  2. 您可以使用 SQS 传感器启动 DAG,当它收到 s3 更改事件时,它只会继续处理 DAG 的其余部分(有关重新调度,请参阅 3_1 和 3_2)。
  3. 您可以使用传感器(如您展示的传感器)启动 DAG,它不会选择要运行的任务,它只是传递到下一个相关任务或超时。您必须删除使传感器匹配的密钥。
    1. 您可以通过使最终任务重新触发 DAG 来重新运行。
    2. 或者将计划间隔设置为每分钟,不进行追赶,并将最大活动 DAG 运行设置为 1。这样,一次运行将处于活动状态,传感器将保持它直到超时。如果完成或超时,下一次运行将在一分钟内开始。

如果您选择路线 3,您将在下一次运行 DAG 及其传感器之前删除通过传感器的密钥。请注意,由于 S3 最终一致性,路由 1 和 2 更加可靠。