标签: kubernetespodoperator

如何为airflow DAG任务(或python代码)安装依赖模块?, 使用 kuberentesExecutor 时气流 DAG 中“无法导入模块”

我有一个气流 DAG“example_ml.py”,它有一个任务“train_ml_model”,该任务正在调用/运行 python 脚本“training.py”。

-Dags/example_ml.py -Dags/training.py

当我运行 Dag 时,无法导入执行训练脚本所需的模块。 导入sklearn模块出错

DAG任务的代码片段:

   train_model = PythonOperator(
        task_id='train_model',
        python_callable=training,
        dag = dag
    )
Run Code Online (Sandbox Code Playgroud)

PS:我使用的是k8s集群。Airflow运行在k8s集群中,executor设置为kubernetesExecutor。因此,当每个 DAG 被触发时,就会分配一个新的 pod 来完成任务。

airflow kubernetespodoperator

8
推荐指数
1
解决办法
4763
查看次数

Airflow KubernetesPodOperator - 使用 ConfigMap 值作为环境变量

我有一个名为 Kubernetes ConfigMap test,其中包含具有某些值的键foobar。我想在环境变量中使用该键的值。

import datetime
import os
from airflow import models
from airflow.contrib.kubernetes.volume import Volume
from airflow.contrib.kubernetes.volume_mount import VolumeMount
from airflow.contrib.operators import kubernetes_pod_operator

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

cm_test = ['test']

volume_mount = VolumeMount('test',
                            mount_path='/config/',
                            sub_path=None,
                            read_only=False)
volume_config = {
    'configMap': {
        'name': 'test'
    }
}
volume = Volume(name='test', configs=volume_config)

with models.DAG(
        dag_id="test_env",
        schedule_interval=datetime.timedelta(days=1),
        start_date=YESTERDAY) as dag:

    
    kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id="test_env",
        name="test_env",
        namespace="foobar",
        image="bash",
        cmds=["printenv"],
        arguments=[],
        volumes=[volume],
        volume_mounts=[volume_mount],
        configmaps=cm_test,
        env_vars={
            'MY_ENV_VAR': '/config/foobar'
        }
    )
Run Code Online (Sandbox Code Playgroud)

正在做什么:ConfigMap …

kubernetes airflow kubernetespodoperator

6
推荐指数
0
解决办法
1035
查看次数

在气流 kubernetes pod 操作员中传递 --serviceaccount

我正在尝试使用 Airflow kubernetes pod 操作符创建并运行 pod。下面的命令经过尝试并确认有效,我正在尝试在本地使用 kubernetes pod 操作员复制相同的命令

kubectl run sparkairflow -n test-namespace --image=some-docker-repo.com:hello-world --serviceaccount=airflow --restart=Never -- spark-submit --deploy-mode cluster --master k8s://kubernetes.default.cluster.local:123 \
                 --name sparkairflow \
                 --conf spark.kubernetes.namespace=test-namespace \
                 --conf spark.kubernetes.container.image=some-docker-repo.com:hello-world \
                 --conf spark.kubernetes.authenticate.driver.serviceAccountName=airflow \
...
Run Code Online (Sandbox Code Playgroud)

在这里遇到了麻烦,因为似乎没有办法使用气流传递 --serviceaccount 标志,而这是我的实现所必需的,这会在我这边引发错误。

线程“main”中出现异常 io.fabric8.kubernetes.client.KubernetesClientException:pods“sparkairflow-155252344-driver”被禁止:用户“system:serviceaccount:test-namespace:default”无法观看 API 组“”中的资源“pods”在命名空间“test-namespace”中:访问被拒绝

到目前为止,我找到的解决方案主要侧重于将默认用户添加到命名空间角色,但这对于我的情况来说是不可能的。

有什么方法可以将 serviceaccount 标志传递给气流 kubernetes 操作员吗?

谢谢!

kubernetes airflow kubernetespodoperator

3
推荐指数
1
解决办法
6476
查看次数