如何在airflow中为kubernetes pod动态构建资源(V1ResourceRequirements)对象

Ala*_*ain 4 python kubernetes airflow

我目前正在将 DAG 从气流版本 1.10.10 迁移到 2.0.0。

该 DAG 使用自定义 Python 运算符,根据任务的复杂性动态分配资源。问题是 v1.10.10 中使用的导入(airflow.contrib.kubernetes.pod import Resources)不再有效。我读到,对于 v2.0.0,我应该使用kubernetes.client.models.V1ResourceRequirements,但我需要动态构建此资源对象。这可能听起来很愚蠢,但我一直无法找到构建这个对象的正确方法。

例如,我尝试过

            self.resources = k8s.V1ResourceRequirements(
                request_memory=get_k8s_resources_mapping(resource_request)['memory'],
                limit_memory=get_k8s_resources_mapping(resource_request)['memory_l'],
                request_cpu=get_k8s_resources_mapping(resource_request)['cpu'],
                limit_cpu=get_k8s_resources_mapping(resource_request)['cpu_l']
            )
Run Code Online (Sandbox Code Playgroud)

或者

            self.resources = k8s.V1ResourceRequirements(
                requests={'cpu': get_k8s_resources_mapping(resource_request)['cpu'],
                          'memory': get_k8s_resources_mapping(resource_request)['memory']},
                limits={'cpu': get_k8s_resources_mapping(resource_request)['cpu_l'],
                        'memory': get_k8s_resources_mapping(resource_request)['memory_l']}
            )
Run Code Online (Sandbox Code Playgroud)

(get_k8s_resources_mapping(resource_request)['xxxx'] 仅根据resource_request返回一个值,例如内存的“2Gi”或CPU的“2”)

但它们似乎不起作用。任务失败。

所以,我的问题是,如何在 Python 中正确构建 V1ResourceRequirements?并且,它在任务实例的 executor_config 属性中应该是什么样子?也许是这样的?

'resources': {'limits': {'cpu': '1', 'memory': '512Mi'}, 'requests': {'cpu': '1', 'memory': '512Mi'}}
Run Code Online (Sandbox Code Playgroud)

Ela*_*lad 5

正确的语法是:

对于apache-airflow-providers-cncf-kubernetes>=5.3.0

from kubernetes import client
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

KubernetesPodOperator(
    ...,
    container_resources = client.V1ResourceRequirements(
        requests={"cpu": "1000m", "memory": "8G"},
        limits={"cpu": "16000m", "memory": "128G"}
    )
)
Run Code Online (Sandbox Code Playgroud)

如果您想动态生成它,只需将 requests/limits 中的值替换为返回预期 string 的函数即可。


以下是代码在早期版本上运行所需的更改。

对于apache-airflow-providers-cncf-kubernetes<5.3.0 和 >=4.2.0

将导入路径更改为:

from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
Run Code Online (Sandbox Code Playgroud)

对于apache-airflow-providers-cncf-kubernetes<4.2.0

改成container_resourcesresources