utp*_*utp 2 airflow google-cloud-functions google-cloud-composer
我可以通过云函数触发气流任务吗?
基本上我的问题是这样的。我有一些文件到达谷歌云存储。同一 DAG 中的多个文件。文件到达时我需要触发转换作业。我想使用云功能。但我的 DAG 中有很多依赖作业。
任何帮助表示赞赏
小智 6
您不一定需要 Cloud Function 来感知 GCS 中的文件,Composer 具有可用于实现此目的的 GCS 传感器。
假设您必须监视 Bucket/folder/file_*.csv 中的文件,然后:
from airflow.contrib.operators.gcs_list_operator import GoogleCloudStorageListOperator
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStoragePrefixSensor
import datetime as dt
from airflow.models import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
lasthour = dt.datetime.now() - dt.timedelta(hours=1)
args = {
'owner': 'airflow',
'start_date': lasthour,
'depends_on_past': False,
}
dag = DAG(
dag_id='GCS_sensor_dag',
schedule_interval=None,
default_args=args
)
GCS_File_list = GoogleCloudStorageListOperator(
task_id= 'list_Files',
bucket= 'bucketname',
prefix='folder/file_',
delimiter='.csv',
google_cloud_storage_conn_id='google_cloud_default',
dag = dag
)
file_sensor = GoogleCloudStoragePrefixSensor(
task_id='gcs_polling',
bucket='bucketname',
prefix='folder/file_',
dag=dag
)
trigger = TriggerDagRunOperator(
task_id='trigger_dag_{timestamp}_rerun'.format(timestamp=((dt.datetime.now() - dt.datetime.utcfromtimestamp(0)).total_seconds()*1000)),
trigger_dag_id="GCS_sensor_dag",
dag=dag
)
file_sensor >> GCS_File_list >> trigger
Run Code Online (Sandbox Code Playgroud)