Airflow GCP and Kubernetes connection issue - Fernet key must be 32 url-safe base64-encoded bytes

Rya*_*lle 5 python kubernetes airflow airflow-scheduler

I am currently running Airflow on Kubernetes in Google Cloud (GCP). My project is based on docker-airflow. I can bring up the UI, but when I try to create a connection for Google Cloud and submit it, I get the error below.

The first thing the documentation suggests is to make sure you have cryptography installed, which I did. I installed it both ways: once via the Airflow extra and once as the standard package from PyPI.

pip3 install apache-airflow[kubernetes,crypto]
# and also tried:
pip install cryptography

I tried running the commands for generating and storing the environment variable as described in the documentation, which can be found here. (Shown below.)

1) Generate a fernet key manually and add it to airflow.cfg.

2) Set the environment variable and restart the server.

python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"

Example key: 81HqDtbqAywKSOumSha3BhWNOdQ26slT6K0YaZeZyPs=
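As a quick sanity check (separate from the Airflow setup itself), the Fernet constructor raises the exact ValueError seen in the trace below whenever a key is malformed, so a key can be verified locally before it is wired into the cluster:

# Minimal local check: Fernet() raises ValueError for anything that is
# not 32 url-safe base64-encoded bytes.
from cryptography.fernet import Fernet

key = "81HqDtbqAywKSOumSha3BhWNOdQ26slT6K0YaZeZyPs="  # the example key above
Fernet(key.encode("utf-8"))  # no exception means the key itself is well-formed
print("key is well-formed")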

With Kubernetes, I cannot restart the server the typical way by killing the process ID, since it is tied to the container. I also tried putting the generated key (above) into the configmaps.yaml file for the Kubernetes cluster (which becomes airflow.cfg at deploy time).
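Worth noting: instead of baking the key into airflow.cfg through the ConfigMap, Airflow also honors the AIRFLOW__CORE__FERNET_KEY environment variable, which can be fed from a Kubernetes Secret the same way sql_alchemy_conn is in the deployment below. A minimal sketch, assuming a fernet_key entry (a hypothetical name) is added to the existing airflow-secrets Secret:

# Add to the env: block of each Airflow container (webserver, scheduler, init).
# 'fernet_key' is an assumed entry in the existing airflow-secrets Secret.
env:
- name: AIRFLOW__CORE__FERNET_KEY
  valueFrom:
    secretKeyRef:
      name: airflow-secrets
      key: fernet_key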

I tried running the GCP connection manually through a DAG, through the UI, and with the airflow command-line client. All three approaches return the same error. I have added a picture of the UI submission here, as well as the full stack trace.

Questions

  • Why is this happening? Is the fernet key not being generated? Is it possibly not being persisted on the underlying volume? (A way to check what the running container sees is sketched below.)
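A quick way to check is to print the configured key from inside the pod; a sketch, where <pod-name> is a placeholder (find it with kubectl get pods -n data):

# Print the fernet key the webserver container actually loads,
# mirroring the configuration.conf.get() call in the stack trace.
kubectl exec -n data <pod-name> -c webserver -- \
  python -c "from airflow import configuration; print(configuration.conf.get('core', 'FERNET_KEY'))"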

Thanks for your help.

-RR

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 159, in get_fernet
    _fernet = Fernet(configuration.conf.get('core', 'FERNET_KEY').encode('utf-8'))
  File "/usr/local/lib/python3.6/site-packages/cryptography/fernet.py", line 37, in __init__
    "Fernet key must be 32 url-safe base64-encoded bytes."
ValueError: Fernet key must be 32 url-safe base64-encoded bytes.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1982, in wsgi_app
    response = self.full_dispatch_request()
  File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1614, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1517, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/usr/local/lib/python3.6/site-packages/flask/_compat.py", line 33, in reraise
    raise value
  File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1612, in full_dispatch_request
    rv = self.dispatch_request()
  File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1598, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/usr/local/lib/python3.6/site-packages/flask_appbuilder/security/decorators.py", line 26, in wraps
    return f(self, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/flask_appbuilder/views.py", line 524, in edit
    widgets = self._edit(pk)
  File "/usr/local/lib/python3.6/site-packages/flask_appbuilder/baseviews.py", line 965, in _edit
    form.populate_obj(item)
  File "/usr/local/lib/python3.6/site-packages/wtforms/form.py", line 96, in populate_obj
    field.populate_obj(obj, name)
  File "/usr/local/lib/python3.6/site-packages/wtforms/fields/core.py", line 330, in populate_obj
    setattr(obj, name, self.data)
  File "<string>", line 1, in __set__
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 731, in set_extra
    fernet = get_fernet()
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 163, in get_fernet
    raise AirflowException("Could not create Fernet object: {}".format(ve))
airflow.exceptions.AirflowException: Could not create Fernet object: Fernet key must be 32 url-safe base64-encoded bytes.
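The top frames show what Airflow is doing here: get_fernet() reads fernet_key from the [core] section of the effective airflow.cfg and passes it straight to the Fernet constructor, so whatever value lands in the mounted config (including an empty or placeholder string) must already be a valid key. Roughly, per the trace:

# What get_fernet() does, per the first traceback above.
from cryptography.fernet import Fernet
from airflow import configuration

_fernet = Fernet(configuration.conf.get('core', 'FERNET_KEY').encode('utf-8'))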

Here is the YAML for the underlying persistent volume claims.

kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: airflow-dags
  namespace: data
spec:
  accessModes:
    - ReadOnlyMany
  storageClassName: standard    
  resources:
    requests:
      storage: 8Gi
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: airflow-logs
  namespace: data
spec:
  accessModes:
    - ReadOnlyMany
  storageClassName: standard    
  resources:
    requests:
      storage: 8Gi

Here is the Airflow deployment YAML.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow
  namespace: data
  labels:
    name: airflow
spec:
  replicas: 1
  selector:
    matchLabels:
      name: airflow
  template:
    metadata:
      labels:
        name: airflow
    spec:
      serviceAccountName: spark-service-account
      automountServiceAccountToken: true
      initContainers:
      - name: "init"
        image: <image_name>
        imagePullPolicy: Always
        volumeMounts:
        - name: airflow-configmap
          mountPath: /root/airflow/airflow.cfg
          subPath: airflow.cfg
        - name: airflow-dags
          mountPath: /root/airflow/dags
        # - name: test-volume
        #   mountPath: /root/test_volume
        env:
        - name: SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: airflow-secrets
              key: sql_alchemy_conn
        command:
          - "bash"
        args:
          - "-cx"
          - "airflow initdb || true && airflow create_user -u airflow -l airflow -f jon -e airflow@apache.org -r Admin -p airflow || true"
      containers:
      - name: webserver
        image: <image_name>
        imagePullPolicy: IfNotPresent
        ports:
        - name: webserver
          containerPort: 8080
        env:
        - name: <namespace_name>
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        - name: SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: airflow-secrets
              key: sql_alchemy_conn
        command: ["/bin/sh", "-c"]
        args: ["airflow webserver"]
        volumeMounts:
        - name: airflow-configmap
          mountPath: /root/airflow/airflow.cfg
          subPath: airflow.cfg
        - name: airflow-dags
          mountPath: /root/airflow/dags
        - name: airflow-logs
          mountPath: /root/airflow/logs
        # readinessProbe:
        #   initialDelaySeconds: 5
        #   timeoutSeconds: 5
        #   periodSeconds: 5
        #   httpGet:
        #     path: /login
        #     port: 8080
        # livenessProbe:
        #   initialDelaySeconds: 5
        #   timeoutSeconds: 5
        #   failureThreshold: 5
        #   httpGet:
        #     path: /login
        #     port: 8080
      - name: scheduler
        image: image-name
        imagePullPolicy: IfNotPresent
        env:
        - name: namespace_name
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        - name: SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: airflow-secrets
              key: sql_alchemy_conn
        command: ["/bin/sh", "-c"]
        args: ["cp ./dags/* /root/airflow/dags/; airflow scheduler"]
        volumeMounts:
        - name: airflow-configmap
          mountPath: /root/airflow/airflow.cfg
          subPath: airflow.cfg
        - name: airflow-dags
          mountPath: /root/airflow/dags
        - name: airflow-logs
          mountPath: /root/airflow/logs
      volumes:
      - name: airflow-configmap
        configMap:
          name: airflow-configmap
      - name: airflow-dags
        persistentVolumeClaim:
          claimName: airflow-dags
      - name: airflow-logs
        persistentVolumeClaim:
          claimName: airflow-logs
---
apiVersion: v1
kind: Service
metadata:
  name: airflow
  namespace: data
spec:
  type: NodePort
  ports:
    - port: 8080
      nodePort: 30809
  selector:
    name: airflow

(Screenshot of the UI connection submission referenced above.)

Tig*_*z32 1

Restart your workers and webserver.

Your workers and webserver are still running with the old fernet key. You changed the key in the config, so any newly stored or modified connections will be encrypted with the new key, but the webserver/workers are still holding the old one. The keys will never match, and the error will keep appearing until those processes are restarted.
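Since the processes are tied to containers, "restarting" here means recreating the pod. Note also that a ConfigMap mounted with subPath (as airflow.cfg is in the deployment above) is not refreshed inside a running pod, so a restart is needed for the new key to be picked up at all. A sketch, using the Deployment name and namespace from the question:

# Delete the pod; the Deployment recreates it with the current ConfigMap/Secret.
kubectl delete pod -n data -l name=airflow
# Or, on kubectl 1.15+, trigger an explicit rolling restart:
kubectl rollout restart deployment/airflow -n data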