在气流中创建子标签时访问父 dag 上下文?

rus*_*ro1 6 airflow apache-airflow-xcom

我试图在 subdag 创建时访问来自父 dag 的一些 xcom 数据,我正在寻找在互联网上实现这一目标,但我没有找到任何东西。

def test(task_id):
    logging.info(f' execution of task {task_id}')


def load_subdag(parent_dag_id, child_dag_id, args):
    dag_subdag = DAG(
        dag_id='{0}.{1}'.format(parent_dag_id, child_dag_id),
        default_args=args,
        schedule_interval="@daily",
    )
    with dag_subdag:
        r = DummyOperator(task_id='random')

        for i in range(r.xcom_pull(task_ids='take_Ana', key='the_message', dag_id=parent_dag_id)):
            t = PythonOperator(
                task_id='load_subdag_{0}'.format(i),
                default_args=args,
                python_callable=print_context,
                op_kwargs={'task_id': 'load_subdag_{0}'.format(i)},
                dag=dag_subdag,
            )

    return dag_subdag

load_tasks = SubDagOperator(
        task_id='load_tasks',
        subdag=load_subdag(dag.dag_id,
                           'load_tasks', args),
        default_args=args,
    )
Run Code Online (Sandbox Code Playgroud)

我的代码出现此错误

1  | Traceback (most recent call last):
airflow_1  |   File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 374, in process_file
airflow_1  |     m = imp.load_source(mod_name, filepath)
airflow_1  |   File "/usr/local/lib/python3.6/imp.py", line 172, in load_source
airflow_1  |     module = _load(spec)
airflow_1  |   File "<frozen importlib._bootstrap>", line 684, in _load
airflow_1  |   File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
airflow_1  |   File "<frozen importlib._bootstrap_external>", line 678, in exec_module
airflow_1  |   File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
airflow_1  |   File "/app/dags/airflow_dag_test.py", line 75, in <module>
airflow_1  |     'load_tasks', args),
airflow_1  |   File "/app/dags/airflow_dag_test.py", line 55, in load_subdag
airflow_1  |     for i in range(r.xcom_pull(task_ids='take_Ana', key='the_message', dag_id=parent_dag_id)):
airflow_1  | TypeError: xcom_pull() missing 1 required positional argument: 'context'
Run Code Online (Sandbox Code Playgroud)

y2k*_*ham 8

错误很简单:您缺少方法context所需的参数xcom_pull()。但是你真的不能仅仅context通过create传入这个方法;它是一个传递给锚方法的Python 字典Airflowpre_execute()andexecute() of BaseOperator(所有Operators 的父类)。

换句话说,只有在实际执行时才context可用,而不是在-definition期间。这是有道理的,因为在s 的分类中,s 是 s 之间的实时通信机制:它们在运行时相互交谈。OperatorDAGAirflowxcomtask


但在一天结束时Xcom,就像所有其他Airflow模型一样,都保留在后端 meta-db 中。所以当然你可以直接从那里检索它(显然只有task过去运行过的 s的 XCOM )。虽然我没有代码片段,但您可以看看cli.py他们在哪里使用SQLAlchemyORM 来处理模型和后端数据库。请理解,这意味着每次DAG解析 -definition 文件时都会向您的后端数据库触发查询,这发生得相当快。


有用的链接


编辑-1

看了你的代码片段后,我感到震惊。假设返回的值xcom_pull()会经常变化,tasksdag数量也会不断变化。这可能会导致不可预测的行为(你应该做一些研究,但我对此感觉不太好)

我建议你重新审视你的整个任务流程和凝结下来的设计,其中-数taskS和-的结构DAG 是事先已知(在当时的时间执行DAG定义文件)。您当然可以迭代查询的json文件/结果SQL(如SQLAlchemy前面提到的事情)等以生成您的实际tasks,但该文件/db/任何不应经常更改。


请理解,仅仅迭代一个列表来生成tasks 是没有问题的;不可能有DAG依赖于结果的结构upstream task。例如,您不能根据上游任务在运行时计算 n 的值task在您的DAG基础上创建n 。


所以这是不可能的

但这是可能的(包括您想要实现的目标;即使您这样做的方式似乎不是一个好主意)


编辑-2

事实证明,从上游任务的输出生成任务毕竟是可能的;尽管它需要大量有关 Airflow 内部工作原理的知识以及一点创造力。