早上好.
我正在尝试设置DAG
使用在线教程和stackoverflow我已经能够提出以下DAG和运算符成功实现目标,但是我希望DAG重新安排或在完成时重新运行,以便开始观察/感知另一个文件.
我试图设置一个变量max_active_runs:1,然后schedule_interval: timedelta(seconds=5)这个是重新安排DAG,但开始排队任务并锁定文件.
欢迎任何关于如何在archive_task之后重新运行DAG的想法?
谢谢
DAG代码
from airflow import DAG
from airflow.operators import PythonOperator, OmegaFileSensor, ArchiveFileOperator
from datetime import datetime, timedelta
from airflow.models import Variable
default_args = {
'owner': 'glsam',
'depends_on_past': False,
'start_date': datetime.now(),
'provide_context': True,
'retries': 100,
'retry_delay': timedelta(seconds=30),
'max_active_runs': 1,
'schedule_interval': timedelta(seconds=5),
}
dag = DAG('test_sensing_for_a_file', default_args=default_args)
filepath = Variable.get("soucePath_Test")
filepattern = Variable.get("filePattern_Test")
archivepath = Variable.get("archivePath_Test")
sensor_task = OmegaFileSensor(
task_id='file_sensor_task',
filepath=filepath,
filepattern=filepattern,
poke_interval=3,
dag=dag)
def process_file(**context):
file_to_process = context['task_instance'].xcom_pull( …Run Code Online (Sandbox Code Playgroud)