标签: kfp

Kubeflow 管道终止通知

我尝试添加一个逻辑,当管道因某些错误而终止时,该逻辑将发送松弛通知。我试图用ExitHandler. 但是,似乎ExitHandler不能依赖任何操作。你有什么好主意吗?

kubernetes kubeflow kfp

7
推荐指数
1
解决办法
1676
查看次数

如何在 kubeflow 管道中定义管道级卷以在组件之间共享?

kubernetes容器间通信教程定义了以下管道 yaml:

apiVersion: v1
kind: Pod
metadata:
  name: two-containers
spec:

  restartPolicy: Never

  volumes:                      <--- This is what I need
  - name: shared-data
    emptyDir: {}

  containers:

  - name: nginx-container
    image: nginx
    volumeMounts:
    - name: shared-data
      mountPath: /usr/share/nginx/html

  - name: debian-container
    image: debian
    volumeMounts:
    - name: shared-data
      mountPath: /pod-data
    command: ["/bin/sh"]
    args: ["-c", "echo Hello from the debian container > /pod-data/index.html"]
Run Code Online (Sandbox Code Playgroud)

请注意,volumes键是在 下定义的spec,因此该卷可用于所有定义的容器。我想使用kfp实现相同的行为,它是 kubeflow 管道的 API。

但是,我只能将卷添加到单个容器,而不能使用kfp.dsl.ContainerOp.container.add_volume_mount指向先前创建的卷(kfp.dsl.PipelineVolume)的整个工作流程规范,因为该卷似乎只在容器内定义。

这是我尝试过的,但卷始终在第一个容器中定义,而不是“全局”级别。我如何获取它以便op2可以访问该卷?我原以为它位于 …

python google-kubernetes-engine kubeflow-pipelines kfp

5
推荐指数
1
解决办法
5346
查看次数

如何使用 pytest 测试 kfp 组件

我正在尝试使用 pytest 从 kfp.v2.ds1 (在管道上工作)本地测试 kubeflow 组件,但在输入/输出参数和固定装置上遇到了困难。

下面是一个代码示例来说明这个问题:

首先,我创建了一个夹具来模拟数据集。该装置也是一个 kubeflow 组件。

# ./fixtures/

    @pytest.fixture
    @component()
    def sample_df(dataset: Output[Dataset]):
         df = pd.DataFrame(
             {
                 'name': ['Ana', 'Maria', 'Josh'],
                 'age': [15, 19, 22],
             }
         )
         dataset.path += '.csv'
         df.to_csv(dataset.path, index=False)
         return
Run Code Online (Sandbox Code Playgroud)

让我们假设该组件的年龄加倍。

# ./src/
    @component()
    def double_ages(df_input: Input[Dataset], df_output: Output[Dataset]):
         df = pd.read_csv(df_input.path)
         
         double_df = df.copy()
         double_df['age'] = double_df['age']*2

         df_output.path += '.csv'
         double_df.to_csv(df_output.path, index=False)
Run Code Online (Sandbox Code Playgroud)

然后,测试:

#./tests/

@pytest.mark.usefixtures("sample_df")
def test_double_ages(sample_df):

    expected_df = pd.DataFrame(
        {
            'name': ['Ana', 'Maria', 'Josh'],
            'age': [30, 38, 44],
        }
    ) …
Run Code Online (Sandbox Code Playgroud)

python pytest kubeflow kubeflow-pipelines kfp

3
推荐指数
1
解决办法
1861
查看次数