我尝试添加一个逻辑,当管道因某些错误而终止时,该逻辑将发送松弛通知。我试图用ExitHandler
. 但是,似乎ExitHandler
不能依赖任何操作。你有什么好主意吗?
kubernetes容器间通信教程定义了以下管道 yaml:
apiVersion: v1
kind: Pod
metadata:
name: two-containers
spec:
restartPolicy: Never
volumes: <--- This is what I need
- name: shared-data
emptyDir: {}
containers:
- name: nginx-container
image: nginx
volumeMounts:
- name: shared-data
mountPath: /usr/share/nginx/html
- name: debian-container
image: debian
volumeMounts:
- name: shared-data
mountPath: /pod-data
command: ["/bin/sh"]
args: ["-c", "echo Hello from the debian container > /pod-data/index.html"]
Run Code Online (Sandbox Code Playgroud)
请注意,volumes
键是在 下定义的spec
,因此该卷可用于所有定义的容器。我想使用kfp实现相同的行为,它是 kubeflow 管道的 API。
但是,我只能将卷添加到单个容器,而不能使用kfp.dsl.ContainerOp.container.add_volume_mount
指向先前创建的卷(kfp.dsl.PipelineVolume)的整个工作流程规范,因为该卷似乎只在容器内定义。
这是我尝试过的,但卷始终在第一个容器中定义,而不是“全局”级别。我如何获取它以便op2
可以访问该卷?我原以为它位于 …
我正在尝试使用 pytest 从 kfp.v2.ds1 (在管道上工作)本地测试 kubeflow 组件,但在输入/输出参数和固定装置上遇到了困难。
下面是一个代码示例来说明这个问题:
首先,我创建了一个夹具来模拟数据集。该装置也是一个 kubeflow 组件。
# ./fixtures/
@pytest.fixture
@component()
def sample_df(dataset: Output[Dataset]):
df = pd.DataFrame(
{
'name': ['Ana', 'Maria', 'Josh'],
'age': [15, 19, 22],
}
)
dataset.path += '.csv'
df.to_csv(dataset.path, index=False)
return
Run Code Online (Sandbox Code Playgroud)
让我们假设该组件的年龄加倍。
# ./src/
@component()
def double_ages(df_input: Input[Dataset], df_output: Output[Dataset]):
df = pd.read_csv(df_input.path)
double_df = df.copy()
double_df['age'] = double_df['age']*2
df_output.path += '.csv'
double_df.to_csv(df_output.path, index=False)
Run Code Online (Sandbox Code Playgroud)
然后,测试:
#./tests/
@pytest.mark.usefixtures("sample_df")
def test_double_ages(sample_df):
expected_df = pd.DataFrame(
{
'name': ['Ana', 'Maria', 'Josh'],
'age': [30, 38, 44],
}
) …
Run Code Online (Sandbox Code Playgroud)