标签: airflow

在Airflow中创建动态工作流的正确方法

问题

在Airflow中是否有任何方法可以创建工作流程,以便任务数量B.*在任务A完成之前是未知的?我查看了子标记,但看起来它只能用于必须在Dag创建时确定的一组静态任务.

dag会触发工作吗?如果是这样,请你举个例子.

我有一个问题是,在任务A完成之前,无法知道计算任务C所需的任务B的数量.每个任务B.*将需要几个小时来计算,不能合并.

              |---> Task B.1 --|
              |---> Task B.2 --|
 Task A ------|---> Task B.3 --|-----> Task C
              |       ....     |
              |---> Task B.N --|
Run Code Online (Sandbox Code Playgroud)

想法#1

我不喜欢这个解决方案,因为我必须创建一个阻塞的ExternalTask​​Sensor,所有的任务B.*需要2到24小时才能完成.所以我认为这不是一个可行的解决方案.当然有一种更简单的方法吗?或者Airflow不是为此而设计的?

Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C

Dag 2 (Dynamically created DAG though python_callable in TriggerDagrunOperator)
               |-- Task B.1 --|
               |-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
               |     ....     |
               |-- Task B.N --|
Run Code Online (Sandbox Code Playgroud)

编辑1: …

python workflow airflow

66
推荐指数
8
解决办法
2万
查看次数

Airflow:如何删除DAG?

我已经启动了Airflow网络服务器并安排了一些dags.我可以在Web GUI上看到这些dags.

如何删除特定DAG在Web GUI中运行和显示?是否有Airflow CLI命令来执行此操作?

我环顾四周,但是一旦加载和安排DAG,就找不到简单的删除DAG的方法.

airflow

60
推荐指数
7
解决办法
4万
查看次数

安装气流时出错:默认情况下,Airflow的一个依赖项会安装GPL

运行pip install airflow [postgres]命令后出现以下错误:

