路易吉的任务在哪里?

nic*_*ick 2 python hadoop luigi

第一次进入Luigi(和Python!)的领域,并有一些问题.相关代码是:

from Database import Database
import luigi

class bbSanityCheck(luigi.Task):

  conn = luigi.Parameter()
  date = luigi.Parameter()
  def __init__(self, *args, **kwargs):
    super(bbSanityCheck, self).__init__(*args, **kwargs)
    self.has_run = False

  def run(self):
    print "Entering run of bb sanity check"
    # DB STUFF HERE THAT DOESN"T MATTER
   print "Are we in la-la land?"

  def complete(self):
    print "BB Sanity check being asked for completeness: " , self.has_run
    return self.has_run

class Pipeline(luigi.Task):
  date = luigi.DateParameter()

  def requires(self):
    db = Database('cbs')
    self.conn = db.connect()
    print "I'm about to yield!"
    return bbSanityCheck(conn = self.conn, date = self.date)


  def run(self):
    print "Hello World"
    self.conn.query("""SELECT * 
              FROM log_blackbook""")
    result = conn.store_result()

    print result.fetch_row()

  def complete(self):
    return False

if __name__=='__main__':
  luigi.run()
Run Code Online (Sandbox Code Playgroud)

输出在这里(删除了相关的DB返回'原因):

DEBUG: Checking if Pipeline(date=2013-03-03) is complete
I'm about to yield!
INFO: Scheduled Pipeline(date=2013-03-03)
I'm about to yield!
DEBUG: Checking if bbSanityCheck(conn=<_mysql.connection open to 'sas1.rad.wc.truecarcorp.com' at 223f050>, date=2013-03-03) is complete
BB Sanity check being asked for completeness:  False
INFO: Scheduled bbSanityCheck(conn=<_mysql.connection open to 'sas1.rad.wc.truecarcorp.com' at 223f050>, date=2013-03-03)
INFO: Done scheduling tasks
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 5150] Running   bbSanityCheck(conn=<_mysql.connection open to 'sas1.rad.wc.truecarcorp.com' at 223f050>, date=2013-03-03)
Entering run of bb sanity check
Are we in la-la land?
INFO: [pid 5150] Done      bbSanityCheck(conn=<_mysql.connection open to 'sas1.rad.wc.truecarcorp.com' at 223f050>, date=2013-03-03)
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: There are 1 pending tasks possibly being run by other workers
INFO: Worker was stopped. Shutting down Keep-Alive thread
Run Code Online (Sandbox Code Playgroud)

所以问题:

1.)为什么"我即将屈服"被打印两次?

2.)为什么"hello world"从未打印过?

3.)"可能由其他工人管理的1个待处理任务"是什么?

我更喜欢超超清洁输出,因为它更容易维护.我希望我能把这些警告等同起来.

我还注意到需要"yield"或"return item,item2,item3".我已经阅读了关于产量并了解它的信息.我没有得到的是哪种惯例在这里被认为是优越的,或者如果它们是微妙的差异,我不熟悉这种语言.

gra*_*per 5

我认为你误解了路易吉的工作原理.

(1)嗯..不确定.看起来更像是在INFO和DEBUG中向我打印相同内容的问题

(2)所以,你试图运行依赖于bbSanityCheck运行的Pipeline.bbSanityCheck.complete()永远不会返回True,因为你从未在bbSanityCheck中将has_run设置为True.因此,Pipeline任务永远不会运行并输出hello world,因为它的依赖关系永远不会完成.

(3)这可能是因为你有这个待处理的任务(它实际上是管道).但是Luigi明白它不可能运行和关闭.

我个人不会使用has_run来检查任务是否已运行,而是检查是否存在此作业的结果.即,如果这个工作对数据库有效,那么,complete()应该检查预期的内容是否存在.