相关疑难解决方法(0)

根据上游任务的输出在气流中生成动态任务

如何根据上游任务返回的列表动态生成任务。

我尝试了以下方法:

使用外部文件从列表中写入和读取 - 此选项有效,但我正在寻找更优雅的解决方案。

Xcom 拉进一个 subdag 工厂。这是行不通的。我能够将一个列表从上游任务传递给一个 subdag,但是 xcom 只能在 subdag 的任务内部访问,并且不能用于循环/迭代返回的列表并生成任务。例如 subdag 工厂方法。

 def subdag1(parent_dag_name, child_dag_name, default_args,**kwargs):
    dag_subdag = DAG(
        dag_id='%s.%s' % (parent_dag_name, child_dag_name),
        default_args=default_args,
        schedule_interval="@once",
    )
    list_files='{{ task_instance.xcom_pull( dag_id="qqq",task_ids="push")}}'
    for newtask in list_files:
        BashOperator(
            task_id='%s-task-%s' % (child_dag_name,   'a'),
            default_args=default_args,
            bash_command= 'echo '+ list_files + newtask,
            dag=dag_subdag,
        )
    return dag_subdag
Run Code Online (Sandbox Code Playgroud)

airflow

7
推荐指数
0
解决办法
1499
查看次数

运行时添加到 DAG 的任务无法调度

我的想法是有一个foo生成输入列表(用户、报告、日志文件等)的任务,并为输入列表中的每个元素启动一个任务。目标是利用 Airflow 的重试和其他逻辑,而不是重新实现它。

因此,理想情况下,我的 DAG 应如下所示: 在此处输入图片说明

这里唯一的变量是生成的任务数量。在完成所有这些任务后,我想再做一些任务,因此为每个任务启动一个新的 DAG 似乎并不合适。

这是我的代码:

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1)
}

dag = DAG('dynamic_dag_generator', schedule_interval=None, default_args=default_args)

foo_operator = BashOperator(
    task_id='foo',
    bash_command="echo '%s'" % json.dumps(range(0, random.randint(40,60))),
    xcom_push=True,
    dag=dag)

def gen_nodes(**kwargs):
    ti = kwargs['ti']
    workers = json.loads(ti.xcom_pull(task_ids='foo'))

    for wid in workers:
        print("Iterating worker %s" % wid)
        op = PythonOperator(
            task_id='test_op_%s' % wid,
            python_callable=lambda: print("Dynamic task!"),
            dag=dag
        )

        op.set_downstream(bar_operator)
        op.set_upstream(dummy_op)

gen_subdag_node_op = PythonOperator(
    task_id='gen_subdag_nodes',
    python_callable=gen_nodes,
    provide_context=True,
    dag=dag
)

gen_subdag_node_op.set_upstream(foo_operator)

dummy_op …
Run Code Online (Sandbox Code Playgroud)

airflow apache-airflow airflow-scheduler

2
推荐指数
1
解决办法
1735
查看次数

动态创建任务列表

我有一个 DAG,它是通过查询 DynamoDB 的列表创建的,对于列表中的每个项目,使用 PythonOperator 创建一个任务并将其添加到 DAG。在下面的示例中未显示,但重要的是要注意列表中的某些项目依赖于其他任务,因此我使用它set_upstream来强制执行依赖关系。

- airflow_home
  \- dags
    \- workflow.py
Run Code Online (Sandbox Code Playgroud)

工作流.py

def get_task_list():
    # ... query dynamodb ...

def run_task(task):
    # ... do stuff ...

dag = DAG(dag_id='my_dag', ...)
tasks = get_task_list()
for task in tasks:
    t = PythonOperator(
        task_id=task['id'],
        provide_context=False,
        dag=dag,
        python_callable=run_task,
        op_args=[task]
    )
Run Code Online (Sandbox Code Playgroud)

问题是workflow.py一遍又一遍地运行(每次任务运行时?),我的get_task_list()方法受到 AWS 的限制并抛出异常。

我认为这是因为无论何时run_task()被调用,它都会运行所有全局变量,workflow.py所以我尝试run_task()进入一个单独的模块,如下所示:

- airflow_home
  \- dags
    \- workflow.py
    \- mypackage
      \- __init__
      \- task.py
Run Code Online (Sandbox Code Playgroud)

但这并没有改变任何事情。我什至尝试放入get_task_list()一个用工厂函数包装的 SubDagOperator,它的行为方式仍然相同。 …

airflow apache-airflow

2
推荐指数
1
解决办法
4398
查看次数