Kin*_*ngz 6 amazon-s3 airflow airflow-scheduler apache-airflow-xcom airflow-operator
是否可以仅在发生特定事件(例如将文件放入特定 S3 存储桶的事件)时运行气流任务。类似于 AWS Lambda 事件的内容
有,S3KeySensor
但我不知道它是否符合我的要求(仅在事件发生时运行 Task)
这是使问题更清楚的示例:
我有一个传感器对象如下
sensor = S3KeySensor(
task_id='run_on_every_file_drop',
bucket_key='file-to-watch-*',
wildcard_match=True,
bucket_name='my-sensor-bucket',
timeout=18*60*60,
poke_interval=120,
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
使用上述传感器对象,传感器任务的气流行为如下:
my-sensor-bucket
切换 DAG 之前已经存在与 S3 存储桶中的通配符匹配的对象名称ON
,则运行该任务(由于过去的 s3 对象的存在,我不想运行该任务)my-sensor-bucket
)我试图了解气流中的任务是否只能基于调度(如 cron 作业)或传感器(仅基于传感标准一次)运行,还是不能像基于事件的管道(类似于 AWS Lambda 的东西)那样设置
我应该指出,这个答案是针对 Airflow 的 1.x (1.10.x) 系列。在 Airflow 2.x 中,他们引入了一个称为触发器的概念,我还没有经常使用它。它可以通过根据何时接收到触发器来使传感器变得异步以重做传感器检查来提供帮助。我认为可以根据文件可用性设置触发器,并且 AWS 和 GCP 提供程序模块已经定义了一些触发器。S3 的 EG 有这些文档。
气流基本上是围绕基于时间的调度来组织的。
您可以通过以下几种方式来获得您想要的东西:
如果您选择路线 3,您将在下一次运行 DAG 及其传感器之前删除通过传感器的密钥。请注意,由于 S3 最终一致性,路由 1 和 2 更加可靠。
归档时间: |
|
查看次数: |
3433 次 |
最近记录: |