我正在研究一个 Luigi 管道,该管道检查是否存在手动创建的文件,如果存在,则继续执行下一个任务:
import luigi, os
class ExternalFileChecker(luigi.ExternalTask):
task_namespace='MyTask'
path = luigi.Parameter()
def output(self):
return luigi.LocalTarget(os.path.join(self.path, 'externalfile.txt'))
class ProcessExternalFile(luigi.Task):
task_namespace='MyTask'
path = luigi.Parameter()
def requires(self):
return ExternalFileChecker(path=self.path)
def output(self):
dirname = self.path
outfile = os.path.join(dirname, 'processedfile.txt')
return luigi.LocalTarget(outfile)
def run(self):
#do processing
if __name__ == '__main__':
path = r'D:\MyPath\luigi'
luigi.run(['MyTask.ProcessExternalFile','--path', path,\
'--worker-retry-external-tasks','--scheduler-retry-delay', '20',\
'--worker-keep-alive'])
Run Code Online (Sandbox Code Playgroud)
我想要的是 luigi 在创建手册文件并将其粘贴到路径中后继续。当我这样做时,它不是查找文件并继续执行任务,而是每隔几秒钟重新检查一次新任务:
DEBUG: There are no more tasks to run at this time
DEBUG: There are 2 pending tasks possibly being run by other workers
DEBUG: There are 2 pending tasks unique to this worker
DEBUG: Sleeping for 1.536391 seconds
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: There are 2 pending tasks possibly being run by other workers
DEBUG: There are 2 pending tasks unique to this worker
DEBUG: Sleeping for 5.669132 seconds
DEBUG: Asking scheduler for work...
DEBUG: Done
(...)
Run Code Online (Sandbox Code Playgroud)
经过相当长的时间(15-20 分钟左右)后,luigi 将找到该文件,然后它可以根据需要继续。我能做些什么来防止这种延迟?我希望 luigi 在文件存在后立即继续。
需要牢记以下几点:
keep_alive = True,在这种情况下它会在没有更多挂起的任务时退出)。retry_external_tasks配置设置控制[worker]。我想你正在观察的是这样的。您的管道正在运行,任务ProcessExternalFile失败,然后您添加文件,任务在 的持续时间内保持 FAILED retry_delay,然后最终变为 PENDING 并且工作人员再次获得此任务,此时它发现文件并且任务变为 COMPLETE .
这是否是所需的行为取决于您。如果您希望更快地找到文件,您可以更改重试间隔。或者您可以while在run方法中执行无限循环并定期检查文件,并在找到时跳出循环。您还可以配置 Luigi 以完全禁用重试逻辑。
| 归档时间: |
|
| 查看次数: |
3411 次 |
| 最近记录: |