gau*_*tri 2 hadoop amazon-emr apache-spark airflow
我有 Airflow 作业,它们在 EMR 集群上运行良好。我需要的是,假设我有 4 个需要 EMR 集群的气流作业,假设需要 20 分钟才能完成任务。为什么我们不能在 DAG 运行时创建 EMR 集群,一旦作业完成,它将终止创建的 EMR 集群。
毫无疑问,这将是最有效地利用资源。让我警告你:这里面有很多细节;我会尽量列出尽可能多的内容。我鼓励你添加你自己的综合答案,列出你遇到的任何问题和解决方法(一旦你完成了这个)
关于集群创建/终止
对于集群创建和终止,您分别拥有EmrCreateJobFlowOperator和EmrTerminateJobFlowOperator
如果您不使用AWS SecretAccessKey(并且完全依赖IAMRoles),请不要担心;实例化任何AWS-relatedhook或operatorinAirflow将自动回退到底层EC2的附加IAM角色
如果傻冒不使用EMR步骤API作业提交,那么你也必须手动感都使用上述操作Sensors。已经有一个用于轮询创建阶段的传感器EmrJobFlowSensor,您可以稍微修改它以创建一个用于终止的传感器
您将集群配置 JSON 传递到job_flow_extra. 您还可以在 aConnection的(如my_emr_conn)extraparam 中传递配置,但不要使用它,因为它经常破坏SQLAlchemyORM 加载(因为它很大json)
关于作业提交
您可以Emr使用 EMR-Steps API提交作业,这可以在集群创建阶段(在集群配置 JSON 内)或之后使用add_job_flow_steps(). 甚至还有一个emr_add_steps_operator()in Airflowwhich 也需要一个EmrStepSensor. 您可以在AWS文档中阅读更多关于它的信息,您可能还必须使用command-runner.jar
对于特定于应用程序的情况(如Hive、Livy),您可以使用它们的特定方式。例如,您可以使用HiveServer2Hook提交Hive作业。这是一个棘手的部分:run_job_flow()调用(在集群创建阶段进行)只给你一个job_flow_id(cluster-id)。您必须使用describe_cluster()调用usingEmrHook来获取主节点的私有 IP。使用它,您将能够以编程方式创建一个Connection(例如Hive Server 2 Thriftconnection)并将其用于将您的计算提交到集群。并且不要忘记在完成工作流程之前删除这些连接(为了优雅)。
最后是用于与集群交互的老式 bash。为此,您还应该EC2在集群创建阶段传递一个密钥对。之后,您可以以编程方式创建SSH连接并使用它(带有SSHHook或SSHOperator)在集群上运行作业。在此处阅读有关 SSH 的更多信息Airflow
特别是将Spark作业提交到远程 Emr集群,请阅读此讨论
| 归档时间: |
|
| 查看次数: |
5127 次 |
| 最近记录: |