我刚刚开始使用Airbnb的气流,而且我仍然不清楚回填是如何/何时完成的.
具体来说,有2个用例让我困惑:
如果我运行airflow scheduler几分钟,停止它一分钟,然后再重新启动它,我的DAG似乎在前30秒左右运行额外的任务,然后它继续正常(每10秒运行一次).这些额外的任务是"回填"的任务,在早期的运行中无法完成吗?如果是这样,我怎么告诉气流不回填这些任务?
如果我运行airflow scheduler几分钟,然后运行airflow clear MY_tutorial,然后重新启动airflow scheduler,它似乎运行了一大堆额外的任务.这些任务是否也以某种方式"回填"任务?或者我错过了什么.
目前,我有一个非常简单的dag:
default_args = {
'owner': 'me',
'depends_on_past': False,
'start_date': datetime(2016, 10, 4),
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG(
'MY_tutorial', default_args=default_args, schedule_interval=timedelta(seconds=10))
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 8)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
second_template = """
touch ~/airflow/logs/test
echo $(date) >> ~/airflow/logs/test
"""
t4 = BashOperator(
task_id='write_test',
bash_command=second_template,
dag=dag)
t1.set_upstream(t4)
t2.set_upstream(t1)
t3.set_upstream(t1)
Run Code Online (Sandbox Code Playgroud)
我在airflow配置中改变的唯一两件事是
CeleryExecutor而不是一个SequentialExecutor非常感谢你的帮助!
小智 50
当您将DAG的调度程序切换为"on"时,调度程序将触发所有未记录状态的dag运行实例的回填,从您在"default_args"中指定的start_date开始.
例如:如果开始日期为"2017-01-21"并且您打开了"2017-01-22T00:00:00"的计划切换并且您的dag配置为每小时运行一次,那么调度程序将回填24 dag运行然后按计划的间隔开始运行.
这基本上就是你们两个问题中发生的事情.在#1中,它填写了从关闭调度程序的30秒开始的3次缺失运行.在#2中,它填充了从start_date到"now"的所有DAG运行.
有两种方法:
使用"-m"标志从命令行手动运行回填,该标志告诉气流不要实际运行DAG,而只是在数据库中将其标记为成功(https://airflow.incubator.apache.org/cli.html) .
例如
airflow backfill MY_tutorial -m -s 2016-10-04 -e 2017-01-22T14:28:30
小智 10
请注意,从1.8版开始,Airflow允许您使用追赶来控制此行为。可以catchup_by_default=False在airflow.cfg catchup=False中设置,也可以
在DAG定义中设置。
参见https://airflow.apache.org/scheduler.html#backfill-and-catchup
| 归档时间: |
|
| 查看次数: |
19026 次 |
| 最近记录: |