Unable to create SparkApplications on a Kubernetes cluster using the SparkKubernetesOperator in an Airflow DAG

Jit*_*tel 7 docker kubernetes pyspark airflow amazon-eks

Apache Airflow version: v2.1.1

Kubernetes version (if you are using kubernetes) (use kubectl version):
Client Version: version.Info{Major:"1", Minor:"21", GitVersion:"v1.21.2", GitCommit:"092fbfbf53427de67cac1e9fa54aaa09a28371d7", GitTreeState:"clean", BuildDate:"2021-06-16T12:52:14Z", GoVersion:"go1.16.5", Compiler:"gc", Platform:"darwin/amd64"}
Server Version: version.Info{Major:"1", Minor:"19+", GitVersion:"v1.19.8-eks-96780e", GitCommit:"96780e1b30acbf0a52c38b6030d7853e575bcdf3", GitTreeState:"clean", BuildDate:"2021-03-10T21:32:29Z", GoVersion:"go1.15.8", Compiler:"gc", Platform:"linux/amd64"}

Environment: Development

Cloud provider or hardware configuration: AWS EKS
OS (e.g. from /etc/os-release):
Kernel (e.g. uname -a):
Install tools:
Others:

What happened: I am unable to create SparkApplications on the Kubernetes cluster using the SparkKubernetesOperator in an Airflow DAG. I have Airflow and the spark-operator hosted on EKS. I have created a connection in Airflow that connects to the Kubernetes cluster using the in-cluster configuration. I am simply running the sample application to check that Spark executes on Kubernetes via Airflow.
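
For reference, a minimal sketch of supplying that kubernetes_default connection through an environment variable with in-cluster configuration enabled; the URI format and the extra__kubernetes__in_cluster key follow the cncf-kubernetes provider convention of that era and are assumptions, not taken from the original setup:

# Hypothetical sketch: define the kubernetes_default connection via an
# environment variable; in_cluster=True makes the hook use the pod's own
# service account token instead of a kubeconfig file.
export AIRFLOW_CONN_KUBERNETES_DEFAULT='kubernetes://?extra__kubernetes__in_cluster=True'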

Application YAML file:

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-pi-airflow
  namespace: spark-apps
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v3.1.1"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar"
  sparkVersion: "3.1.1"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 3.1.1
    serviceAccount: spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 3.1.1
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
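Before wiring this manifest into an Airflow DAG, it can be sanity-checked by submitting it directly (a quick check, assuming kubectl access to the cluster and that the spark-apps namespace exists):

# Apply the SparkApplication manually and inspect its status; if this already
# fails, the problem is in the manifest or the operator rather than in Airflow.
kubectl apply -f example_spark_kubernetes_spark_pi.yaml
kubectl get sparkapplications -n spark-apps
kubectl describe sparkapplication spark-pi-airflow -n spark-apps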

Airflow DAG:


from datetime import timedelta

# [START import_module]
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor
from airflow.utils.dates import days_ago

# [END import_module]

# [START default_args]
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'max_active_runs': 1,
}
# [END default_args]

# [START instantiate_dag]

dag = DAG(
    'spark_pi_airflow',
    default_args=default_args,
    description='submit spark-pi as sparkApplication on kubernetes',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
)

t1 = SparkKubernetesOperator(
    task_id='spark_pi_submit',
    namespace="spark-apps",
    application_file="example_spark_kubernetes_spark_pi.yaml",
    kubernetes_conn_id="kubernetes_default",
    do_xcom_push=True,
    dag=dag,
)

t2 = SparkKubernetesSensor(
    task_id='spark_pi_monitor',
    namespace="spark-apps",
    application_name="{{ task_instance.xcom_pull(task_ids='spark_pi_submit')['metadata']['name'] }}",
    kubernetes_conn_id="kubernetes_default",
    dag=dag,
)
t1 >> t2

Error message:


[2021-07-12 10:18:46,629] {spark_kubernetes.py:67} INFO - Creating sparkApplication
[2021-07-12 10:18:46,662] {taskinstance.py:1501} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 174, in create_custom_object
    response = api.create_namespaced_custom_object(
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api/custom_objects_api.py", line 183, in create_namespaced_custom_object
    (data) = self.create_namespaced_custom_object_with_http_info(group, version, namespace, plural, body, **kwargs)  # noqa: E501
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api/custom_objects_api.py", line 275, in create_namespaced_custom_object_with_http_info
    return self.api_client.call_api(
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 340, in call_api
    return self.__call_api(resource_path, method,
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 172, in __call_api
    response_data = self.request(
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 382, in request
    return self.rest_client.POST(url,
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 272, in POST
    return self.request("POST", url,
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 231, in request
    raise ApiException(http_resp=r)
kubernetes.client.rest.ApiException: (403)
Reason: Forbidden
HTTP response headers: HTTPHeaderDict({'Audit-Id': '45712aa7-85e3-4beb-85f7-b94a77cda196', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 'Date': 'Mon, 12 Jul 2021 10:18:46 GMT', 'Content-Length': '406'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"sparkapplications.sparkoperator.k8s.io is forbidden: User \"system:serviceaccount:airflow:airflow-cluster\" cannot create resource \"sparkapplications\" in API group \"sparkoperator.k8s.io\" in the namespace \"spark-apps\"","reason":"Forbidden","details":{"group":"sparkoperator.k8s.io","kind":"sparkapplications"},"code":403}



During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1157, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1331, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1361, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py", line 69, in execute
    response = hook.create_custom_object(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 180, in create_custom_object
    raise AirflowException(f"Exception when calling -> create_custom_object: {e}\n")
airflow.exceptions.AirflowException: Exception when calling -> create_custom_object: (403)
Reason: Forbidden
HTTP response headers: HTTPHeaderDict({'Audit-Id': '45712aa7-85e3-4beb-85f7-b94a77cda196', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 'Date': 'Mon, 12 Jul 2021 10:18:46 GMT', 'Content-Length': '406'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"sparkapplications.sparkoperator.k8s.io is forbidden: User \"system:serviceaccount:***:***-cluster\" cannot create resource \"sparkapplications\" in API group \"sparkoperator.k8s.io\" in the namespace \"spark-apps\"","reason":"Forbidden","details":{"group":"sparkoperator.k8s.io","kind":"sparkapplications"},"code":403}

What you expected to happen: Airflow should schedule and run the Spark job on Kubernetes using the SparkKubernetesOperator.

How to reproduce it: Deploy the Spark Operator on the Kubernetes cluster using helm. Deploy Airflow on the Kubernetes cluster using helm. Deploy the application manifest and the Airflow DAG shown above.
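
A rough sketch of those helm steps (the chart repositories, release names, namespaces, and values here are assumptions, not taken from the original setup):

# Deploy the Spark Operator (watching the spark-apps namespace) and Airflow.
helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator
helm repo add apache-airflow https://airflow.apache.org
helm repo update
helm install spark-operator spark-operator/spark-operator \
  --namespace spark-operator --create-namespace \
  --set sparkJobNamespace=spark-apps
helm install airflow apache-airflow/airflow \
  --namespace airflow --create-namespace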

Anything else we need to know:

I have already created the service account:

$ kubectl create serviceaccount spark

And granted the edit cluster role to the service account:

$ kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=airflow:airflow-cluster --namespace=spark-apps
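Given the 403 above, it is worth verifying whether that binding actually covers the SparkApplication custom resource. One way to check (assuming the caller is allowed to impersonate service accounts):

# Ask the API server what the Airflow worker's service account may do; the
# built-in edit role does not cover custom resources such as SparkApplications
# unless their ClusterRole is aggregated into it, so this is expected to say "no".
kubectl auth can-i create sparkapplications.sparkoperator.k8s.io \
  --as=system:serviceaccount:airflow:airflow-cluster \
  -n spark-apps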

Rom*_*man 10

Below are the Kubernetes cluster role resources. Create them with kubectl -n <namespace> apply -f <filename.yaml>

# Role for spark-on-k8s-operator to create resources on cluster
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: spark-cluster-cr
  labels:
    rbac.authorization.kubeflow.org/aggregate-to-kubeflow-edit: "true"
rules:
  - apiGroups:
      - sparkoperator.k8s.io
    resources:
      - sparkapplications
    verbs:
      - '*'
---
# Allow airflow-worker service account access for spark-on-k8s
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: airflow-spark-crb
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: spark-cluster-cr
subjects:
  - kind: ServiceAccount
    name: airflow-cluster
    namespace: airflow

Notes:

  • The above assumes, from the error message
    sparkapplications.sparkoperator.k8s.io is forbidden: User "system:serviceaccount:airflow:airflow-cluster" cannot create resource "sparkapplications" in API group "sparkoperator.k8s.io" in the namespace "spark-apps"
    that:
    • Airflow namespace: airflow
    • Airflow service account: airflow-cluster
    • Spark job namespace: spark-apps
  • You should also have spark-on-k8s-operator installed,
    • with --set webhook.enable=true if you want to use env in your spec.driver (see the sketch after this list)
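A minimal sketch of enabling that webhook on an existing spark-operator release (the release name, chart reference, and namespace are assumptions):

# Enable the mutating webhook so that fields such as env and volumes declared
# under spec.driver / spec.executor are actually injected into the pods.
helm upgrade spark-operator spark-operator/spark-operator \
  --namespace spark-operator \
  --reuse-values \
  --set webhook.enable=true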


Jit*_*tel -8

My issue was resolved after granting the appropriate permissions to the service account in the airflow namespace.

  • Could you post the solution here? (3 upvotes)