标签: apache-airflow

如何从UI停止/终止Airflow任务

如何在Airflow UI上停止/终止正在运行的任务?我在用LocalExecutor.即使我使用CeleryExecutor,我怎么能杀死/停止正在运行的任务?

python apache hadoop airflow apache-airflow

29
推荐指数
5
解决办法
3万
查看次数

如何在Airflow上重新启动失败的任务

我使用的是LocalExecutor,我的dag有3个任务,其中任务(C)依赖于任务(A).任务(B)和任务(A)可以并行运行,如下所示

A - >Ç

所以任务(A)失败了,但任务(B)运行正常.任务(C)尚未运行,因为任务(A)失败.

我的问题是我如何单独运行任务(A),因此任务(A)运行一旦任务(A)完成,并且Airflow UI将它们标记为成功.

python hadoop bigdata airflow apache-airflow

25
推荐指数
2
解决办法
1万
查看次数

气流任务陷入"排队"状态,永远不会运行

我正在使用Airflow v1.8.1并在kubernetes和Docker上运行所有组件(worker,web,flower,scheduler).我使用Redis的Celery Executor,我的任务看起来像:

(start) -> (do_work_for_product1)
     ?  -> (do_work_for_product2)
     ?  -> (do_work_for_product3)
     ?  …
Run Code Online (Sandbox Code Playgroud)

所以start任务有多个下游.我设置了与并发相关的配置如下:

parallelism = 3
dag_concurrency = 3
max_active_runs = 1
Run Code Online (Sandbox Code Playgroud)

然后,当我手动运行此DAG(不确定它是否从未在计划任务中发生)时,某些下游会执行,但其他下游仍处于"排队"状态.

如果我从Admin UI清除任务,它将被执行.没有工作日志(在处理一些第一个下游之后,它只是不输出任何日志).

Web服务器的日志(不确定worker exiting是否相关)

/usr/local/lib/python2.7/dist-packages/flask/exthook.py:71: ExtDeprecationWarning: Importing flask.ext.cache is deprecated, use flask_cache instead.
  .format(x=modname), ExtDeprecationWarning
[2017-08-24 04:20:56,496] [51] {models.py:168} INFO - Filling up the DagBag from /usr/local/airflow_dags
[2017-08-24 04:20:57 +0000] [27] [INFO] Handling signal: ttou
[2017-08-24 04:20:57 +0000] [37] [INFO] Worker exiting (pid: 37)
Run Code Online (Sandbox Code Playgroud)

调度程序也没有错误日志.每当我尝试这个时,许多任务就会发生变化.

因为我也使用Docker,我想知道这是否相关:https: //github.com/puckel/docker-airflow/issues/94 但到目前为止,没有任何线索.

有没有人遇到类似的问题或者有一些想法我可以调查这个问题...?

airflow apache-airflow airflow-scheduler

21
推荐指数
1
解决办法
1万
查看次数

来自cli的气流传递参数

有没有办法将参数传递给:

airflow trigger_dag dag_name {param}
Run Code Online (Sandbox Code Playgroud)

我有一个监视文件目录的脚本 - 当文件移动到目标目录时我想触发dag作为参数传递文件路径.

airflow apache-airflow airflow-scheduler

16
推荐指数
2
解决办法
1万
查看次数

Dags和任务的气流结构/组织

我的问题:

airflow apache-airflow

16
推荐指数
2
解决办法
5104
查看次数

Airflow的BranchPythonOperator如何工作?

我很难理解Airflow中的BranchPythonOperator是如何工作的.我知道它主要用于分支,但是对于传递给任务的内容以及我需要从上游任务传递/期望的文档感到困惑.

鉴于本页文档中的简单示例,对于上游任务调用的源代码run_this_first和分支的2个下游任务是什么样的?Airflow如何知道运行branch_a而不是branch_b?上游任务的输出在哪里被注意到/读取?

airflow apache-airflow

16
推荐指数
1
解决办法
1万
查看次数

使自定义Airflow宏扩展其他宏

