In Airflow, I am facing the problem that I need to pass the job_flow_id to one of my EMR steps. I am able to retrieve the job_flow_id from the operator, but when I create the steps to submit to the cluster, the task_instance value is not rendered correctly. I have the following code:
from datetime import timedelta

from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor


def issue_step(name, args):
    return [
        {
            "Name": name,
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                "Jar": "s3://....",
                "Args": args
            }
        }
    ]
dag = DAG('example',
          description='My dag',
          schedule_interval='0 8 * * 6',
          dagrun_timeout=timedelta(days=2))

try:
    create_emr = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        aws_conn_id='aws_default',
        dag=dag
    )

    load_data_steps = issue_step('load', ['arg1', 'arg2'])
    load_data_steps[0]["HadoopJarStep"]["Args"].append('--cluster-id')
    load_data_steps[0]["HadoopJarStep"]["Args"].append(
        "{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}")  # the value here is not exchanged with the actual job_flow_id

    load_data = EmrAddStepsOperator(
        task_id='load_data',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",  # this is correctly exchanged with the job_flow_id - same for the others
        aws_conn_id='aws_default',
        steps=load_data_steps,
        dag=dag
    )

    check_load_data = EmrStepSensor(
        task_id='watch_load_data',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull('load_data', key='return_value')[0] }}",
        aws_conn_id='aws_default',
        dag=dag
    )

    cluster_remover = EmrTerminateJobFlowOperator(
        task_id='remove_cluster',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        dag=dag
    )

    create_emr >> load_data
    load_data >> check_load_data
    check_load_data >> cluster_remover

except AirflowException as ae:
    print(ae.message)
The problem is that when I check EMR, instead of seeing --cluster-id j-1234 in the load_data step, I see --cluster-id "{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}", which makes the step fail.

How can I get the actual value inside my step function?

Thanks, and happy holidays!
I found that there is a PR on the Airflow repository about this. The problem is that steps is not among the templated fields of EmrAddStepsOperator.
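A quick way to confirm this on your own Airflow version is to print the operator's template_fields, since Jinja strings are only rendered for the attributes listed there; a minimal sketch (the exact contents of the list depend on your release):

# Sketch: show which attributes EmrAddStepsOperator runs through Jinja.
# On older contrib versions 'steps' is typically missing from this list,
# which is why the XCom template inside the step args never gets rendered.
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator

print(EmrAddStepsOperator.template_fields)   # e.g. ['job_flow_id']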
To work around it, I created a custom operator that extends EmrAddStepsOperator and registered it as a plugin. Here is the code for the custom operator and the plugin, in the file custom_emr_add_step_operator.py (see the tree below):
from __future__ import division, absolute_import, print_function

from airflow.plugins_manager import AirflowPlugin
from airflow.utils import apply_defaults
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator


class CustomEmrAddStepsOperator(EmrAddStepsOperator):
    template_fields = ['job_flow_id', 'steps']  # override with 'steps' to solve the issue above

    @apply_defaults
    def __init__(self, *args, **kwargs):
        super(CustomEmrAddStepsOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        super(CustomEmrAddStepsOperator, self).execute(context=context)


# Defining the plugin class
class CustomPlugin(AirflowPlugin):
    name = "custom_plugin"
    operators = [CustomEmrAddStepsOperator]
In my DAG file, I then imported the plugin like this:
from airflow.operators import CustomEmrAddStepsOperator
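The load_data task from the question can then be built with the custom operator instead of the stock one; a minimal sketch, reusing the task_id, connection and load_data_steps defined above:

# Sketch: same task as in the question, but using the custom operator so
# that 'steps' is also templated and the --cluster-id argument is rendered.
load_data = CustomEmrAddStepsOperator(
    task_id='load_data',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=load_data_steps,
    dag=dag
)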
The structure of my project and plugins looks like this:
├── config
│   └── airflow.cfg
├── dags
│   ├── __init__.py
│   └── my_dag.py
├── plugins
│   ├── __init__.py
│   └── operators
│       ├── __init__.py
│       └── custom_emr_add_step_operator.py
└── requirements.txt
If you use an IDE like PyCharm, it will complain that it cannot find the module, but the problem does not appear when you actually run Airflow. Also remember to make sure that your airflow.cfg points to the correct plugins folder, so that Airflow can pick up your newly created plugin.
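For reference, the setting to check is plugins_folder in the [core] section of airflow.cfg; the path below is only a placeholder for wherever the plugins directory from the tree above actually lives:

# airflow.cfg (excerpt) -- the path is a placeholder, adjust it to your layout
[core]
plugins_folder = /path/to/project/plugins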