小编bha*_*159的帖子

无法在 Airflow 的 jinja 模板中使用 python 变量

我正在尝试使用 Airflow 在 AWS EMR 上运行 11 步骤并遵循此代码作为参考。因为使用 EmrAddStepsOperator 和 EmrStepSensor 执行 11 个步骤会导致太多重复。所以我试图循环它。我在我的 DAG 中使用了以下代码。

step_adder = list()
step_checker = list()
steps = ['step1', 'step2', 'step3', 'step4', 'step5', 'step6'...till step11]

# @evalcontextfilter
# def dangerous_render(context, value):
#     return Markup(Template(value).render(context)).render()

for i in range(0,len(steps)):
        #Add step
    step_adder.append(EmrAddStepsOperator(
        task_id=steps[i],
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        steps=eval('step_'+str(i+1)),
    ))
    print(step_adder)
        #Step Sensor for checking
    step_checker.append(EmrStepSensor(
        task_id=steps[i]+'_check',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        #step_id="{{"task_instance.xcom_pull(task_ids={}, key='return_value')[0]",steps[i]}}",
        step_id='(Template("{{ "task_instance.xcom_pull(task_ids=params.step, key='return_value')[0] }}").render({'params': {'step': steps[i]}}))',
        aws_conn_id='aws_default',
    ))
Run Code Online (Sandbox Code Playgroud)

我在这里遇到一个错误,EmrStepSensor 需要 …

python amazon-web-services amazon-emr airflow mwaa

2
推荐指数
1
解决办法
3376
查看次数

标签 统计

airflow ×1

amazon-emr ×1

amazon-web-services ×1

mwaa ×1

python ×1