有没有办法在Airflow中创建一个用户自定义的宏,它本身是从其他宏计算的?

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    'simple',
    schedule_interval='0 21 * * *',
    user_defined_macros={
        'next_execution_date': '{{ dag.following_schedule(execution_date) }}',
    },
)

task = BashOperator(
    task_id='bash_op',
    bash_command='echo "{{ next_execution_date }}"',
    dag=dag,
)
Run Code Online (Sandbox Code Playgroud)

这里的用例是将新的Airflow v1.8 next_execution_date宏反向移植到Airflow v1.7中.不幸的是,这个模板在没有宏扩展时呈现:

$ airflow render simple bash_op 2017-08-09 21:00:00
    # ----------------------------------------------------------
    # property: bash_command
    # ----------------------------------------------------------
    echo "{{ dag.following_schedule(execution_date) }}"
Run Code Online (Sandbox Code Playgroud)

airflow apache-airflow

16
推荐指数
3
解决办法
6262
查看次数

如何向Airflow添加新的DAG?

我在一个名为tutorial_2.py(实际上是教程中tutorial.py提供的副本airflow,除了dag_id更改为tutorial_2)的文件中定义了一个DAG .

当我查看我的默认值,未修改airflow.cfg(位于~/airflow)时,我看到dags_folder设置为/home/alex/airflow/dags.

我知道cd /home/alex/airflow; mkdir dags; cd dags; cp [...]/tutorial_2.py tutorial_2.py.现在我有一个dags匹配路径的文件夹airflow.cfg,包含tutorial_2.py我之前创建的文件.

但是,当我运行时airflow list_dags,我只获得与默认的教程DAG相对应的名称.

我想tutorial_2在我的DAG列表中显示,以便我可以开始与之交互.既没有python tutorial_2.pyairflow resetdb没有使它出现在列表中.

我该如何解决这个问题?

python airflow apache-airflow

16
推荐指数
3
解决办法
1万
查看次数

Apache Airflow DAG无法导入本地模块

我似乎不明白如何将模块导入到apache airflow DAG定义文件中.我想这样做是为了能够创建一个库,例如,使用相似的设置声明任务更简洁.

这是我能想到的最简单的例子,它复制了这个问题:我修改了气流教程(https://airflow.apache.org/tutorial.html#recap),只需导入一个模块并从该模块运行一个定义.像这样:

目录结构:

- dags/
-- __init__.py
-- lib.py
-- tutorial.py
Run Code Online (Sandbox Code Playgroud)

tutorial.py:

"""
Code that goes along with the Airflow located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

# Here is my added import
from lib import print_double

# And my usage of the imported def
print_double(2)

## -- snip, because this is just the tutorial code, 
## i.e., some standard DAG defintion stuff --
Run Code Online (Sandbox Code Playgroud)

print_double 只是一个简单的def,它将你给它的任何输入乘以2,并打印结果,但显然这甚至不重要,因为这是一个导入问题. …

python airflow apache-airflow

16
推荐指数
2
解决办法
9977
查看次数

气流:如何从 PostgreOperator 推送 xcom 值?

我正在使用 Airflow 1.8.1,我想从 PostgreOperator 推送 sql 请求的结果。

这是我的任务:

check_task = PostgresOperator(
    task_id='check_task',
    postgres_conn_id='conx',
    sql="check_task.sql",
    xcom_push=True,
    dag=dag)

def py_is_first_execution(**kwargs):
    value = kwargs['ti'].xcom_pull(task_ids='check_task')
    print 'count ----> ', value
    if value == 0:
       return 'next_task'
    else:
       return 'end-flow'

check_branch = BranchPythonOperator(
    task_id='is-first-execution',
    python_callable=py_is_first_execution,
    provide_context=True,
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

这是我的 sql 脚本:

select count(1) from table
Run Code Online (Sandbox Code Playgroud)

当我检查 xcom 值时,check_task它会检索none值。

python airflow apache-airflow airflow-scheduler

15
推荐指数
2
解决办法
7560
查看次数