谁能给我指出一个如何使用 Airflow FileSensor 的例子?我用谷歌搜索过,还没有找到任何东西。任何例子就足够了。我的用例很简单:
等待预定的 DAG 将文件放入路径中,FileSensor 任务将其拾取、读取内容并处理它。
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.dummy_operator import DummyOperator
import datetime
import airflow
# https://airflow.apache.org/code.html#airflow.models.BaseOperator
default_args = {
"depends_on_past" : False,
"start_date" : airflow.utils.dates.days_ago( 1 ),
"retries" : 1,
"retry_delay" : datetime.timedelta( hours= 5 ),
}
with airflow.DAG( "file_sensor_test_v1", default_args= default_args, schedule_interval= "*/5 * * * *", ) as dag:
start_task = DummyOperator( task_id= "start" )
stop_task = DummyOperator( task_id= "stop" )
sensor_task = FileSensor( task_id= "my_file_sensor_task", poke_interval= 30, fs_conn_id= <path>, filepath= <file or directory name> )
start_task >> sensor_task >> stop_task
Run Code Online (Sandbox Code Playgroud)
一个简单的FileSensor
任务示例:
second_task = FileSensor(
task_id="file_sensor_task_id",
filepath="{{ task_instance.xcom_pull(task_ids='get_filepath_task') }}",
#fs_conn_id="fs_default" # default one, commented because not needed
poke_interval= 20,
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
在这里,我作为传递filepath
先前的返回值PythonOperator
task_id
(命名为get_filepath_task
使用)xcom_pull
。但它可以是您正在检查存在的文件路径或目录的任何字符串。
该fs_conn_id
参数是您在 UI 管理/连接部分中可用的连接的字符串名称。
的默认值fs_conn_id
是"fs_default"
(可以在FileSensor
类操作符的代码中看到)。检查 UI 管理/连接,你会找到它。
如果您想检查本地是否存在文件或目录,您可以跳过fs_conn_id
并直接传递参数filepath
。
在poke_interval
从继承BaseSensorOperator
,它表明在几秒钟的时间,作业要等待每个尝试之间。默认值为 60 秒。