气流文件传感器的任何例子?

Dev*_*vEx 6 python airflow

谁能给我指出一个如何使用 Airflow FileSensor 的例子?我用谷歌搜索过,还没有找到任何东西。任何例子就足够了。我的用例很简单:

等待预定的 DAG 将文件放入路径中,FileSensor 任务将其拾取、读取内容并处理它。

Meg*_*Ray 7

文档源代码

from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.dummy_operator    import DummyOperator

import datetime
import airflow

# https://airflow.apache.org/code.html#airflow.models.BaseOperator
default_args = {
    "depends_on_past" : False,
    "start_date"      : airflow.utils.dates.days_ago( 1 ),
    "retries"         : 1,
    "retry_delay"     : datetime.timedelta( hours= 5 ),
}

with airflow.DAG( "file_sensor_test_v1", default_args= default_args, schedule_interval= "*/5 * * * *", ) as dag:

    start_task  = DummyOperator(  task_id= "start" )
    stop_task   = DummyOperator(  task_id= "stop"  )
    sensor_task = FileSensor( task_id= "my_file_sensor_task", poke_interval= 30, fs_conn_id= <path>, filepath= <file or directory name> )

start_task >> sensor_task >> stop_task
Run Code Online (Sandbox Code Playgroud)

  • 什么是“fs_conn_id”以及我需要在这里替换什么? (3认同)

enr*_*nri 5

一个简单的FileSensor任务示例:

second_task = FileSensor(
                 task_id="file_sensor_task_id",
                 filepath="{{ task_instance.xcom_pull(task_ids='get_filepath_task') }}",
                 #fs_conn_id="fs_default" # default one, commented because not needed
                 poke_interval= 20,
                 dag=dag
              )
Run Code Online (Sandbox Code Playgroud)

在这里,我作为传递filepath先前的返回值PythonOperator task_id(命名为get_filepath_task使用)xcom_pull。但它可以是您正在检查存在的文件路径或目录的任何字符串。

fs_conn_id参数是您在 UI 管理/连接部分中可用的连接的字符串名称。

的默认值fs_conn_id"fs_default"(可以在FileSensor类操作符的代码中看到)。检查 UI 管理/连接,你会找到它。

如果您想检查本地是否存在文件或目录,您可以跳过fs_conn_id并直接传递参数filepath

poke_interval从继承BaseSensorOperator,它表明在几秒钟的时间,作业要等待每个尝试之间。默认值为 60 秒。