nic*_*ick 2 python hadoop luigi
This is my first foray into Luigi (and Python!), and I have a few questions. The relevant code is:
from Database import Database
import luigi


class bbSanityCheck(luigi.Task):
    conn = luigi.Parameter()
    date = luigi.Parameter()

    def __init__(self, *args, **kwargs):
        super(bbSanityCheck, self).__init__(*args, **kwargs)
        self.has_run = False

    def run(self):
        print "Entering run of bb sanity check"
        # DB STUFF HERE THAT DOESN'T MATTER
        print "Are we in la-la land?"

    def complete(self):
        print "BB Sanity check being asked for completeness: ", self.has_run
        return self.has_run


class Pipeline(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        db = Database('cbs')
        self.conn = db.connect()
        print "I'm about to yield!"
        return bbSanityCheck(conn=self.conn, date=self.date)

    def run(self):
        print "Hello World"
        self.conn.query("""SELECT *
                           FROM log_blackbook""")
        result = self.conn.store_result()
        print result.fetch_row()

    def complete(self):
        return False


if __name__ == '__main__':
    luigi.run()
Here is the output (with the DB return values removed):
DEBUG: Checking if Pipeline(date=2013-03-03) is complete
I'm about to yield!
INFO: Scheduled Pipeline(date=2013-03-03)
I'm about to yield!
DEBUG: Checking if bbSanityCheck(conn=<_mysql.connection open to 'sas1.rad.wc.truecarcorp.com' at 223f050>, date=2013-03-03) is complete
BB Sanity check being asked for completeness: False
INFO: Scheduled bbSanityCheck(conn=<_mysql.connection open to 'sas1.rad.wc.truecarcorp.com' at 223f050>, date=2013-03-03)
INFO: Done scheduling tasks
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 5150] Running bbSanityCheck(conn=<_mysql.connection open to 'sas1.rad.wc.truecarcorp.com' at 223f050>, date=2013-03-03)
Entering run of bb sanity check
Are we in la-la land?
INFO: [pid 5150] Done bbSanityCheck(conn=<_mysql.connection open to 'sas1.rad.wc.truecarcorp.com' at 223f050>, date=2013-03-03)
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: There are 1 pending tasks possibly being run by other workers
INFO: Worker was stopped. Shutting down Keep-Alive thread
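So, my questions:
1.) Why does "I'm about to yield!" get printed twice?
2.) Why is "Hello World" never printed?
3.) What are the "1 pending tasks possibly being run by other workers"?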
I prefer ultra-clean output because it's easier to maintain, so I'm hoping I can get rid of these warnings and the like.
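I also noticed that requires() needs either "yield" or "return item, item2, item3". I've read about yield and understand what it does. What I don't get is which convention is considered better here, or whether there are subtle differences I'm not aware of because I'm new to the language.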
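On the yield-versus-return question: as far as I know, Luigi flattens whatever requires() gives back (a single task, a list or tuple, a dict, or a generator) into one flat list of dependencies, so both styles declare the same thing. The sketch below uses a simplified `flatten` modelled on that behaviour, with strings standing in for task objects; it is an illustration, not Luigi's own code.

```python
def requires_with_return():
    # Returning a tuple: all items are built eagerly when requires() is called.
    return ("task_a", "task_b", "task_c")


def requires_with_yield():
    # A generator: each item is produced lazily as the caller iterates.
    yield "task_a"
    yield "task_b"
    yield "task_c"


def flatten(struct):
    """Simplified model of how requires() output is flattened:
    None -> [], dict -> its values, str -> a single item,
    other iterables -> recursed into, anything else -> a single item."""
    if struct is None:
        return []
    if isinstance(struct, dict):
        struct = list(struct.values())
    if isinstance(struct, str):
        return [struct]
    try:
        items = iter(struct)
    except TypeError:
        return [struct]
    result = []
    for item in items:
        result.extend(flatten(item))
    return result


print(flatten(requires_with_return()))  # ['task_a', 'task_b', 'task_c']
print(flatten(requires_with_yield()))   # ['task_a', 'task_b', 'task_c']
```

The practical differences are small: a returned tuple is built all at once, while a generator is lazy and single-use, which only matters if the same result object is iterated twice. Note also that in later Luigi versions, `yield` inside run() (as opposed to requires()) means something different: it declares dynamic dependencies at run time.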
I think you've misunderstood how Luigi works.
(1) Hmm... not sure. To me it looks more like an issue of the same thing being printed at both INFO and DEBUG level.
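One hedged reading of the log: "I'm about to yield!" comes from a plain print statement rather than from Luigi's logger, and a print runs once per call, so the two copies suggest Pipeline.requires() was evaluated twice during scheduling. If so, anything with side effects in requires(), such as opening a database connection, happens once per call. The toy sketch below illustrates that; `open_connection` is a hypothetical stand-in for `Database('cbs').connect()`, and `toy_schedule` is not Luigi's scheduler. It also shows the alternative of passing only a database name (rather than a live connection, which appears verbatim in the task names in the log) and connecting inside run().

```python
opened_connections = []


def open_connection():
    """Hypothetical stand-in for Database('cbs').connect()."""
    conn = object()
    opened_connections.append(conn)
    return conn


class PipelineConnectsInRequires:
    def requires(self):
        # Side effect: a new connection is opened on every call to requires().
        self.conn = open_connection()
        return ["bbSanityCheck(db='cbs')"]


class PipelinePassesName:
    def requires(self):
        # No side effects: only names the dependency. The task that needs the
        # database would connect inside its own run() using the name 'cbs'.
        return ["bbSanityCheck(db='cbs')"]


def toy_schedule(task, times=2):
    """Toy scheduler that evaluates requires() `times` times,
    as the log above suggests happened for Pipeline."""
    for _ in range(times):
        task.requires()


toy_schedule(PipelineConnectsInRequires())
print(len(opened_connections))  # 2

toy_schedule(PipelinePassesName())
print(len(opened_connections))  # still 2
```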
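(2) So, you're trying to run Pipeline, which depends on bbSanityCheck having run. bbSanityCheck.complete() never returns True, because you never set has_run to True in bbSanityCheck. Therefore the Pipeline task never runs and never prints "Hello World", because its dependency never completes.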
(3) That's probably because you have this pending task (it's actually Pipeline). But Luigi understands that it can't be run, and shuts down.
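Points (2) and (3) can be illustrated with a deliberately tiny model of dependency resolution: a task runs only if every dependency reports complete(), and anything that cannot run is left pending. This is a toy written for illustration, not Luigi's actual scheduler; `SanityCheck`, `mark_done` and `run_toy_scheduler` are hypothetical names.

```python
class SanityCheck:
    def __init__(self, mark_done):
        self.has_run = False
        self.mark_done = mark_done  # hypothetical switch for this demo

    def requires(self):
        return []

    def run(self):
        if self.mark_done:
            self.has_run = True  # the assignment missing from bbSanityCheck.run()

    def complete(self):
        return self.has_run


class Pipeline:
    def __init__(self, dep):
        self.dep = dep
        self.printed = []

    def requires(self):
        return [self.dep]

    def run(self):
        self.printed.append("Hello World")

    def complete(self):
        return False  # same as in the question


def run_toy_scheduler(tasks):
    """Tasks are listed dependencies-first. Each task runs only if all of its
    dependencies report complete(); the tasks that could not run are returned."""
    pending = []
    for task in tasks:
        if all(dep.complete() for dep in task.requires()):
            task.run()
        else:
            pending.append(task)
    return pending


check = SanityCheck(mark_done=False)
pipe = Pipeline(check)
print(len(run_toy_scheduler([check, pipe])), pipe.printed)  # 1 []

check_fixed = SanityCheck(mark_done=True)
pipe_fixed = Pipeline(check_fixed)
print(len(run_toy_scheduler([check_fixed, pipe_fixed])), pipe_fixed.printed)  # 0 ['Hello World']
```

Even where setting the flag works, an in-memory flag starts out False in every new process, so a fresh run would always redo the check. That is one reason to prefer the approach in the next paragraph.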
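Personally, I wouldn't use has_run to check whether a task has run; I'd check whether the result of the job exists instead. That is, if the job does something to the database, complete() should check that the expected content is there.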
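A minimal sketch of that suggestion, using the standard-library sqlite3 module so it runs on its own: run() records its result in a table, and complete() looks for that row instead of reading an in-memory flag. The table `sanity_check_results`, its columns and the class name are hypothetical; in real code this logic would live in a luigi.Task subclass. Luigi's default complete() also offers a related route: it checks that every Target returned by output() exists.

```python
import sqlite3


class SanityCheckTask:
    """Hypothetical task whose completeness is derived from the database."""

    def __init__(self, conn, date):
        self.conn = conn
        self.date = date

    def run(self):
        # ... the real sanity checks would go here ...
        with self.conn:  # sqlite3: commits the transaction if no exception
            self.conn.execute(
                "INSERT INTO sanity_check_results (run_date, status) VALUES (?, ?)",
                (self.date, "ok"),
            )

    def complete(self):
        # Complete only if a result row for this date already exists.
        row = self.conn.execute(
            "SELECT 1 FROM sanity_check_results WHERE run_date = ?",
            (self.date,),
        ).fetchone()
        return row is not None


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sanity_check_results (run_date TEXT, status TEXT)")

task = SanityCheckTask(conn, "2013-03-03")
print(task.complete())  # False
task.run()
print(task.complete())  # True
```

Because the check reads persisted state, a second run of the pipeline for the same date would see the task as already complete and skip it.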