我使用Spotify的Luigi在Python 3.6中编写了我的第一个项目,以便在管道中安排一些自然语言处理任务.
我注意到类的output()功能Task总是返回某种Target对象,这只是某个文件,无论是本地还是远程.因为我的任务产生了更复杂的数据结构,比如解析树,所以将它们作为字符串写入文件并在之后再次读取它是非常尴尬的.
因此,我想询问是否有可能在管道中的任务之间传递Python对象?
我在路易吉建立了一个任务管道.由于此管道将在不同的上下文中使用,因此可能需要在管道的开头或末尾包含更多任务,或者甚至在任务之间包含完全不同的依赖关系.
就在那时我想:"嘿,为什么要在我的配置文件中声明任务之间的依赖关系?",所以我在config.py中添加了这样的东西:
PIPELINE_DEPENDENCIES = {
"TaskA": [],
"TaskB": ["TaskA"],
"TaskC": ["TaskA"],
"TaskD": ["TaskB", "TaskC"]
}
Run Code Online (Sandbox Code Playgroud)
我在整个任务中堆积参数时感到很恼火,所以在某些时候我只引入了一个参数,task_config每个参数都存储了Task所需的每个信息或数据run().所以我把它放在PIPELINE_DEPENDENCIES那里.
最后,我将每个Task我定义的继承自两个luigi.Task和一个自定义的Mixin类,它将实现动态requires(),看起来像这样:
class TaskRequirementsFromConfigMixin(object):
task_config = luigi.DictParameter()
def requires(self):
required_tasks = self.task_config["PIPELINE_DEPENDENCIES"]
requirements = [
self._get_task_cls_from_str(required_task)(task_config=self.task_config)
for required_task in required_tasks
]
return requirements
def _get_task_cls_from_str(self, cls_str):
...
Run Code Online (Sandbox Code Playgroud)
不幸的是,这不起作用,因为运行管道给了我以下内容:
===== Luigi Execution Summary =====
Scheduled 4 tasks of which:
* 4 were left pending, among these:
* 4 was not granted run …Run Code Online (Sandbox Code Playgroud) 在我尝试了一段时间不成功后,我正在向这个神奇的网站寻求帮助。现在我的问题:我想创建一个装饰器,将函数的执行时间(在函数执行期间)写入日志文件,如:
@log_time("log.txt", 35)
def some_function(...):
...
return result
Run Code Online (Sandbox Code Playgroud)
和
from functools import wraps
def log_time(path_to_logfile, interval):
...
Run Code Online (Sandbox Code Playgroud)
所以这log.txt看起来像
Time elapsed: 0h 0m 35s
Time elapsed: 0h 1m 10s
Time elapsed: 0h 1m 45s
Run Code Online (Sandbox Code Playgroud)
有任何想法吗?
python ×3
luigi ×2
docker ×1
functools ×1
logging ×1
python-2.7 ×1
python-3.6 ×1
python-3.x ×1