Cross Validation using Dagster

moo*_*ima 1 python architecture pipeline machine-learning dagster

I've started using Dagster in our ML pipeline and am running into some basic issues. I'm wondering whether I'm missing something trivial or whether this is just how it is...

Say I have a simple ML pipeline:

Load raw data --> Process data into table --> Split train / test --> train model --> evaluate model.

A linear pipeline like this is straightforward in Dagster. But what if I want to add a little loop, say for cross-validation purposes:

Load raw data --> Process data into table --> Split into k folds, and for each fold:
  - fold 1: train model --> evaluate
  - fold 2: train model --> evaluate
  - fold 3: train model --> evaluate
  --> summarize cross validation results.
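To make the shape of that loop concrete, here is a minimal sketch in plain Python with no orchestration framework involved. The function names (`split_into_folds`, `train_model`, `evaluate_model`, `cross_validate`) are hypothetical, and the "model" is only the mean of the training values, standing in for real training code.

```python
from statistics import mean


def split_into_folds(rows, k):
    """Deal rows round-robin into k folds (no shuffling, to stay deterministic)."""
    return [rows[i::k] for i in range(k)]


def train_model(train_rows):
    """Toy stand-in for training: remember the mean of the training values."""
    return mean(train_rows)


def evaluate_model(model, test_rows):
    """Mean absolute error of the constant prediction on the held-out fold."""
    return mean(abs(y - model) for y in test_rows)


def cross_validate(rows, k):
    folds = split_into_folds(rows, k)
    scores = []
    for i, test_rows in enumerate(folds):
        # Fan out: every fold except fold i becomes training data.
        train_rows = [y for j, fold in enumerate(folds) if j != i for y in fold]
        scores.append(evaluate_model(train_model(train_rows), test_rows))
    # Fan in: summarize the per-fold scores.
    return {"fold_scores": scores, "mean_score": mean(scores)}


result = cross_validate([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], k=3)
```

Each loop iteration corresponds to one "train model --> evaluate" branch, and the final dictionary corresponds to the summary step.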

Is there a nice & clean way to do this in Dagster? The way I've been doing things is:

Load raw data --> Process data into table --> Split into K folds --> choose fold k --> train model --> evaluate model

Here the fold "k" is an input parameter for the pipeline, and I then run the pipeline K times.

What am I missing here?

Cat*_* Wu 5

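Yes, Dagster supports fanning out from one solid to multiple solids and then fanning in to a sink solid (i.e., one that summarizes the results) within a single pipeline. Here is some example code; the resulting DAG can be inspected in dagit.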

from dagster import Output, OutputDefinition, composite_solid, pipeline, solid


@solid
def load_raw_data(_):
    # Placeholder: a real solid would load the raw data here.
    yield Output('loaded_data')


@solid
def process_data_into_table(_, raw_data):
    # Placeholder: passes the data through unchanged.
    yield Output(raw_data)


# Fan out: one named output per fold.
@solid(
    output_defs=[
        OutputDefinition(name='fold_one', dagster_type=int, is_required=True),
        OutputDefinition(name='fold_two', dagster_type=int, is_required=True),
    ],
)
def split_into_two_folds(_, table):
    yield Output(1, 'fold_one')
    yield Output(2, 'fold_two')


@solid
def train_fold(_, fold):
    yield Output('model')


@solid
def evaluate_fold(_, model):
    yield Output('compute_result')


# Bundles the per-fold steps (train, then evaluate) into one reusable unit.
@composite_solid
def process_fold(fold):
    return evaluate_fold(train_fold(fold))


# Fan in: receives the result of every fold.
@solid
def summarize_results(context, fold_1_result, fold_2_result):
    yield Output('summary_stats')


@pipeline
def ml_pipeline():
    fold_one, fold_two = split_into_two_folds(process_data_into_table(load_raw_data()))

    # Each alias is a separately named copy of the same composite solid.
    process_fold_one = process_fold.alias('process_fold_one')
    process_fold_two = process_fold.alias('process_fold_two')

    summarize_results(process_fold_one(fold_one), process_fold_two(fold_two))

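In the example code, we use aliases so that the same logic can be reused for each fold. We also consolidate the logic for processing each fold into a composite solid.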

Another option is to create a PipelineDefinition directly in a programmatic way, but I would recommend the approach above.
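The programmatic route amounts to generating the aliased nodes and their dependency edges in a loop instead of writing them out by hand. The sketch below illustrates only that idea with plain Python dicts. It does not call Dagster, and the function `build_fold_graph` and the dictionary layout are assumptions made for illustration, not Dagster's own format; mapping the result onto PipelineDefinition arguments would need to be checked against the Dagster version in use.

```python
def build_fold_graph(k):
    """Return (node_names, dependencies) for a k-fold fan-out/fan-in graph.

    dependencies maps node -> {input_name: (upstream_node, upstream_output)}.
    This layout is hypothetical and chosen only to show the structure.
    """
    nodes = ["load_raw_data", "process_data_into_table", "split_into_folds"]
    deps = {
        "process_data_into_table": {"raw_data": ("load_raw_data", "result")},
        "split_into_folds": {"table": ("process_data_into_table", "result")},
        "summarize_results": {},
    }
    for i in range(1, k + 1):
        alias = f"process_fold_{i}"
        nodes.append(alias)
        # Fan out: each aliased copy consumes one fold output.
        deps[alias] = {"fold": ("split_into_folds", f"fold_{i}")}
        # Fan in: the summary node gets one input per fold.
        deps["summarize_results"][f"fold_{i}_result"] = (alias, "result")
    nodes.append("summarize_results")
    return nodes, deps


nodes, deps = build_fold_graph(3)
```

The number of folds becomes an ordinary function argument here, which is the main attraction of building the graph in code rather than spelling out each alias.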