引发RuntimeError("默认情况下,Airflow的一个依赖项安装一个GPL"

RuntimeError:默认情况下,Airflow的一个依赖项会安装GPL依赖项(unidecode).为避免此依赖关系,请在安装或升级Airflow时在您的环境中设置SLUGIFY_USES_TEXT_UNIDECODE = yes.要强制安装GPL版本,请设置AIRFLOW_GPL_UNIDECODE

我想在debian 9中安装

airflow

54
推荐指数
3
解决办法
2万
查看次数

气流"此DAG在网络服务器DagBag对象中不可用"

当我在dags文件夹中放入一个新的DAG python脚本时,我可以在DAG UI中查看DAG的新条目,但它没有自动启用.最重要的是,它似乎也没有正确加载.我只能在列表右侧点击几次"刷新"按钮,并切换列表左侧的开/关按钮,以便能够安排DAG.这些是手动过程,因为我需要触发一些东西,即使DAG脚本放在dag文件夹中.

有人可以帮我吗?我错过了什么吗?或者这是气流中的正确行为?

顺便说一句,正如帖子标题中所提到的,有一个指示符,显示"此DAG在网络服务器DagBag对象中不可用.它显示在此列表中,因为调度程序在metdata数据库中将其标记为活动"标记在我触发所有这个手动过程之前使用DAG标题.

python apache workflow airflow

46
推荐指数
3
解决办法
2万
查看次数

airflow中的execution_date:需要作为变量访问

我真的是这个论坛的新手.但是我一直在为我们的公司玩气流.对不起,这个问题听起来真的很蠢.

我正在使用一堆BashOperator编写一个管道.基本上,对于每个任务,我想简单地使用'curl'调用REST api

这是我的管道看起来像(非常简化的版本):

from airflow import DAG
from airflow.operators import BashOperator, PythonOperator
from dateutil import tz
import datetime

datetime_obj = datetime.datetime

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.datetime.combine(datetime_obj.today() - datetime.timedelta(1), datetime_obj.min.time()),
    'email': ['xxxx@xxx.xxx'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': datetime.timedelta(minutes=5),
}


current_datetime = datetime_obj.now(tz=tz.tzlocal())

dag = DAG(
    'test_run', default_args=default_args, schedule_interval=datetime.timedelta(minutes=60))

curl_cmd='curl -XPOST "'+hostname+':8000/run?st='+current_datetime +'"'


t1 = BashOperator(
    task_id='rest-api-1',
    bash_command=curl_cmd,
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

如果你注意到我在做什么current_datetime= datetime_obj.now(tz=tz.tzlocal()) 而不是我想要的是'execution_date'

如何直接使用'execution_date'并将其分配给我的python文件中的变量?

我有这个访问args的一般问题.任何帮助将得到真诚的感谢.

谢谢

airflow

45
推荐指数
5
解决办法
5万
查看次数

如何删除气流中的默认示例dags

我是Airbnb开源工作流程/ datapipeline软件气流的新用户,它位于github URL https://github.com/apache/incubator-airflow上.启动Web UI后,有十几个默认示例dags.让我感到愚蠢,我尝试了很多方法来删除这些笨蛋,但没有这样做.load_examples = False在airflow.cfg中设置.文件夹lib/python2.7/site-packages/airflow/example_dags已删除.删除dag pythons脚本文件夹后,这些示例dags的状态将更改为灰色.但这些项目仍然占据了Web UI屏幕.并且在airflow.cfg中将新的dag文件夹指定为dags_folder =/mnt/dag/1.我检查了这个dag文件夹,什么都没有.对我来说真的很奇怪,为什么要删除这些例子是如此困难.

python airflow

45
推荐指数
5
解决办法
1万
查看次数

如何防止气流回填dag运行?

假设你有一个气流DAG是没有意义回填,这意味着,它的运行一次后,运行它随后的时间很快就完全没有意义的.

例如,如果您从一些仅每小时更新一次的数据库加载数据到数据库中,那么快速连续发生的回填只会一次又一次地导入相同的数据.

当您实例化一个新的每小时任务时,这尤其令人讨厌,并且它会在N您指定的时间间隔内开始运行之前,每小时都会错过一次,执行冗余工作.

我能想到的唯一解决方案是他们在文档的常见问题解答中特别建议

我们建议不要将动态值用作start_date,尤其是datetime.now()因为它可能非常混乱.

有没有办法禁用DAG的回填,或者我应该怎么做?

python scheduled-tasks airflow

44
推荐指数
3
解决办法
2万
查看次数

如何在Airflow中创建条件任务

我想在Airflow中创建一个条件任务,如下面的架构中所述.预期的情况如下:

  • 任务1执行
  • 如果任务1成功,则执行任务2a
  • 否则,如果任务1失败,则执行任务2b
  • 最后执行任务3

条件任务 上面的所有任务都是SSHExecuteOperator.我猜我应该使用ShortCircuitOperator和/或XCom来管理这个条件,但我不知道如何实现它.你能描述一下解决方案吗?

python conditional-statements airflow

43
推荐指数
2
解决办法
2万
查看次数

启动 Airflow 网络服务器失败并显示 sqlalchemy.exc.NoInspectionAvailable:没有可用的检查系统

安装正确。db 正确启动并尝试启动网络服务器显示以下错误。

我重新安装了所有东西,但它仍然无法正常工作。

如果有人帮助我,我将不胜感激。

控制台输出:

$:~/airflow# airflow webserver -p 8080
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2020-04-08 13:14:20,573] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-04-08 13:14:20,574] {dagbag.py:403} INFO - Filling up the DagBag from /home/cato_service/airflow/dags
Traceback (most recent call last):
  File "/usr/local/bin/airflow", …
Run Code Online (Sandbox Code Playgroud)

airflow

38
推荐指数
2
解决办法
6880
查看次数

执行任务后,气流调度程序似乎没有运行

当有任务正在运行时,Airflow 会弹出一个通知,说调度程序似乎没有运行,并且一直显示直到任务完成:

The scheduler does not appear to be running. Last heartbeat was received 5 minutes ago.

The DAGs list may not update, and new tasks will not be scheduled.
Run Code Online (Sandbox Code Playgroud)

实际上,调度程序进程正在运行,因为我已经检查了该进程。任务完成后,通知消失,一切恢复正常。

我的任务有点重,可能要运行几个小时。

airflow

37
推荐指数
6
解决办法
4万
查看次数