ypi*_*ard 5 amazon-sqs airflow
我想触发基于 SQS 消息的 Airflow DAD。我对 Airflow 很陌生,但我认为应该这样做:
使用气流 SQS 传感器。根据我的理解,这会等待 SQS 消息来继续执行已触发的DAG。这是否意味着 DAG 始终需要运行并等待 SQS 消息来捕获任何最终的新消息并处理它们?这是否也意味着我应该以非常短的时间间隔安排 DAG,以便当 DAG 处理 SQS 消息时,会创建另一个 DAG 来处理下一个 SQS 消息?
添加 lambda 或监视 SQS 消息的东西,并在需要时使用Airflow API触发 DAG。
最终,我希望最大限度地减少触发 DAG 所需的交互次数,因此我想使用 Airflow 内置方式来观察 SQS。
谢谢
两个选项均有效,但选项 2 基本上是传感器的替代实现。我认为更好的解决方案是选项 1 进行一些修改:
使用SqsSensor这种mode='reschedule'方式,传感器每隔一段时间就会“唤醒”,检查是否满足标准。请注意,这不像sleep(x). 当不满足条件时,Airflow 将释放工作线程来执行需要运行的其他任务,并将其返回SqsSensor到调度队列。您可以在文档中阅读有关传感器模式的更多信息。
from airflow.providers.amazon.aws.sensors.sqs import SqsSensor
SqsSensor(
task_id='test_task',
dag=dag,
sqs_queue='your_queue',
aws_conn_id='aws_default',
mode='reschedule')
Run Code Online (Sandbox Code Playgroud)
请注意,传感器将无限期运行,直到满足标准。您可以设置timeout传感器任务(还有其他可能的超时原因,例如集群策略和其他默认值,但这是另一个主题)。
| 归档时间: |
|
| 查看次数: |
6616 次 |
| 最近记录: |