如何使用 pytest 测试 kfp 组件

Gab*_*das 3 python pytest kubeflow kubeflow-pipelines kfp

我正在尝试使用 pytest 从 kfp.v2.ds1 (在管道上工作)本地测试 kubeflow 组件,但在输入/输出参数和固定装置上遇到了困难。

下面是一个代码示例来说明这个问题:

首先,我创建了一个夹具来模拟数据集。该装置也是一个 kubeflow 组件。

# ./fixtures/

    @pytest.fixture
    @component()
    def sample_df(dataset: Output[Dataset]):
         df = pd.DataFrame(
             {
                 'name': ['Ana', 'Maria', 'Josh'],
                 'age': [15, 19, 22],
             }
         )
         dataset.path += '.csv'
         df.to_csv(dataset.path, index=False)
         return
Run Code Online (Sandbox Code Playgroud)

让我们假设该组件的年龄加倍。

# ./src/
    @component()
    def double_ages(df_input: Input[Dataset], df_output: Output[Dataset]):
         df = pd.read_csv(df_input.path)
         
         double_df = df.copy()
         double_df['age'] = double_df['age']*2

         df_output.path += '.csv'
         double_df.to_csv(df_output.path, index=False)
Run Code Online (Sandbox Code Playgroud)

然后,测试:

#./tests/

@pytest.mark.usefixtures("sample_df")
def test_double_ages(sample_df):

    expected_df = pd.DataFrame(
        {
            'name': ['Ana', 'Maria', 'Josh'],
            'age': [30, 38, 44],
        }
    )

    df_component = double_ages(sample_df)    # This is where I call the component, sample_df is an Input[Dataset]
    df_output = df_component.outputs['df_output']
    df = pd.read_csv(df_output.path)
    
    assert df['age'].tolist() == expected_df['age'].tolist()
Run Code Online (Sandbox Code Playgroud)

但这就是问题出现的时候。应该作为输出传递的 Output[Dataset] 不是,因此组件无法正常使用它,然后我会收到以下错误assert df['age'].tolist() == expected_df['age'].tolist()

AttributeError:“TaskOutputArgument”对象没有属性“path”

显然,该对象的类型是TaskOutputArgument,而不是Dataset

有谁知道如何解决这个问题?或者如何正确使用 pytest 和 kfp 组件?我在互联网上搜索了很多,但找不到任何线索。

nic*_*asg 9

在花了我一个下午的时间之后,我终于找到了一种对基于 python 的 KFP 组件进行 pytest 的方法。由于我在这个主题上没有找到其他线索,我希望这可以有所帮助:

访问要测试的功能

诀窍是不要直接测试@component装饰器创建的 KFP 组件。但是,您可以通过组件属性访问内部修饰的python_funcPython函数。

模拟工件

关于InputOutput工件,当您绕过 KFP 访问和调用测试函数时,您必须手动创建它们并将它们传递给函数:

input_artifact = Dataset(uri='input_df_previously_saved.csv')
output_artifact = Dataset(uri='target_output_path.csv')
Run Code Online (Sandbox Code Playgroud)

我必须想出一个解决该Artifact.path属性如何运作的方法(这也适用于所有 KFPArtifact子类:DatasetModel、 ...)。如果您查看 KFP 源代码,您会发现它使用的方法会_get_path()None属性uri不以定义的云前缀之一开头时返回:"gs://""s3://""minio://"。当我们使用本地路径手动构建工件时,想要读取path工件属性的测试组件将读取一个None值。

Artifact所以我做了一个简单的方法来构建一个(或一个Dataset或任何其他Artifact子类)的子类。构建的子类只需更改为返回uri值,而不是None在非云的这种特定情况下uri

你的例子

将所有这些放在一起进行测试和固定装置,我们可以使用以下代码:

  • src/double_ages_component.py:您要测试的组件

这里没有任何变化。我刚刚添加了pandas导入:

from kfp.v2.dsl import component, Input, Dataset, Output

@component
def double_ages(df_input: Input[Dataset], df_output: Output[Dataset]):
    import pandas as pd

    df = pd.read_csv(df_input.path)

    double_df = df.copy()
    double_df['age'] = double_df['age'] * 2

    df_output.path += '.csv'
    double_df.to_csv(df_output.path, index=False)
Run Code Online (Sandbox Code Playgroud)
  • tests/utils.py:Artifact 子类构建器
import typing

def make_test_artifact(artifact_type: typing.Type):
    class TestArtifact(artifact_type):
        def _get_path(self):
            return super()._get_path() or self.uri

    return TestArtifact
Run Code Online (Sandbox Code Playgroud)

我仍然不确定这是最合适的解决方法。您还可以为您使用的每个工件手动创建一个子类(Dataset在您的示例中)。kfp.v2.dsl.Artifact或者您可以使用pytest-mock直接模拟该类。

  • tests/conftest.py:你的固定装置

我将示例数据框创建器组件与夹具分开。因此,我们有一个标准的 KFP 组件定义 + 一个构建其输出工件并调用其 python 函数的固定装置:

from kfp.v2.dsl import component, Dataset, Output
import pytest

from tests.utils import make_test_artifact

@component
def sample_df_component(dataset: Output[Dataset]):
    import pandas as pd

    df = pd.DataFrame({
        'name': ['Ana', 'Maria', 'Josh'],
        'age': [15, 19, 22],
    })
    dataset.path += '.csv'
    df.to_csv(dataset.path, index=False)

@pytest.fixture
def sample_df():
    # define output artifact
    output_path = 'local_sample_df.csv'  # any writable local path. I'd recommend to use pytest `tmp_path` fixture.
    sample_df_artifact = make_test_artifact(Dataset)(uri=output_path)

    # call component python_func by passing the artifact yourself
    sample_df_component.python_func(dataset=sample_df_artifact)
    # the artifact object is now altered with the new path that you define in sample_df_component (".csv" extension added)

    return sample_df_artifact
Run Code Online (Sandbox Code Playgroud)

该装置返回一个工件对象,该对象引用保存示例数据帧的选定本地路径。

  • tests/test_component.py:您的实际组件测试

再次强调,这个想法是构建 I/O 工件并调用组件的python_func

from kfp.v2.dsl import Dataset
import pandas as pd

from src.double_ages_component import double_ages
from tests.utils import make_test_artifact

def test_double_ages(sample_df):
    expected_df = pd.DataFrame({
        'name': ['Ana', 'Maria', 'Josh'],
        'age': [30, 38, 44],
    })

    # input artifact is passed in parameter via sample_df fixture
    # create output artifact
    output_path = 'local_test_output_df.csv'
    output_df_artifact = make_test_artifact(Dataset)(uri=output_path)

    # call component python_func
    double_ages.python_func(df_input=sample_df, df_output=output_df_artifact)

    # read output data
    df = pd.read_csv(output_df_artifact.path)

    # write your tests
    assert df['age'].tolist() == expected_df['age'].tolist()
Run Code Online (Sandbox Code Playgroud)

结果

> pytest
================ test session starts ================
platform linux -- Python 3.8.13, pytest-7.1.3, pluggy-1.0.0
rootdir: /home/USER/code/kfp_tests
collected 1 item                                                                                                                                                                                                                        

tests/test_component.py .                      [100%]

================ 1 passed in 0.28s ================
Run Code Online (Sandbox Code Playgroud)