使用Luigi管道时组织文件?

Man*_*ame 5 python workflow dependencies class luigi

我正在使用Luigi来完成我的工作流程.我的工作流程分为三个通用部分 - 导入,分析,导出.在每个部分中,有多个Luigi任务.

我可以将所有内容都放在一个文件中.但是,如果我想要把一切都分开,在有data_import.py,analysis.pyexport.py.

例如,如果data_import.py看起来像:

import luigi

class import_task_A(luigi.Task):
    def requires(self):
        return []
    def output(self):
        return luigi.LocalTarget('myfile.txt')
    def run(self):
        my import stuff

if __name__ == '__main__':
    luigi.run()
Run Code Online (Sandbox Code Playgroud)

但是,如果export.py中的任务依赖于import.py中的任务,该怎么办?我会这样做:

from data_import import import_task_A
import luigi

class export_task_A(luigi.Task):
    def requires(self):
        return import_task_A()
    def output(self):
        return luigi.LocalTarget('myfile.txt')
    def run(self):
        my import stuff

if __name__ == '__main__':
    luigi.run()
Run Code Online (Sandbox Code Playgroud)

如果我将大型项目分解为多个.py文件,那么告诉Luigi哪个需要的任务在哪个文件中的最佳方法是什么?似乎这种方法会变得很麻烦.

小智 0

为什么会变得麻烦呢?如果您的export_task_A依赖于许多任务,则您的def要求将更改为:

def requires(self):
    return [import_task_A(), import_task_B()]
Run Code Online (Sandbox Code Playgroud)

顺便说一句,在这种情况下你可能想要删除

if __name__ == '__main__':
    luigi.run()
Run Code Online (Sandbox Code Playgroud)

来自您的 data_import.py。也不要在 data_export.py 中使用相同的内容

if __name__ == '__main__':
    luigi.build([export_task_A()])
Run Code Online (Sandbox Code Playgroud)