Vel*_*ron 10 python python-3.x luigi data-pipeline
我是luigi的新手,在为我们的ML工作设计管道时遇到过它.虽然它不适合我的特定用例,但它有很多额外的功能,我决定让它适合.
基本上我正在寻找的是一种能够持久保存自定义构建管道并因此使其结果可重复且易于部署的方法,在阅读了大多数在线教程之后,我尝试使用现有luigi.cfg配置和命令行机制实现我的序列化并且它可能已经足够用于任务的参数但它没有提供序列化我的管道的DAG连接的方法,所以我决定有一个WrapperTask接收到一个json config file然后创建所有任务实例并连接所有输入输出通道luigi任务(做所有的管道).
我特此附上一个小测试程序供您审查:
import random
import luigi
import time
import os
class TaskNode(luigi.Task):
i = luigi.IntParameter() # node ID
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.required = []
def set_required(self, required=None):
self.required = required # set the dependencies
return self
def requires(self):
return self.required
def output(self):
return luigi.LocalTarget('{0}{1}.txt'.format(self.__class__.__name__, self.i))
def run(self):
with self.output().open('w') as outfile:
outfile.write('inside {0}{1}\n'.format(self.__class__.__name__, self.i))
self.process()
def process(self):
raise NotImplementedError(self.__class__.__name__ + " must implement this method")
class FastNode(TaskNode):
def process(self):
time.sleep(1)
class SlowNode(TaskNode):
def process(self):
time.sleep(2)
# This WrapperTask builds all the nodes
class All(luigi.WrapperTask):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
num_nodes = 513
classes = TaskNode.__subclasses__()
self.nodes = []
for i in reversed(range(num_nodes)):
cls = random.choice(classes)
dependencies = random.sample(self.nodes, (num_nodes - i) // 35)
obj = cls(i=i)
if dependencies:
obj.set_required(required=dependencies)
else:
obj.set_required(required=None)
# delete existing output causing a build all
if obj.output().exists():
obj.output().remove()
self.nodes.append(obj)
def requires(self):
return self.nodes
if __name__ == '__main__':
luigi.run()
Run Code Online (Sandbox Code Playgroud)
所以,基本上,在问题的标题说,这侧重于动态依赖,并产生a 513 node dependency DAG与p=1/35 connectivity probability,它还类定义了所有的(如在使所有),因为这需要所有节点要建它被认为是做了WrapperTask (我有一个版本只将它连接到连接的DAG组件的头部,但我不想过于复杂).
是否有更标准(Luigic)的实现方式?特别注意与TaskNode init和set_required方法并不那么复杂,我只是这样做,因为在init方法中接收参数会以某种方式与luigi注册参数的方式发生冲突.我还尝试了其他几种方法,但这基本上是最体面的(有效)
如果没有一种标准的方式,我仍然希望在完成框架实施之前,能够听到我计划进行的任何见解.
| 归档时间: |
|
| 查看次数: |
458 次 |
| 最近记录: |