小编Joh*_*han的帖子

Python Luigi - 满意后继续执行外部任务

我正在研究一个 Luigi 管道,该管道检查是否存在手动创建的文件,如果存在,则继续执行下一个任务:

import luigi, os

class ExternalFileChecker(luigi.ExternalTask):
    task_namespace='MyTask'
    path = luigi.Parameter()
    def output(self):
        return luigi.LocalTarget(os.path.join(self.path, 'externalfile.txt'))

 class ProcessExternalFile(luigi.Task):
      task_namespace='MyTask'
      path = luigi.Parameter()

      def requires(self):
          return ExternalFileChecker(path=self.path)

      def output(self):
          dirname = self.path
          outfile = os.path.join(dirname, 'processedfile.txt')
          return luigi.LocalTarget(outfile)

      def run(self):
          #do processing

if __name__ == '__main__':
      path = r'D:\MyPath\luigi'
      luigi.run(['MyTask.ProcessExternalFile','--path', path,\
      '--worker-retry-external-tasks','--scheduler-retry-delay', '20',\
      '--worker-keep-alive'])
Run Code Online (Sandbox Code Playgroud)

我想要的是 luigi 在创建手册文件并将其粘贴到路径中后继续。当我这样做时,它不是查找文件并继续执行任务,而是每隔几秒钟重新检查一次新任务:

DEBUG: There are no more tasks to run at this time
DEBUG: There are 2 pending tasks possibly being run by other workers …
Run Code Online (Sandbox Code Playgroud)

python luigi

5
推荐指数
1
解决办法
3411
查看次数

标签 统计

luigi ×1

python ×1