如何从AWS SQS触发Airflow DAG?

ypi*_*ard 5 amazon-sqs airflow

我想触发基于 SQS 消息的 Airflow DAD。我对 Airflow 很陌生,但我认为应该这样做:

选项1

使用气流 SQS 传感器。根据我的理解,这会等待 SQS 消息来继续执行已触发的DAG。这是否意味着 DAG 始终需要运行并等待 SQS 消息来捕获任何最终的新消息并处理它们?这是否也意味着我应该以非常短的时间间隔安排 DAG,以便当 DAG 处理 SQS 消息时,会创建另一个 DAG 来处理下一个 SQS 消息?

选项2

添加 lambda 或监视 SQS 消息的东西,并在需要时使用Airflow API触发 DAG。

最终,我希望最大限度地减少触发 DAG 所需的交互次数,因此我想使用 Airflow 内置方式来观察 SQS。

谢谢

Ela*_*lad 1

两个选项均有效,但选项 2 基本上是传感器的替代实现。我认为更好的解决方案是选项 1 进行一些修改:

使用SqsSensor这种mode='reschedule'方式,传感器每隔一段时间就会“唤醒”,检查是否满足标准。请注意,这不像sleep(x). 当不满足条件时,Airflow 将释放工作线程来执行需要运行的其他任务,并将其返回SqsSensor到调度队列。您可以在文档中阅读有关传感器模式的更多信息。

from airflow.providers.amazon.aws.sensors.sqs import SqsSensor
SqsSensor(
    task_id='test_task',
    dag=dag,
    sqs_queue='your_queue',
    aws_conn_id='aws_default',
    mode='reschedule')
Run Code Online (Sandbox Code Playgroud)

请注意,传感器将无限期运行,直到满足标准。您可以设置timeout传感器任务(还有其他可能的超时原因,例如集群策略和其他默认值,但这是另一个主题)。

  • 传感器选项如何处理传入 SQS 消息的高峰值负载?是否会受到预定频率的限制?这意味着如果它被安排为每秒运行一次并且每秒有 2 条传入消息到达,那么它永远无法完全耗尽它? (2认同)