wal*_*r91 3 python jinja2 google-cloud-platform airflow
我对 Airflow 很陌生,我在 Xcom 和 Jinja 方面遇到了一些问题。
我必须做一些 Python 详细说明,然后将结果传递给 EmailOperator,以便将其作为电子邮件正文发送。
似乎没有关于它的文档,我发现的唯一提示是这个链接,它的格式很糟糕,不起作用,而且评论很差。我认为它必须通过 Jinja 完成,但我不知道如何......
这是我正在研究的暂定,有人可以帮助我并解释为什么这段代码是错误的以及如何修复它吗?
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2020, 11, 7),
'email': ['ciccio.pasticcio@noreply.it'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'dg_daily_saint',
default_args=default_args,
schedule_interval='10 9 * * *')
task1 = PythonOperator(task_id="extract_daily_saint",
python_callable=extractDailySaint,
provide_context=True,
dag=dag)
def html_output(**context):
value=context['task_instance'].xcom_pull(task_ids='extract_daily_saint', key='saint')
return "<h1>" + value + "</h1>"
EMAIL_CONTENT = """
<b> {{ html_output(context) }}</b>
"""
mail = EmailOperator(
task_id='mail',
to='ciccio.pasticcio@noreply.it',
subject='DataGovernance',
html_content=EMAIL_CONTENT,
provide_context=True,
dag=dag)
task1 >> mail
Run Code Online (Sandbox Code Playgroud)
被卡住可能是另一个问题,但我看到关于推杆和拉杆的一些困惑。
Pusher 是将参数推送给另一个操作符的操作符。推手需要xcom_push=True。对于PythonOperator,将推送返回值。
Puller 是操作者从 Pusher 接收参数。拉拔器需求provide_context=True
因此,在你的例子中
task1 = PythonOperator(task_id="extract_daily_saint",
python_callable=extractDailySaint,
xcom_push=True, # not provide_context
dag=dag)
Run Code Online (Sandbox Code Playgroud)
然后在你的 puller 中,你可以直接在 Jinja 模板中使用宏。
mail = EmailOperator(
task_id='mail',
to='ciccio.pasticcio@noreply.it',
subject='DataGovernance',
html_content="<b><h1> {{ task_instance.xcom_pull(task_ids='extract_daily_saint') }} </h1></b>",
provide_context=True, # puller needs provide_context
dag=dag)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1644 次 |
| 最近记录: |