如何在Airflow UI上停止/终止正在运行的任务?我在用LocalExecutor.即使我使用CeleryExecutor,我怎么能杀死/停止正在运行的任务?
我使用的是LocalExecutor,我的dag有3个任务,其中任务(C)依赖于任务(A).任务(B)和任务(A)可以并行运行,如下所示
A - >Ç
乙
所以任务(A)失败了,但任务(B)运行正常.任务(C)尚未运行,因为任务(A)失败.
我的问题是我如何单独运行任务(A),因此任务(A)运行一旦任务(A)完成,并且Airflow UI将它们标记为成功.
我正在使用Airflow v1.8.1并在kubernetes和Docker上运行所有组件(worker,web,flower,scheduler).我使用Redis的Celery Executor,我的任务看起来像:
(start) -> (do_work_for_product1)
? -> (do_work_for_product2)
? -> (do_work_for_product3)
? …
Run Code Online (Sandbox Code Playgroud)
所以start任务有多个下游.我设置了与并发相关的配置如下:
parallelism = 3
dag_concurrency = 3
max_active_runs = 1
Run Code Online (Sandbox Code Playgroud)
然后,当我手动运行此DAG(不确定它是否从未在计划任务中发生)时,某些下游会执行,但其他下游仍处于"排队"状态.
如果我从Admin UI清除任务,它将被执行.没有工作日志(在处理一些第一个下游之后,它只是不输出任何日志).
Web服务器的日志(不确定worker exiting是否相关)
/usr/local/lib/python2.7/dist-packages/flask/exthook.py:71: ExtDeprecationWarning: Importing flask.ext.cache is deprecated, use flask_cache instead.
.format(x=modname), ExtDeprecationWarning
[2017-08-24 04:20:56,496] [51] {models.py:168} INFO - Filling up the DagBag from /usr/local/airflow_dags
[2017-08-24 04:20:57 +0000] [27] [INFO] Handling signal: ttou
[2017-08-24 04:20:57 +0000] [37] [INFO] Worker exiting (pid: 37)
Run Code Online (Sandbox Code Playgroud)
调度程序也没有错误日志.每当我尝试这个时,许多任务就会发生变化.
因为我也使用Docker,我想知道这是否相关:https: //github.com/puckel/docker-airflow/issues/94 但到目前为止,没有任何线索.
有没有人遇到类似的问题或者有一些想法我可以调查这个问题...?
有没有办法将参数传递给:
airflow trigger_dag dag_name {param}
Run Code Online (Sandbox Code Playgroud)
?
我有一个监视文件目录的脚本 - 当文件移动到目标目录时我想触发dag作为参数传递文件路径.
我的问题:
我很难理解Airflow中的BranchPythonOperator是如何工作的.我知道它主要用于分支,但是对于传递给任务的内容以及我需要从上游任务传递/期望的文档感到困惑.
鉴于本页文档中的简单示例,对于上游任务调用的源代码run_this_first和分支的2个下游任务是什么样的?Airflow如何知道运行branch_a而不是branch_b?上游任务的输出在哪里被注意到/读取?
有没有办法在Airflow中创建一个用户自定义的宏,它本身是从其他宏计算的?
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
dag = DAG(
'simple',
schedule_interval='0 21 * * *',
user_defined_macros={
'next_execution_date': '{{ dag.following_schedule(execution_date) }}',
},
)
task = BashOperator(
task_id='bash_op',
bash_command='echo "{{ next_execution_date }}"',
dag=dag,
)
Run Code Online (Sandbox Code Playgroud)
这里的用例是将新的Airflow v1.8 next_execution_date宏反向移植到Airflow v1.7中.不幸的是,这个模板在没有宏扩展时呈现:
$ airflow render simple bash_op 2017-08-09 21:00:00
# ----------------------------------------------------------
# property: bash_command
# ----------------------------------------------------------
echo "{{ dag.following_schedule(execution_date) }}"
Run Code Online (Sandbox Code Playgroud) 我在一个名为tutorial_2.py(实际上是教程中tutorial.py提供的副本airflow,除了dag_id更改为tutorial_2)的文件中定义了一个DAG .
当我查看我的默认值,未修改airflow.cfg(位于~/airflow)时,我看到dags_folder设置为/home/alex/airflow/dags.
我知道cd /home/alex/airflow; mkdir dags; cd dags; cp [...]/tutorial_2.py tutorial_2.py.现在我有一个dags匹配路径的文件夹airflow.cfg,包含tutorial_2.py我之前创建的文件.
但是,当我运行时airflow list_dags,我只获得与默认的教程DAG相对应的名称.
我想tutorial_2在我的DAG列表中显示,以便我可以开始与之交互.既没有python tutorial_2.py也airflow resetdb没有使它出现在列表中.
我该如何解决这个问题?
我似乎不明白如何将模块导入到apache airflow DAG定义文件中.我想这样做是为了能够创建一个库,例如,使用相似的设置声明任务更简洁.
这是我能想到的最简单的例子,它复制了这个问题:我修改了气流教程(https://airflow.apache.org/tutorial.html#recap),只需导入一个模块并从该模块运行一个定义.像这样:
目录结构:
- dags/
-- __init__.py
-- lib.py
-- tutorial.py
Run Code Online (Sandbox Code Playgroud)
tutorial.py:
"""
Code that goes along with the Airflow located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
# Here is my added import
from lib import print_double
# And my usage of the imported def
print_double(2)
## -- snip, because this is just the tutorial code,
## i.e., some standard DAG defintion stuff --
Run Code Online (Sandbox Code Playgroud)
print_double 只是一个简单的def,它将你给它的任何输入乘以2,并打印结果,但显然这甚至不重要,因为这是一个导入问题. …
我正在使用 Airflow 1.8.1,我想从 PostgreOperator 推送 sql 请求的结果。
这是我的任务:
check_task = PostgresOperator(
task_id='check_task',
postgres_conn_id='conx',
sql="check_task.sql",
xcom_push=True,
dag=dag)
def py_is_first_execution(**kwargs):
value = kwargs['ti'].xcom_pull(task_ids='check_task')
print 'count ----> ', value
if value == 0:
return 'next_task'
else:
return 'end-flow'
check_branch = BranchPythonOperator(
task_id='is-first-execution',
python_callable=py_is_first_execution,
provide_context=True,
dag=dag)
Run Code Online (Sandbox Code Playgroud)
这是我的 sql 脚本:
select count(1) from table
Run Code Online (Sandbox Code Playgroud)
当我检查 xcom 值时,check_task它会检索none值。