Luigi: Error when parallelizing luigi tasks via the build function

Tags: parallel-processing, function, build, luigi

I am trying out luigi's multiprocessing support through the luigi.build function, but I get an error from inside the library when I run it:

    for next in self._add(item, is_complete):
  File "/home/manoj/anaconda2/lib/python2.7/site-packages/luigi/worker.py", line 604, in _add
    self._validate_dependency(d)
  File "/home/manoj/anaconda2/lib/python2.7/site-packages/luigi/worker.py", line 622, in _validate_dependency
    raise Exception('requires() must return Task objects')

Here is the code I am using to try to achieve this:

import luigi

class TaskOne(luigi.Task):
    custid= luigi.Parameter() 
    def requires(self):
        pass
    def output(self):
        return luigi.LocalTarget("logs/"+str(self.custid)+"_success")
    def run(self):
        with self.output().open('w') as f:
            f.write("%s\n" % '')
        
               
class TaskTwo(luigi.Task):  
    def requires(self): 
        customersList = ['A','B', 'C', 'D', 'E', 'F', 'G', 'H', 'I']
        yield luigi.build([TaskOne(custid=cust_id) for cust_id in customersList], workers=2)  
    def output(self):
        return luigi.LocalTarget("logs/overall_success.txt")
    def run(self):
        with self.output().open('w') as f:
            f.write("%s\n" % "success")
         
if __name__ == '__main__':
    luigi.run()
Run Code Online (Sandbox Code Playgroud)

========================================================================

Mat*_*ght 5

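Why do you think you need to call luigi.build inside requires()? requires() should only return (or yield) Task instances; luigi's scheduler then runs those dependencies for you. luigi.build, by contrast, runs the tasks immediately and returns a boolean success flag, and yielding that boolean from requires() is exactly what triggers "requires() must return Task objects". Just return the tasks: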

class TaskTwo(luigi.Task):  
  def requires(self): 
    customersList = ['A','B', 'C', 'D', 'E', 'F', 'G', 'H', 'I']
    return [TaskOne(custid=cust_id) for cust_id in customersList]
Run Code Online (Sandbox Code Playgroud)

If you want multiple workers, specify that on the command line when you launch the pipeline:

luigi --module your_module TaskTwo --workers 2