我需要能够访问在 Python 运算符 python_callable 中定义为 DAG 定义的一部分的 default_args。也许这是我对 python 或气流的不熟悉,但有人可以指导如何实现这一点。
以下是我试图实现的代码示例
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': 'xyz@xyz.com',
'email_on_failure': 'xyz@xyz.com',
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'start_date': datetime(2017, 5, 15, 23, 20),
'end_date': datetime(2017, 5, 16, 23, 45),
'touchfile_path': '/user/myname/touchfiles/',
}
dag = DAG(
'test',
default_args=default_args,
template_searchpath=['/Users/myname/Desktop/utils/airflow/resources'],
user_defined_macros=dict(SCHEMA_NAME='abc'),
#schedule_interval='*/2 * * * * ')
schedule_interval='@once')
def webhdfs_touchfile_create(ds, *args, **kwargs):
web_hdfs_hook = WebHDFSHook('webhdfs_default')
client = web_hdfs_hook.get_conn()
client.write("/user/myname/airflow_hdfs","stringToWrite")
pp.pprint(kwargs)
task1 = PythonOperator(
task_id='task1',
provide_context=True, #enabling this would allow to pass arguments automatically to your callable function
python_callable=webhdfs_touchfile_create,
templates_dict={'attr1': {{ default_args['touchfile_path'] }}},
dag=dag)
Run Code Online (Sandbox Code Playgroud)
由于 PythonOperator 的 template_dict 是 jinja 模板工作的唯一属性,我如何在那里检索“ touchfile_path ”参数?
在 Airflow 中有两种传递变量的机制:
使用 (1) 方法变量可以通过DAG级别的user_defined_macros属性传递。使用 (2) 方法,您应该查看特定的运算符属性。
请注意,某些运算符属性由 Jinja 处理,您可以使用模板语法。
这是一个工作示例:
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': days_ago(2),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'custom_key1': 'custom_value1',
'custom_key2': 'custom_value2'
}
dag = DAG(
'tutorial',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
user_defined_macros=default_args ,
)
bash_command = """
echo "access via DAG's user_defined_macros = {{ custom_key1 }}"
echo "access via Operator's params = {{ params.custom_key2 }}"
"""
t1 = BashOperator(
task_id='print_in_bash_op',
bash_command=bash_command,
params=default_args,
dag=dag,
)
def myfunc(**context):
print(context['templates_dict']['custom_key1'])
print(context['templates_dict']['custom_key2'])
t2 = PythonOperator(
task_id='print_in_python_op',
python_callable=myfunc,
templates_dict=default_args,
provide_context=True,
dag=dag,
)
templates_dict={
'custom_key1': '{{ custom_key1 }}',
'custom_key2': '{{ custom_key2 }}'
}
t3 = PythonOperator(
task_id='print_in_python_op_2',
python_callable=myfunc,
templates_dict=templates_dict,
provide_context=True,
dag=dag,
)
t1 >> t2 >> t3
Run Code Online (Sandbox Code Playgroud)
根据评论添加
使用变量的能力完全取决于Operator。
在 (2) 方法中,通常有用于传递信息的专门属性,例如:
对于使用方法 (1),应在 Operator 代码中使用 jinja 呈现此类属性(它们templated在文档中标记为)。例如,上面的templated属性是属性。
在
可以使用Airflow 宏的每个地方,也可以使用用户变量(通过 定义user_defined_macros)。
在 Airflow 2.0 中,TaskFlow 意味着“Python 可调用”有时只是一个用 @task 注释的函数。在这种情况下,您可以从上下文中检索默认参数:
from airflow.operators.python import get_current_context
@task
def my_task():
context = get_current_context()
email_on_failure = context["dag"].default_args["email_on_failure"]
Run Code Online (Sandbox Code Playgroud)
由于它们是在同一级别的同一文件中定义的,因此您可以执行以下操作:
def webhdfs_touchfile_create(ds, *args, **kwargs):
web_hdfs_hook = WebHDFSHook('webhdfs_default')
client = web_hdfs_hook.get_conn()
client.write("/user/myname/airflow_hdfs","stringToWrite")
pp.pprint(kwargs)
pp.pprint(default_args['touchfile_path'])
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
5323 次 |
| 最近记录: |