如何将气流工人的数量安装到气流 kubernetes pod 操作员?

bux*_*oum 5 python docker kubernetes airflow airflow-operator

我正在尝试在气流中使用 kubernetes pod 操作符,并且我希望在气流工作器上与 kubernetes pod 共享一个目录,有没有办法将气流工作器的目录挂载到 kubernetes pod?

我尝试了下面的代码,但卷似乎没有成功安装。

import datetime
import unittest
from unittest import TestCase
from airflow.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.kubernetes.volume import Volume
from airflow.kubernetes.volume_mount import VolumeMount


class TestMailAlarm(TestCase):
    def setUp(self):
        self.namespace = "test-namespace"
        self.image = "ubuntu:16.04"
        self.name = "default"

        self.cluster_context = "default"

        self.dag_id = "test_dag"
        self.task_id = "root_test_dag"
        self.execution_date = datetime.datetime.now()

        self.context = {"dag_id": self.dag_id,
                        "task_id": self.task_id,
                        "execution_date": self.execution_date}

        self.cmds = ["sleep"]
        self.arguments = ["100"]

        self.volume_mount = VolumeMount('test',
                                        mount_path='/tmp',
                                        sub_path=None,
                                        read_only=False)

        volume_config = {
            'persistentVolumeClaim':
                {
                    'claimName': 'test'
                }
        }
        self.volume = Volume(name='test', configs=volume_config)

        self.operator = KubernetesPodOperator(
            namespace=self.namespace, image=self.image, name=self.name,
            cmds=self.cmds,
            arguments=self.arguments,
            startup_timeout_seconds=600,
            is_delete_operator_pod=True,
            # the operator could run successfully but the directory /tmp is not mounted to kubernetes operator
            volume=[self.volume],
            volume_mount=[self.volume_mount],
            **self.context)

    def test_execute(self):
        self.operator.execute(self.context)
Run Code Online (Sandbox Code Playgroud)

ECr*_*ris 5

文档中的示例似乎与您的代码非常相似,只是参数是复数volume_mountsvolumes. 对于您的代码,它看起来像这样:

self.operator = KubernetesPodOperator(
            namespace=self.namespace, image=self.image, name=self.name,
            cmds=self.cmds,
            arguments=self.arguments,
            startup_timeout_seconds=600,
            is_delete_operator_pod=True,
            # the operator could run successfully but the directory /tmp is not mounted to kubernetes operator
            volumes=[self.volume],
            volume_mounts=[self.volume_mount],
            **self.context)
Run Code Online (Sandbox Code Playgroud)

  • Plaes teake寻找这个[示例](http://space.af/blog/2018/09/30/gke-airflow-cloud-composer-and-persistent-volumes/)看起来你应该创建pv和pvc并且使用这些信息更新您的 DAG。 (2认同)