在Airflow中是否有任何方法可以创建工作流程,以便任务数量B.*在任务A完成之前是未知的?我查看了子标记,但看起来它只能用于必须在Dag创建时确定的一组静态任务.
dag会触发工作吗?如果是这样,请你举个例子.
我有一个问题是,在任务A完成之前,无法知道计算任务C所需的任务B的数量.每个任务B.*将需要几个小时来计算,不能合并.
|---> Task B.1 --|
|---> Task B.2 --|
Task A ------|---> Task B.3 --|-----> Task C
| .... |
|---> Task B.N --|
Run Code Online (Sandbox Code Playgroud)
我不喜欢这个解决方案,因为我必须创建一个阻塞的ExternalTaskSensor,所有的任务B.*需要2到24小时才能完成.所以我认为这不是一个可行的解决方案.当然有一种更简单的方法吗?或者Airflow不是为此而设计的?
Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C
Dag 2 (Dynamically created DAG though python_callable in TriggerDagrunOperator)
|-- Task B.1 --|
|-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
| .... |
|-- Task B.N --|
Run Code Online (Sandbox Code Playgroud)
我已经启动了Airflow网络服务器并安排了一些dags.我可以在Web GUI上看到这些dags.
如何删除特定DAG在Web GUI中运行和显示?是否有Airflow CLI命令来执行此操作?
我环顾四周,但是一旦加载和安排DAG,就找不到简单的删除DAG的方法.
运行pip install airflow [postgres]命令后出现以下错误:
引发RuntimeError("默认情况下,Airflow的一个依赖项安装一个GPL"
RuntimeError:默认情况下,Airflow的一个依赖项会安装GPL依赖项(unidecode).为避免此依赖关系,请在安装或升级Airflow时在您的环境中设置SLUGIFY_USES_TEXT_UNIDECODE = yes.要强制安装GPL版本,请设置AIRFLOW_GPL_UNIDECODE
我想在debian 9中安装
当我在dags文件夹中放入一个新的DAG python脚本时,我可以在DAG UI中查看DAG的新条目,但它没有自动启用.最重要的是,它似乎也没有正确加载.我只能在列表右侧点击几次"刷新"按钮,并切换列表左侧的开/关按钮,以便能够安排DAG.这些是手动过程,因为我需要触发一些东西,即使DAG脚本放在dag文件夹中.
有人可以帮我吗?我错过了什么吗?或者这是气流中的正确行为?
顺便说一句,正如帖子标题中所提到的,有一个指示符,显示"此DAG在网络服务器DagBag对象中不可用.它显示在此列表中,因为调度程序在metdata数据库中将其标记为活动"标记在我触发所有这个手动过程之前使用DAG标题.
我真的是这个论坛的新手.但是我一直在为我们的公司玩气流.对不起,这个问题听起来真的很蠢.
我正在使用一堆BashOperator编写一个管道.基本上,对于每个任务,我想简单地使用'curl'调用REST api
这是我的管道看起来像(非常简化的版本):
from airflow import DAG
from airflow.operators import BashOperator, PythonOperator
from dateutil import tz
import datetime
datetime_obj = datetime.datetime
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime.datetime.combine(datetime_obj.today() - datetime.timedelta(1), datetime_obj.min.time()),
'email': ['xxxx@xxx.xxx'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': datetime.timedelta(minutes=5),
}
current_datetime = datetime_obj.now(tz=tz.tzlocal())
dag = DAG(
'test_run', default_args=default_args, schedule_interval=datetime.timedelta(minutes=60))
curl_cmd='curl -XPOST "'+hostname+':8000/run?st='+current_datetime +'"'
t1 = BashOperator(
task_id='rest-api-1',
bash_command=curl_cmd,
dag=dag)
Run Code Online (Sandbox Code Playgroud)
如果你注意到我在做什么current_datetime= datetime_obj.now(tz=tz.tzlocal())
而不是我想要的是'execution_date'
如何直接使用'execution_date'并将其分配给我的python文件中的变量?
我有这个访问args的一般问题.任何帮助将得到真诚的感谢.
谢谢
我是Airbnb开源工作流程/ datapipeline软件气流的新用户,它位于github URL https://github.com/apache/incubator-airflow上.启动Web UI后,有十几个默认示例dags.让我感到愚蠢,我尝试了很多方法来删除这些笨蛋,但没有这样做.load_examples = False在airflow.cfg中设置.文件夹lib/python2.7/site-packages/airflow/example_dags已删除.删除dag pythons脚本文件夹后,这些示例dags的状态将更改为灰色.但这些项目仍然占据了Web UI屏幕.并且在airflow.cfg中将新的dag文件夹指定为dags_folder =/mnt/dag/1.我检查了这个dag文件夹,什么都没有.对我来说真的很奇怪,为什么要删除这些例子是如此困难.
假设你有一个气流DAG是没有意义回填,这意味着,它的运行一次后,运行它随后的时间很快就完全没有意义的.
例如,如果您从一些仅每小时更新一次的数据库加载数据到数据库中,那么快速连续发生的回填只会一次又一次地导入相同的数据.
当您实例化一个新的每小时任务时,这尤其令人讨厌,并且它会在N
您指定的时间间隔内开始运行之前,每小时都会错过一次,执行冗余工作.
我能想到的唯一解决方案是他们在文档的常见问题解答中特别建议的
我们建议不要将动态值用作start_date,尤其是
datetime.now()
因为它可能非常混乱.
有没有办法禁用DAG的回填,或者我应该怎么做?
安装正确。db 正确启动并尝试启动网络服务器显示以下错误。
我重新安装了所有东西,但它仍然无法正常工作。
如果有人帮助我,我将不胜感激。
控制台输出:
$:~/airflow# airflow webserver -p 8080
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
[2020-04-08 13:14:20,573] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-04-08 13:14:20,574] {dagbag.py:403} INFO - Filling up the DagBag from /home/cato_service/airflow/dags
Traceback (most recent call last):
File "/usr/local/bin/airflow", …
Run Code Online (Sandbox Code Playgroud) 当有任务正在运行时,Airflow 会弹出一个通知,说调度程序似乎没有运行,并且一直显示直到任务完成:
The scheduler does not appear to be running. Last heartbeat was received 5 minutes ago.
The DAGs list may not update, and new tasks will not be scheduled.
Run Code Online (Sandbox Code Playgroud)
实际上,调度程序进程正在运行,因为我已经检查了该进程。任务完成后,通知消失,一切恢复正常。
我的任务有点重,可能要运行几个小时。