从 Cloud Function 的文件到达事件触发 Composer DAG 上的任务

utp*_*utp 2 airflow google-cloud-functions google-cloud-composer

我可以通过云函数触发气流任务吗?

基本上我的问题是这样的。我有一些文件到达谷歌云存储。同一 DAG 中的多个文件。文件到达时我需要触发转换作业。我想使用云功能。但我的 DAG 中有很多依赖作业。

任何帮助表示赞赏

小智 6

您不一定需要 Cloud Function 来感知 GCS 中的文件,Composer 具有可用于实现此目的的 GCS 传感器。

假设您必须监视 Bucket/folder/file_*.csv 中的文件,然后:

from airflow.contrib.operators.gcs_list_operator import GoogleCloudStorageListOperator
    from airflow.contrib.sensors.gcs_sensor import GoogleCloudStoragePrefixSensor 
    import datetime as dt
    from airflow.models import DAG
    from airflow.operators.dagrun_operator import TriggerDagRunOperator

    lasthour = dt.datetime.now() - dt.timedelta(hours=1)

    args = {
     'owner': 'airflow',
     'start_date': lasthour,
     'depends_on_past': False,
    }
    dag = DAG(
     dag_id='GCS_sensor_dag',
     schedule_interval=None,
     default_args=args
    )
    GCS_File_list = GoogleCloudStorageListOperator(
                        task_id= 'list_Files',
                        bucket= 'bucketname',
                        prefix='folder/file_',
                        delimiter='.csv',
                        google_cloud_storage_conn_id='google_cloud_default',
                        dag = dag
                    )
    file_sensor = GoogleCloudStoragePrefixSensor(
                        task_id='gcs_polling',  
                        bucket='bucketname',
                        prefix='folder/file_',
                        dag=dag
                    )

    trigger = TriggerDagRunOperator(
                        task_id='trigger_dag_{timestamp}_rerun'.format(timestamp=((dt.datetime.now() - dt.datetime.utcfromtimestamp(0)).total_seconds()*1000)),
                        trigger_dag_id="GCS_sensor_dag",
                        dag=dag
                    )

file_sensor >> GCS_File_list >> trigger
Run Code Online (Sandbox Code Playgroud)