在这个答案的帮助下/sf/answers/2921135731/我正在执行一个python文件.
我使用PythonOperator并尝试将执行日期作为传递给脚本的参数包含在内.
我相信我可以通过kwargs ['execution_date']以某种方式访问它.
以下失败
DAG.py
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import sys
import os
sys.path.append(os.path.abspath("/home/glsam/OmegaAPI/airflow/scripts/PyPer_ogi_simple"))
from update_benchmarks import *
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 4, 23),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('run_pyPer', default_args=default_args)
update_BM_G027 = PythonOperator(
task_id='update_BM_G027',
python_callable=update_bmk,
dag=dag,
op_kwargs={
'bmk_code': 'G027',
'is_hedged': False,
'from_date': kwargs['execution_date'],
})
Run Code Online (Sandbox Code Playgroud)
也许我需要使用这个答案来获取日期然后XCOM它的任务?/sf/answers/2572845131/
tob*_*bi6 16
这确实有点令人困惑,并没有很好的记录.
你已经在使用了PythonOperator
.
现在只需添加选项
provide_context=True,
Run Code Online (Sandbox Code Playgroud)
并用指针扩展你的callable,例如
update_bmk(bmk_code, is_hedged, **context)
Run Code Online (Sandbox Code Playgroud)
现在,在您的函数中,您将可以访问有关任务的所有信息,包括执行日期,如下所示:
task_instance = context['task_instance']
execution_date = context['execution_date']
Run Code Online (Sandbox Code Playgroud)
要查看上下文中项目的完整参考,请参阅https://airflow.apache.org/code.html#macros
这些是宏的文档,但您可以使用上下文字典中的项目.
归档时间: |
|
查看次数: |
7698 次 |
最近记录: |