我试图通过在 Airflow 中使用 celeryExecutor 来运行以下简单的工作流程:
default_args = {
'depends_on_past': False,
'start_date': datetime.now(),
}
dag = DAG('HelloWorld', default_args=default_args, schedule_interval=None)
default_args=default_args)
t1 = BashOperator(
task_id='task_1',
bash_command='echo "Hello World from Task 1"; sleep 0.1',
dag=dag)
t2 = BashOperator(
task_id='task_2',
bash_command='echo "Hello World from Task 2"; sleep 0.2',
dag=dag)
t2.set_upstream(t1)
Run Code Online (Sandbox Code Playgroud)
但是,task_1 和 task_2 之间总是有大约 5 秒的延迟。以下是airflow.cfg 快照:
[scheduler]
# Task instances listen for external kill signal (when you clear tasks
# from the CLI or the UI), this defines the frequency at which they should
# listen (in seconds).
job_heartbeat_sec = 0.1
# The scheduler constantly tries to trigger new tasks (look at the
# scheduler section in the docs for more information). This defines
# how often the scheduler should run (in seconds).
scheduler_heartbeat_sec = 1
Run Code Online (Sandbox Code Playgroud)
看起来 celery 是导致延迟的原因之一,但是,如果为真,如何从气流配置或 API 设置 celery 工作器心跳间隔(或池率)?
小智 5
作为批处理调度程序,Airflow 目前不能保证超低延迟。该项目的目标是使大规模延迟达到亚分钟级成为可能,但在较大的环境中延迟达到几分钟是很常见的。
如果延迟大约为 1 分钟,则执行一连串 1-2 秒的任务就没有意义了。通常,Airflow 任务的持续时间应以分钟为单位,而不是以秒为单位(但也有例外)。Airflow 不是 Amazon Lambda。
可能可以微调并说 <= 5 秒,但是随着您扩展系统,提供这些保证将变得不可能。
| 归档时间: |
|
| 查看次数: |
2065 次 |
| 最近记录: |