Tags: parallel-processing, function, build, luigi
I'm trying out Luigi's multiprocessing support through the luigi.build function, but I get an error from inside the library when I run it:
```
    for next in self._add(item, is_complete):
  File "/home/manoj/anaconda2/lib/python2.7/site-packages/luigi/worker.py", line 604, in _add
    self._validate_dependency(d)
  File "/home/manoj/anaconda2/lib/python2.7/site-packages/luigi/worker.py", line 622, in _validate_dependency
    raise Exception('requires() must return Task objects')
```
Here is the code I'm using to try to achieve this:
```python
import luigi


class TaskOne(luigi.Task):
    custid = luigi.Parameter()

    def requires(self):
        pass

    def output(self):
        return luigi.LocalTarget("logs/" + str(self.custid) + "_success")

    def run(self):
        with self.output().open('w') as f:
            f.write("%s\n" % '')


class TaskTwo(luigi.Task):
    def requires(self):
        customersList = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I']
        yield luigi.build([TaskOne(custid=cust_id) for cust_id in customersList], workers=2)

    def output(self):
        return luigi.LocalTarget("logs/overall_success.txt")

    def run(self):
        with self.output().open('w') as f:
            f.write("%s\n" % "success")


if __name__ == '__main__':
    luigi.run()
```
========================================================================
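Why do you think you need to call luigi.build inside requires()? requires() should simply return the tasks this task depends on: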
```python
class TaskTwo(luigi.Task):
    def requires(self):
        customersList = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I']
        return [TaskOne(custid=cust_id) for cust_id in customersList]
```
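The error itself comes from Luigi checking every value that requires() produces. Calling luigi.build there runs those tasks on the spot as a separate, nested run, and then the generator yields build's return value, a bool, which is not a Task. The sketch below is a simplified model of that check, not Luigi's actual source: `FakeTask`, `FakeTaskOne`, `fake_build` and `validate_dependencies` are hypothetical stand-ins that only mimic the behaviour relevant to this error.

```python
import types


class FakeTask(object):
    """Hypothetical stand-in for luigi.Task."""


class FakeTaskOne(FakeTask):
    def __init__(self, custid):
        self.custid = custid


def fake_build(tasks, workers=1):
    """Hypothetical stand-in for luigi.build: the real one runs `tasks`
    and returns a bool; this stub skips the running and returns True."""
    return True


def validate_dependencies(requires_result):
    """Simplified model of Luigi's check on what requires() produced."""
    if requires_result is None:  # a requires() that only does `pass`
        deps = []
    elif isinstance(requires_result, (list, tuple, types.GeneratorType)):
        deps = list(requires_result)
    else:
        deps = [requires_result]
    for dep in deps:
        if not isinstance(dep, FakeTask):
            raise Exception("requires() must return Task objects")
    return deps


def requires_broken():
    # Like the question: yields the *return value* of build(), a bool.
    yield fake_build([FakeTaskOne(c) for c in "ABC"], workers=2)


def requires_fixed():
    # Like the answer: returns the task objects themselves.
    return [FakeTaskOne(c) for c in "ABC"]
```

Passing `requires_broken()` to `validate_dependencies` raises the same message seen in the traceback, while `requires_fixed()` yields three valid dependencies.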
If you want multiple workers, specify them on the command line when you launch the pipeline:
```shell
luigi --module your_module TaskTwo --workers 2
```