Question (Jit*_*tel) — tags: docker, kubernetes, pyspark, airflow, amazon-eks
Apache Airflow version: v2.1.1
Kubernetes version (if you are using kubernetes) (use kubectl version):
Client Version: version.Info{Major:"1", Minor:"21", GitVersion:"v1.21.2", GitCommit:"092fbfbf53427de67cac1e9fa54aaa09a28371d7", GitTreeState:"clean", BuildDate:"2021-06-16T12:52:14Z", GoVersion:"go1.16.5", Compiler:"gc", Platform:"darwin/amd64"}
Server Version: version.Info{Major:"1", Minor:"19+", GitVersion:"v1.19.8-eks-96780e", GitCommit:"96780e1b30acbf0a52c38b6030d7853e575bcdf3", GitTreeState:"clean", BuildDate:"2021-03-10T21:32:29Z", GoVersion:"go1.15.8", Compiler:"gc", Platform:"linux/amd64"}
Environment: Dev
Cloud provider or hardware configuration: AWS EKS
OS (e.g. from /etc/os-release):
Kernel (e.g. uname -a):
Install tools:
Others:
What happened: I am unable to create SparkApplications on the Kubernetes cluster using the SparkKubernetesOperator from an Airflow DAG. I have Airflow and the spark-operator hosted on EKS. I created a connection in Airflow that connects to the Kubernetes cluster using the "in cluster configuration" option. I am just running the sample application to check that Spark executes on Kubernetes through Airflow.
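For reference, one way to define such a connection is through the Airflow CLI; a minimal sketch, assuming the kubernetes_default connection id used by the DAG below, and assuming the extra field name follows the cncf.kubernetes provider's convention of that era (it may differ in other provider versions):
$ airflow connections add kubernetes_default \
    --conn-type kubernetes \
    --conn-extra '{"extra__kubernetes__in_cluster": true}'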
Application YAML file:
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: spark-pi-airflow
namespace: spark-apps
spec:
type: Scala
mode: cluster
image: "gcr.io/spark-operator/spark:v3.1.1"
imagePullPolicy: Always
mainClass: org.apache.spark.examples.SparkPi
mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar"
sparkVersion: "3.1.1"
restartPolicy:
type: Never
volumes:
- name: "test-volume"
hostPath:
path: "/tmp"
type: Directory
driver:
cores: 1
coreLimit: "1200m"
memory: "512m"
labels:
version: 3.1.1
serviceAccount: spark
volumeMounts:
- name: "test-volume"
mountPath: "/tmp"
executor:
cores: 1
instances: 1
memory: "512m"
labels:
version: 3.1.1
volumeMounts:
- name: "test-volume"
mountPath: "/tmp"
Airflow DAG:
from datetime import timedelta

# [START import_module]
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor
from airflow.utils.dates import days_ago
# [END import_module]

# [START default_args]
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'max_active_runs': 1,
}
# [END default_args]

# [START instantiate_dag]
dag = DAG(
    'spark_pi_airflow',
    default_args=default_args,
    description='submit spark-pi as sparkApplication on kubernetes',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
)

t1 = SparkKubernetesOperator(
    task_id='spark_pi_submit',
    namespace="spark-apps",
    application_file="example_spark_kubernetes_spark_pi.yaml",
    kubernetes_conn_id="kubernetes_default",
    do_xcom_push=True,
    dag=dag,
)

t2 = SparkKubernetesSensor(
    task_id='spark_pi_monitor',
    namespace="spark-apps",
    application_name="{{ task_instance.xcom_pull(task_ids='spark_pi_submit')['metadata']['name'] }}",
    kubernetes_conn_id="kubernetes_default",
    dag=dag,
)

t1 >> t2
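To exercise the DAG without waiting for the schedule, it can be unpaused and triggered manually; a quick sketch, assuming the Airflow CLI is reachable (e.g. inside the scheduler pod):
$ airflow dags unpause spark_pi_airflow
$ airflow dags trigger spark_pi_airflow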
Error message:
[2021-07-12 10:18:46,629] {spark_kubernetes.py:67} INFO - Creating sparkApplication
[2021-07-12 10:18:46,662] {taskinstance.py:1501} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 174, in create_custom_object
response = api.create_namespaced_custom_object(
File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api/custom_objects_api.py", line 183, in create_namespaced_custom_object
(data) = self.create_namespaced_custom_object_with_http_info(group, version, namespace, plural, body, **kwargs) # noqa: E501
File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api/custom_objects_api.py", line 275, in create_namespaced_custom_object_with_http_info
return self.api_client.call_api(
File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 340, in call_api
return self.__call_api(resource_path, method,
File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 172, in __call_api
response_data = self.request(
File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 382, in request
return self.rest_client.POST(url,
File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 272, in POST
return self.request("POST", url,
File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 231, in request
raise ApiException(http_resp=r)
kubernetes.client.rest.ApiException: (403)
Reason: Forbidden
HTTP response headers: HTTPHeaderDict({'Audit-Id': '45712aa7-85e3-4beb-85f7-b94a77cda196', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 'Date': 'Mon, 12 Jul 2021 10:18:46 GMT', 'Content-Length': '406'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"sparkapplications.sparkoperator.k8s.io is forbidden: User \"system:serviceaccount:airflow:airflow-cluster\" cannot create resource \"sparkapplications\" in API group \"sparkoperator.k8s.io\" in the namespace \"spark-apps\"","reason":"Forbidden","details":{"group":"sparkoperator.k8s.io","kind":"sparkapplications"},"code":403}
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1157, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1331, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1361, in _execute_task
result = task_copy.execute(context=context)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py", line 69, in execute
response = hook.create_custom_object(
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 180, in create_custom_object
raise AirflowException(f"Exception when calling -> create_custom_object: {e}\n")
airflow.exceptions.AirflowException: Exception when calling -> create_custom_object: (403)
Reason: Forbidden
HTTP response headers: HTTPHeaderDict({'Audit-Id': '45712aa7-85e3-4beb-85f7-b94a77cda196', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 'Date': 'Mon, 12 Jul 2021 10:18:46 GMT', 'Content-Length': '406'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"sparkapplications.sparkoperator.k8s.io is forbidden: User \"system:serviceaccount:***:***-cluster\" cannot create resource \"sparkapplications\" in API group \"sparkoperator.k8s.io\" in the namespace \"spark-apps\"","reason":"Forbidden","details":{"group":"sparkoperator.k8s.io","kind":"sparkapplications"},"code":403}
What you expected to happen: Airflow should schedule and run the Spark job on Kubernetes using the SparkKubernetesOperator.
How to reproduce it: Deploy the Spark Operator on the Kubernetes cluster using helm. Deploy Airflow on the Kubernetes cluster using helm. Deploy the application YAML and Airflow DAG shown above.
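A minimal sketch of those two helm installs; the chart repositories, release names, and namespaces here are assumptions matching the upstream defaults of the time, so adjust them to your setup:
$ helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator
$ helm repo add apache-airflow https://airflow.apache.org
$ helm repo update
$ helm install spark-operator spark-operator/spark-operator --namespace spark-operator --create-namespace
$ helm install airflow apache-airflow/airflow --namespace airflow --create-namespace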
Anything else we need to know:
I have already created the service account:
$ kubectl create serviceaccount spark
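Note that this creates the account in the current context's default namespace, while the SparkApplication's driver references serviceAccount: spark in spark-apps; the namespaced variant would be:
$ kubectl -n spark-apps create serviceaccount spark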
Granted the edit cluster role to the service account on the cluster:
$ kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=airflow:airflow-cluster --namespace=spark-apps
Answer (Rom*_*man):
Below are the kube cluster role resources. Create them with kubectl -n <namespace> apply -f <filename.yaml>
# Role for spark-on-k8s-operator to create resources on cluster
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: spark-cluster-cr
  labels:
    rbac.authorization.kubeflow.org/aggregate-to-kubeflow-edit: "true"
rules:
  - apiGroups:
      - sparkoperator.k8s.io
    resources:
      - sparkapplications
    verbs:
      - '*'
---
# Allow airflow-worker service account access for spark-on-k8s
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: airflow-spark-crb
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: spark-cluster-cr
subjects:
  - kind: ServiceAccount
    name: airflow-cluster
    namespace: airflow
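To confirm the binding took effect before re-running the DAG, an impersonated permission check with plain kubectl (using only the names from the manifests above):
$ kubectl auth can-i create sparkapplications.sparkoperator.k8s.io \
    --as=system:serviceaccount:airflow:airflow-cluster -n spark-apps
# should print "yes" once the ClusterRole and ClusterRoleBinding are applied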
Notes:
sparkapplications.sparkoperator.k8s.io is forbidden: User "system:serviceaccount:airflow:airflow-cluster" cannot create resource "sparkapplications" in API group "sparkoperator.k8s.io" in the namespace "spark-apps"
- As the error shows, the Airflow worker runs as the service account airflow-cluster in the namespace airflow but needs to create sparkapplications in the namespace spark-apps; the ClusterRole and ClusterRoleBinding above grant exactly that.
- If you want env (and similar pod-level fields) in your spec.driver and spec.executor to take effect, install the spark-operator chart with --set webhook.enable=true, since those fields are applied by the operator's mutating admission webhook.
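A sketch of enabling the webhook on an existing install; the release and namespace names here are hypothetical and should match your actual deployment:
$ helm upgrade spark-operator spark-operator/spark-operator \
    --namespace spark-operator \
    --set webhook.enable=true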