如何使用 python 调度模块与共享作业队列并行执行作业时传递参数

J.W*_*ang 2 python queue scheduler scheduled-tasks python-2.7

我打算并行运行多个作业,并按照此处的'NoneType' object is not callable示例使用作业队列,但它执行一次并在我尝试传递参数时引发异常。下面列出了代码:

import Queue
import schedule
import threading
import time

def job(arg):
    print 'Argument is %s' % arg

def worker_main():
    while True:
        try:
            job_func = jobqueue.get()
            job_func()
        except Exception as e:
            print e

jobqueue = Queue.Queue()

schedule.every(2).seconds.do(jobqueue.put, job(1))
schedule.every(2).seconds.do(jobqueue.put, job(2))

worker_thread = threading.Thread(target=worker_main)
worker_thread.start()

while True:
    try:
        schedule.run_pending()
        time.sleep(1)
    except Exception as e:
        print e
        sys.exit()
Run Code Online (Sandbox Code Playgroud)

输出是:

Arg is 1
Arg is 2
'NoneType' object is not callable
'NoneType' object is not callable
'NoneType' object is not callable
'NoneType' object is not callable
'NoneType' object is not callable
'NoneType' object is not callable
...
Run Code Online (Sandbox Code Playgroud)

有什么想法可以解决这个问题吗?

far*_*zad 6

原因是传递给do方法 in 的参数schedule.every(2).seconds.do(jobqueue.put, job(1))实际上是None

因为代码正在调用job函数并将 1(和 2)作为参数传递给job. 因此,函数的返回值job(因为None它只是打印)作为方法调用的第二个参数传递do。因此,None作业队列中存储的是实例,而不是函数引用。

将参数传递给作业的问题在于,包中do的方法schedule可以接受作业运行的额外参数,但正在计划的是将作业放入队列中,队列项只是函数引用,没有额外的参数论据。

一种解决方案是将作业与其参数一起放入队列。然后,工作人员需要获取它们并通过向其传递参数来调用该作业。像这样的东西:

import Queue
import schedule
import threading
import time

def job(arg):
    print 'Argument is %s' % arg

def worker_main():
    while True:
        try:
            job_func, job_args = jobqueue.get()
            job_func(*job_args)
        except Exception as e:
            print e

jobqueue = Queue.Queue()

schedule.every(2).seconds.do(jobqueue.put, (job, [1]))
schedule.every(2).seconds.do(jobqueue.put, (job, [2]))

worker_thread = threading.Thread(target=worker_main)
worker_thread.start()

while True:
    try:
        schedule.run_pending()
        time.sleep(1)
    except Exception as e:
        print e
        sys.exit()
Run Code Online (Sandbox Code Playgroud)

在这里,我们将作业函数引用的元组和参数列表放入队列中。然后工作人员会获取它们,并将参数列表传递给工作函数。

另一个解决方案是将作业(job(1)job(2)调用)包装在不需要参数的其他函数中,然后将这些函数注册到作业队列,如下所示:

def job1():
    job(1)

def job2():
    job(2)

jobqueue = Queue.Queue()

schedule.every(2).seconds.do(jobqueue.put, job1)
schedule.every(2).seconds.do(jobqueue.put, job2)
Run Code Online (Sandbox Code Playgroud)