在 Airflow 的 EmailOperator 中访问 Xcom

wal*_*r91 3 python jinja2 google-cloud-platform airflow

我对 Airflow 很陌生,我在 Xcom 和 Jinja 方面遇到了一些问题。

我必须做一些 Python 详细说明,然后将结果传递给 EmailOperator,以便将其作为电子邮件正文发送。

似乎没有关于它的文档,我发现的唯一提示是这个链接,它的格式很糟糕,不起作用,而且评论很差。我认为它必须通过 Jinja 完成,但我不知道如何......

这是我正在研究的暂定,有人可以帮助我并解释为什么这段代码是错误的以及如何修复它吗?

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 11, 7),
    'email': ['ciccio.pasticcio@noreply.it'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'dg_daily_saint',
    default_args=default_args,
    schedule_interval='10 9 * * *')

task1 = PythonOperator(task_id="extract_daily_saint",
                                     python_callable=extractDailySaint,
                                     provide_context=True,
                                     dag=dag)


def html_output(**context):
    value=context['task_instance'].xcom_pull(task_ids='extract_daily_saint', key='saint')
    return "<h1>" + value + "</h1>"


EMAIL_CONTENT = """
    <b> {{ html_output(context) }}</b>
    """

mail = EmailOperator(
    task_id='mail',
    to='ciccio.pasticcio@noreply.it',
    subject='DataGovernance',
    html_content=EMAIL_CONTENT,
    provide_context=True,
    dag=dag)

task1 >> mail
Run Code Online (Sandbox Code Playgroud)

Emm*_*mma 7

被卡住可能是另一个问题,但我看到关于推杆和拉杆的一些困惑。

Pusher 是将参数推送给另一个操作符的操作符。推手需要xcom_push=True。对于PythonOperator,将推送返回值。

Puller 是操作者从 Pusher 接收参数。拉拔器需求provide_context=True

因此,在你的例子中

task1 = PythonOperator(task_id="extract_daily_saint",
                       python_callable=extractDailySaint,
                       xcom_push=True,  # not provide_context
                       dag=dag)
Run Code Online (Sandbox Code Playgroud)

然后在你的 puller 中,你可以直接在 Jinja 模板中使用宏。

mail = EmailOperator(
    task_id='mail',
    to='ciccio.pasticcio@noreply.it',
    subject='DataGovernance',
    html_content="<b><h1> {{ task_instance.xcom_pull(task_ids='extract_daily_saint') }} </h1></b>",
    provide_context=True,  # puller needs provide_context
    dag=dag)
Run Code Online (Sandbox Code Playgroud)