无法在 Airflow 的 jinja 模板中使用 python 变量

bha*_*159 2 python amazon-web-services amazon-emr airflow mwaa

我正在尝试使用 Airflow 在 AWS EMR 上运行 11 步骤并遵循此代码作为参考。因为使用 EmrAddStepsOperator 和 EmrStepSensor 执行 11 个步骤会导致太多重复。所以我试图循环它。我在我的 DAG 中使用了以下代码。

step_adder = list()
step_checker = list()
steps = ['step1', 'step2', 'step3', 'step4', 'step5', 'step6'...till step11]

# @evalcontextfilter
# def dangerous_render(context, value):
#     return Markup(Template(value).render(context)).render()

for i in range(0,len(steps)):
        #Add step
    step_adder.append(EmrAddStepsOperator(
        task_id=steps[i],
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        steps=eval('step_'+str(i+1)),
    ))
    print(step_adder)
        #Step Sensor for checking
    step_checker.append(EmrStepSensor(
        task_id=steps[i]+'_check',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        #step_id="{{"task_instance.xcom_pull(task_ids={}, key='return_value')[0]",steps[i]}}",
        step_id='(Template("{{ "task_instance.xcom_pull(task_ids=params.step, key='return_value')[0] }}").render({'params': {'step': steps[i]}}))',
        aws_conn_id='aws_default',
    ))
Run Code Online (Sandbox Code Playgroud)

我在这里遇到一个错误,EmrStepSensor 需要 EMR 中的 step_id 在这里输入,并且该 ID 是从 xcom 获取的生成的(我猜,我不是 100% 确定此代码是如何工作的)。但是我的步骤存储在步骤列表中,因此我无法在step_id的task_id中给出静态值,就像参考代码中给出的那样,我无法弄清楚如何使用具有python变量值的jinja模板在此处放置值从步骤列表中。

我使用了以下两种方式,以便step_id可以根据steps[i]中的步骤名称从EMR中获取正确的步骤

step_id="{{"task_instance.xcom_pull(task_ids={}, key='return_value')[0]",steps[i]}}",

step_id='(Template("{{ "task_instance.xcom_pull(task_ids=params.step, key='return_value')[0] }}")
Run Code Online (Sandbox Code Playgroud)

然而,这两个操作都因 Airflow 中的语法错误而失败。因此,如果有人能为我指明正确的方向,我将非常感激。我正在使用 Airflow 1.10.12(这是 AWS 上托管 Apache Airflow 中 Airflow 的默认版本)。

小智 10

我不确定这个问题是否已经解决,所以:

使用 f 字符串:

f"{{{{ task_instance.xcom_pull(task_ids='{steps[i]}', key='return_value')[0] }}}}"

使用.format"{{{{ task_instance.xcom_pull(task_ids='{}', key='return_value')[0] }}}}".format(steps[i])

请注意,您必须确保 key task_ids的值用单引号引起来。另外,xcom_pull 返回的是一个列表,因此索引 [0] 位于 o 末尾