当有任务正在运行时,Airflow 会弹出一个通知,说调度程序似乎没有运行,并且一直显示直到任务完成:
The scheduler does not appear to be running. Last heartbeat was received 5 minutes ago.
The DAGs list may not update, and new tasks will not be scheduled.
Run Code Online (Sandbox Code Playgroud)
实际上,调度程序进程正在运行,因为我已经检查了该进程。任务完成后,通知消失,一切恢复正常。
我的任务有点重,可能要运行几个小时。
我正在使用Airflow v1.8.1并在kubernetes和Docker上运行所有组件(worker,web,flower,scheduler).我使用Redis的Celery Executor,我的任务看起来像:
(start) -> (do_work_for_product1)
? -> (do_work_for_product2)
? -> (do_work_for_product3)
? …
Run Code Online (Sandbox Code Playgroud)
所以start任务有多个下游.我设置了与并发相关的配置如下:
parallelism = 3
dag_concurrency = 3
max_active_runs = 1
Run Code Online (Sandbox Code Playgroud)
然后,当我手动运行此DAG(不确定它是否从未在计划任务中发生)时,某些下游会执行,但其他下游仍处于"排队"状态.
如果我从Admin UI清除任务,它将被执行.没有工作日志(在处理一些第一个下游之后,它只是不输出任何日志).
Web服务器的日志(不确定worker exiting是否相关)
/usr/local/lib/python2.7/dist-packages/flask/exthook.py:71: ExtDeprecationWarning: Importing flask.ext.cache is deprecated, use flask_cache instead.
.format(x=modname), ExtDeprecationWarning
[2017-08-24 04:20:56,496] [51] {models.py:168} INFO - Filling up the DagBag from /usr/local/airflow_dags
[2017-08-24 04:20:57 +0000] [27] [INFO] Handling signal: ttou
[2017-08-24 04:20:57 +0000] [37] [INFO] Worker exiting (pid: 37)
Run Code Online (Sandbox Code Playgroud)
调度程序也没有错误日志.每当我尝试这个时,许多任务就会发生变化.
因为我也使用Docker,我想知道这是否相关:https: //github.com/puckel/docker-airflow/issues/94 但到目前为止,没有任何线索.
有没有人遇到类似的问题或者有一些想法我可以调查这个问题...?
我使用气流进行Spark作业的工作流程.安装后,我将DAG文件复制到airflow.cfg中设置的DAGs文件夹中.我可以回填DAG以成功运行BashOperators.但总有一个像上面提到的警告.我没有验证调度是否正常,但我怀疑调度可以工作,因为警告说主调度程序不知道我的DAG存在.如何消除此警告并获得调度工作?有人遇到同样的问题可以帮助我吗?
我的气流调度程序中有两个 DAG,它们过去一直在工作。在需要重建运行气流的 docker 容器后,它们现在陷入了困境queued。在我的例子中,DAG 是通过 REST API 触发的,因此不涉及实际的调度。
由于有很多类似的帖子,我从一个类似的问题中浏览了这个答案的清单:
是的!
是的!
是的,两个 DAGS 都显示在 WebUI 中,并且没有显示任何错误。
是的,两个 DAG 的构造函数如下所示:
dag = DAG(
dag_id='image_object_detection_dag',
default_args=args,
schedule_interval=None,
start_date=days_ago(2),
tags=['helloworld'],
)
Run Code Online (Sandbox Code Playgroud)
不,我通过 REST API 手动触发我的 DAG。
查找任务无法退出状态queued并运行的原因的最佳方法是什么?
编辑:
出于好奇,我尝试从 WebUI 内触发 DAG,现在两个运行都已执行(从 WebUI 触发的一个失败,但这是预期的,因为没有设置配置)

我正在尝试使用BigQueryOperator。我以为以后会使用google composer,但我希望它先在本地运行。我可以顺畅运行并且运行BashOperator良好,也可以airflow test <dag> <task>在task要运行的大查询任务所在的位置运行,但是当我从UI触发DAG时,bigquery任务永远不会排队。相反,它们具有REMOVED状态,什么也没有发生。
我的DAG定义如下:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
yesterday = datetime.combine(
datetime.today() - timedelta(1),
datetime.min.time())
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['airflow@example.com'],
'start_date': yesterday,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'tutorial', default_args=default_args) as dag:
operators
t1 = BashOperator(
task_id='print_date',
bash_command='date')
template_sql = '''
SELECT 'TOMAS' name, '{{ params.my_value …Run Code Online (Sandbox Code Playgroud) 前言
另一个气流任务没有得到执行的问题...
直到这个周末,一切都变得顺风顺水,直到今天这个周末,我的气流体验还是差不多。
我已经检查了所有标准的东西,例如这篇有用的文章中概述的内容。
我已多次重设整个实例,以使其正常工作,但我在这里完全失去了战斗。
环境
问题
这是我对无限循环/重复发生的噩梦进行故障排除时发生的事情。
在我开始遇到麻烦之前,清除任务实例后,始终会非常迅速地将其拾取并再次执行。
但是现在,清除任务实例通常会导致任务实例陷入清除状态。它只是坐在那里。
更糟糕的是,如果我尝试使dag和所有实例失败,然后再次手动触发dag,则会创建任务实例,但仍处于“无”状态。重新启动调度程序无济于事。
其他观察
这可能是一条红色的鲱鱼,但我最近才注意到的一件事是,当我单击代表处于“无”状态的任务实例的图标时,它将带我到错误的“任务实例”视图过滤器过滤; 过滤器设置为“字符串等于空”。
但是您需要将其切换为“ string empty yes”,以使其实际上返回被卡住的任务实例。
我以为这只是一个不相关的UI错误,就我而言,这是一个红色的鲱鱼,但是我以为以防万一。
编辑1
编辑2
null任务实例状态是否为有效值?还是这表明有什么问题。
编辑3
更多的none东西。
以下是任务实例详细信息页面中的一些内容。许多属性是none:
Task Instance Details
Dependencies Blocking Task From Getting Scheduled
Dependency Reason
Unknown All dependencies are met but the task instance is not running. In most cases this just means that the …Run Code Online (Sandbox Code Playgroud) 我创建了一个dag,并每天进行调度。它每天都会排队,但任务实际上并未运行。这个问题过去曾在这里提出过,但是答案并没有帮助我,因此似乎还有另一个问题。
我的代码在下面共享。我用注释替换了任务t2的SQL。当我使用“ airflow test ...”在CLI上分别运行它们时,每个任务都成功运行。
您能解释一下如何使DAG运行吗?谢谢!
这是DAG代码:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
default_args = {
'owner' : 'me',
'depends_on_past' : 'true',
'start_date' : datetime(2018, 06, 25),
'email' : ['myemail@moovit.com'],
'email_on_failure':True,
'email_on_retry':False,
'retries' : 2,
'retry_delay' : timedelta(minutes=5)
}
dag = DAG('my_agg_table',
default_args = default_args,
schedule_interval = "30 4 * * *"
)
t1 = BigQueryOperator(
task_id='bq_delete_my_agg_table',
use_legacy_sql=False,
write_disposition='WRITE_TRUNCATE',
allow_large_results=True,
bql='''
delete `my_project.agg.my_agg_table`
where date = '{{ macros.ds_add(ds, -1)}}' …Run Code Online (Sandbox Code Playgroud)