相关疑难解决方法(0)

在Airflow中创建动态工作流的正确方法

问题

在Airflow中是否有任何方法可以创建工作流程,以便任务数量B.*在任务A完成之前是未知的?我查看了子标记,但看起来它只能用于必须在Dag创建时确定的一组静态任务.

dag会触发工作吗?如果是这样,请你举个例子.

我有一个问题是,在任务A完成之前,无法知道计算任务C所需的任务B的数量.每个任务B.*将需要几个小时来计算,不能合并.

              |---> Task B.1 --|
              |---> Task B.2 --|
 Task A ------|---> Task B.3 --|-----> Task C
              |       ....     |
              |---> Task B.N --|
Run Code Online (Sandbox Code Playgroud)

想法#1

我不喜欢这个解决方案,因为我必须创建一个阻塞的ExternalTask​​Sensor,所有的任务B.*需要2到24小时才能完成.所以我认为这不是一个可行的解决方案.当然有一种更简单的方法吗?或者Airflow不是为此而设计的?

Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C

Dag 2 (Dynamically created DAG though python_callable in TriggerDagrunOperator)
               |-- Task B.1 --|
               |-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
               |     ....     |
               |-- Task B.N --|
Run Code Online (Sandbox Code Playgroud)

编辑1: …

python workflow airflow

66
推荐指数
8
解决办法
2万
查看次数

运行时的气流动态任务

关于"动态任务"的其他问题似乎涉及在计划或设计时动态构建DAG.我有兴趣在执行期间动态地将任务添加到DAG.

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

dag = DAG('test_dag', description='a test',
          schedule_interval='0 0 * * *',
          start_date=datetime(2018, 1, 1),
          catchup=False)

def make_tasks():
    du1 = DummyOperator(task_id='dummy1', dag=dag)
    du2 = DummyOperator(task_id='dummy2', dag=dag)
    du3 = DummyOperator(task_id='dummy3', dag=dag)
    du1 >> du2 >> du3

p = PythonOperator(
    task_id='python_operator',
    dag=dag,
    python_callable=make_tasks)
Run Code Online (Sandbox Code Playgroud)

这种天真的实现似乎不起作用 - 虚拟任务永远不会出现在UI中.

在执行期间向DAG添加新运算符的正确方法是什么?可能吗?

python airflow airflow-scheduler

17
推荐指数
1
解决办法
3875
查看次数

动态创建任务列表

我有一个 DAG,它是通过查询 DynamoDB 的列表创建的,对于列表中的每个项目,使用 PythonOperator 创建一个任务并将其添加到 DAG。在下面的示例中未显示,但重要的是要注意列表中的某些项目依赖于其他任务,因此我使用它set_upstream来强制执行依赖关系。

- airflow_home
  \- dags
    \- workflow.py
Run Code Online (Sandbox Code Playgroud)

工作流.py

def get_task_list():
    # ... query dynamodb ...

def run_task(task):
    # ... do stuff ...

dag = DAG(dag_id='my_dag', ...)
tasks = get_task_list()
for task in tasks:
    t = PythonOperator(
        task_id=task['id'],
        provide_context=False,
        dag=dag,
        python_callable=run_task,
        op_args=[task]
    )
Run Code Online (Sandbox Code Playgroud)

问题是workflow.py一遍又一遍地运行(每次任务运行时?),我的get_task_list()方法受到 AWS 的限制并抛出异常。

我认为这是因为无论何时run_task()被调用,它都会运行所有全局变量,workflow.py所以我尝试run_task()进入一个单独的模块,如下所示:

- airflow_home
  \- dags
    \- workflow.py
    \- mypackage
      \- __init__
      \- task.py
Run Code Online (Sandbox Code Playgroud)

但这并没有改变任何事情。我什至尝试放入get_task_list()一个用工厂函数包装的 SubDagOperator,它的行为方式仍然相同。 …

airflow apache-airflow

2
推荐指数
1
解决办法
4398
查看次数

Airflow:如何获取一个任务的返回输出来设置下游任务运行的依赖关系?

我们有一个 kubernetes pod 操作符,它将输出一个 Python 字典,该字典将定义哪些更进一步的下游 kubernetes pod 操作符与它们的依赖项和环境变量一起运行,以传递给每个操作符。

我如何让这个 python 字典对象回到执行器的上下文(或者它是工作器的上下文?),以便气流可以产生下游 kubernetes 操作符?

我看过 BranchOperator 和 TriggerDagRunOperator 和 XCOM push/pull 和 Variable.get 和 Variable.set,但似乎没有什么工作。

airflow apache-airflow-xcom

2
推荐指数
1
解决办法
1199
查看次数