是否可以编写一个容忍失败的子任务的 luigi 包装器任务?

Dal*_*yaG 13 python error-handling dataflow luigi data-pipeline

我有一个执行一些不稳定计算的 luigi 任务。想想有时不收敛的优化过程。

import luigi

MyOptimizer(luigi.Task):
    input_param: luigi.Parameter()
    output_filename = luigi.Parameter(default='result.json')

    def run(self):
        optimize_something(self.input_param, self.output().path)

    def output(self):
        return luigi.LocalTarget(self.output_filename)
Run Code Online (Sandbox Code Playgroud)

现在我想构建一个包装器任务,该任务将使用不同的输入参数多次运行此优化器,并获取收敛的第一次运行的输出。

我现在实现它的方式是使用,MyOptimizer因为如果它失败,luigi 会认为包装器任务也失败了,但我可以接受一些MyOptimizer失败的实例。

MyWrapper(luigi.Task):
    input_params_list = luigi.ListParameter()
    output_filename = luigi.Parameter(default='result.json')

    def run(self):
        for input_param in self.input_params_list:
            try:
                optimize_something(self.input_param, self.output().path)
                print(f"Optimizer succeeded with input {input_param}")
                break
            except Exception as e:
                print(f"Optimizer failed with input {input_param}. Trying again...")

    def output(self):
        return luigi.LocalTarget(self.output_filename)
Run Code Online (Sandbox Code Playgroud)

问题在于,通过这种方式,任务不会并行化。此外,您可以想象MyOptimizeroptimize_something参与由 luigi 处理的数据管道的复杂任务,这在我的代码中造成了相当多的混乱。

我将不胜感激有关如何以类似路易吉的方式进行这项工作的任何见解和想法:)

小智 1

你能让你的优化器总是写出一些东西吗?即使它是一个空文件来表示失败,但对 luigi 来说这看起来会成功吗?另外,在 MyOptimizer 的输出文件名中使用 input_param 以使文件名唯一。

然后:

MyWrapper(luigi.Task):
    input_params_list = luigi.ListParameter()
    output_filename = luigi.Parameter(default='result.json')

    def run(self):
        task_list = [MyOptimizer(input_param) for input_param in self.input_params_list]
        targets = yield task_list #executes tasks in parallel 
        for target in targets:
             ...do something to read and compare outputs
             some_data = some_read(target.path)
        ...write optimal solution

    def output(self):
        return luigi.LocalTarget(self.output_filename)
Run Code Online (Sandbox Code Playgroud)