Airflow S3KeySensor - 如何让它继续运行

Kyl*_*ine 14 boto3 airflow airflow-scheduler

这个Stackoverflow帖子的帮助下,我刚刚创建了一个程序(帖子中显示的那个),当一个文件放在S3存储桶中时,我的一个正在运行的DAG中的任务被触发,然后我使用BashOperator执行一些工作.一旦完成,虽然DAG不再处于运行状态,而是进入成功状态,如果我想让它拿起另一个文件,我需要清除所有"过去","未来","上游",下游'活动.我想制作这个程序,以便它始终在运行,并且只要在S3存储桶中放置一个新文件,程序就会启动任务.

我可以继续使用S3KeySenor执行此操作,还是需要设置一种设置外部触发器来运行DAG的方法?截至目前,我的S3KeySensor如果只运行一次就毫无意义.

from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor, EmailOperator, S3KeySensor
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 5, 29),
    'email': ['something@here.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('s3_triggered_emr_cluster_dag', default_args=default_args, schedule_interval= '@once')

# This Activity runs a Python script that creates an AWS EMR cluster and then does EMR activity on the EMR cluster.
t2 = BashOperator(
    task_id='create_emr_cluster_1',
    bash_command='python /home/ec2-user/aws-python-sample/Create_EMR_Then_Do_EMR_Activities.py',
    retries=1,
    dag=dag)

t1 = BashOperator(
    task_id='success_log',
    bash_command='echo "Dag ran successfully" >> /home/ec2-user/s3_triggered_dag.txt',
    dag=dag)

sensor = S3KeySensor(
    task_id='new_s3_file_in_foobar-bucket',
    bucket_key='*',
    wildcard_match=True,
    bucket_name='foobar-bucket',
    s3_conn_id='s3://foobar-bucket',
    timeout=18*60*60,
    poke_interval=120,
    dag=dag)

t1.set_upstream(sensor)
t2.set_upstream(t1)
Run Code Online (Sandbox Code Playgroud)

我想知道这是不可能的,因为它不会是一个有向无环图,而是它会有一个重复传感器的循环- > t1 - > t2 - >传感器 - > t1 - > t2 - >传感器 - > ......不断重复.

更新:

我的用例非常简单,只要在指定的AWS S3 Bucket中放置新文件,我希望触发我的DAG并开始执行各种任务.这些任务将执行以下操作:实例化新的AWS EMR集群,从AWS S3存储桶中提取文件,执行某些AWS EMR活动,然后关闭AWS EMR集群.从那里开始,DAG将返回等待状态,等待新文件到达AWS S3 Bucket,然后无限期地重复该过程.

Tay*_*ton 11

在Airflow中,没有一个概念映射到始终运行的DAG.如果适合您的用例,您可以非常频繁地运行DAG,例如每1到5分钟.

这里主要的是S3KeySensor检查,直到它检测到密钥的通配符路径(或超时)中存在第一个文件,然后运行.但是当第二个,第三个或第四个文件落地时,S3传感器已经完成了DAG运行的运行.在下一次DAG运行之前,它不会被安排再次运行.(您描述的循环思想大致相当于调度程序在创建DAG运行时所执行的操作,除非永远不会.)

对于您的用例,外部触发器听起来绝对是最好的方法,无论该触发是通过Airflow CLI的trigger_dag命令($ airflow trigger_dag ...)来实现的:

https://github.com/apache/incubator-airflow/blob/972086aeba4616843005b25210ba3b2596963d57/airflow/bin/cli.py#L206-L222

或者通过REST API:

https://github.com/apache/incubator-airflow/blob/5de22d7fa0d8bc6b9267ea13579b5ac5f62c8bb5/airflow/www/api/experimental/endpoints.py#L41-L89

两者都转向并trigger_dag在通用(实验)API中调用该函数:

https://github.com/apache/incubator-airflow/blob/089c996fbd9ecb0014dbefedff232e8699ce6283/airflow/api/common/experimental/trigger_dag.py#L28-L67

例如,您可以设置AWS Lambda函数,该函数在文件落在S3上时调用,该函数运行触发器DAG调用.

  • 好的谢谢.这就是我所期望的答案.我可以像你说的那样轻松设置一个Lambda函数来进行REST API调用; 如果我使用AWS Data Pipeline,这也是我必须要做的,每次使用Lambda函数时我都必须激活它. (3认同)