har*_*jay 6 python queue multithreading
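I have a function that takes a large number of x,y pairs as input, does some elaborate curve fitting with numpy and scipy, and returns a single value. To speed things up, I feed the data to two threads through Queue.Queue. Once all the data has been processed, I want the threads to terminate, the calling process to end, and control to return to the shell.
I am trying to understand why I have to use a private method of threading.Thread to stop my threads and return control to the command line.
self.join() does not end the program. The only way I have found to get control back is the private stop method.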
    def stop(self):
        print "STOP CALLED"
        self.finished.set()
        print "SET DONE"
        # self.join(timeout=None) does not work
        self._Thread__stop()
Here is an approximation of my code:
import threading
from Queue import Queue  # Python 2 module name; on Python 3 it is "queue"

class CalcThread(threading.Thread):
    def __init__(self,in_queue,out_queue,function):
        threading.Thread.__init__(self)
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.function = function
        self.finished = threading.Event()

    def stop(self):
        print "STOP CALLED"
        self.finished.set()
        print "SET DONE"
        self._Thread__stop()

    def run(self):
        while not self.finished.isSet():
            # blocks until an item is available
            params_for_function = self.in_queue.get()
            try:
                tm = self.function(params_for_function)
                self.in_queue.task_done()
                self.out_queue.put(tm)
            except ValueError as v:
                #modify params and reinsert into queue
                window = params_for_function["window"]
                params_for_function["window"] = window + 1
                self.in_queue.put(params_for_function)

def big_calculation(well_id,window,data_arrays):
    # do some analysis to calculate tm
    return tm

if __name__ == "__main__":
    NUM_THREADS = 2
    workers = []
    in_queue = Queue()
    out_queue = Queue()

    for i in range(NUM_THREADS):
        w = CalcThread(in_queue,out_queue,big_calculation)
        w.start()
        workers.append(w)

    if options.analyze_all:
        for i in well_ids:
            in_queue.put(dict(well_id=i,window=10,data_arrays=my_data_dict))

    in_queue.join()
    print "ALL THREADS SEEM TO BE DONE"

    # gather data and report it from out_queue
    for i in well_ids:
        p = out_queue.get()
        print p
        out_queue.task_done()
        # I had to do this to get the out_queue to proceed
        if out_queue.qsize() == 0:
            out_queue.join()
            break

    # Calling this stop method does not seem to return control to the command line unless I use threading.Thread private method
    for aworker in workers:
        aworker.stop()
In general, it is a bad idea to kill a thread that modifies a shared resource.
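One way to avoid killing worker threads at all is to ask them to exit. The following is a minimal sketch of that idea, not the asker's code: it assumes Python 3 (the `queue` module), and the names `_STOP`, `worker` and `run_workers` are hypothetical. It also assumes `function` does not raise; a real worker would need its own error handling.

```python
import queue
import threading

_STOP = object()  # hypothetical sentinel that tells a worker to exit


def worker(in_queue, out_queue, function):
    """Process items until the sentinel arrives, then return normally."""
    while True:
        item = in_queue.get()
        try:
            if item is _STOP:
                return
            out_queue.put(function(item))
        finally:
            # Every get() is matched by a task_done(), sentinel included.
            in_queue.task_done()


def run_workers(function, items, num_threads=2):
    """Apply function to each item in worker threads; result order is arbitrary."""
    items = list(items)
    in_queue, out_queue = queue.Queue(), queue.Queue()
    threads = [threading.Thread(target=worker,
                                args=(in_queue, out_queue, function))
               for _ in range(num_threads)]
    for t in threads:
        t.start()
    for item in items:
        in_queue.put(item)
    for _ in threads:
        in_queue.put(_STOP)  # one sentinel per worker
    for t in threads:
        t.join()  # returns because each worker leaves its loop
    return [out_queue.get() for _ in items]
```

Because each worker returns from its loop on its own, `join()` completes and the interpreter can exit without any private `Thread` method.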
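CPU-intensive tasks in multiple threads are worse than useless in Python unless the GIL is released while the computation runs. Many numpy functions do release the GIL.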
import concurrent.futures # on Python 2.x: pip install futures

calc_args = []
if options.analyze_all:
    calc_args.extend(dict(well_id=i,...) for i in well_ids)

with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
    future_to_args = dict((executor.submit(big_calculation, args), args)
                          for args in calc_args)

    while future_to_args:
        # iterate over a snapshot, because the loop body mutates future_to_args
        # (dict(**future_to_args) fails on Python 3: keyword names must be strings)
        for future in concurrent.futures.as_completed(list(future_to_args)):
            args = future_to_args.pop(future)
            if future.exception() is not None:
                print('%r generated an exception: %s' % (args,
                                                         future.exception()))
                if isinstance(future.exception(), ValueError):
                    #modify params and resubmit
                    args["window"] += 1
                    future_to_args[executor.submit(big_calculation, args)] = args
            else:
                print('f%r returned %r' % (args, future.result()))

print("ALL work SEEMs TO BE DONE")
You can replace ThreadPoolExecutor with ProcessPoolExecutor if there is no shared state. Put the code in your main() function.
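As a rough illustration of that last suggestion, here is a sketch that moves the retry loop into a function and runs it from main() with ProcessPoolExecutor. It assumes Python 3. The body of `big_calculation` is a made-up stand-in for the real curve fit (it rejects windows smaller than 12 only so the retry path runs), and `run_all` and its parameters are hypothetical names.

```python
import concurrent.futures


def big_calculation(args):
    # Hypothetical stand-in for the real analysis: fails with ValueError
    # until the window is at least 12, then returns a dummy value.
    if args["window"] < 12:
        raise ValueError("window too small")
    return args["well_id"] * 100 + args["window"]


def run_all(executor_cls, calc_args, max_workers=2):
    """Run big_calculation on each args dict, widening the window on ValueError."""
    results = {}
    with executor_cls(max_workers=max_workers) as executor:
        pending = {executor.submit(big_calculation, a): a for a in calc_args}
        while pending:
            # Iterate over a snapshot, because the loop body mutates pending.
            for future in concurrent.futures.as_completed(list(pending)):
                args = pending.pop(future)
                error = future.exception()
                if isinstance(error, ValueError):
                    args["window"] += 1  # modify params and resubmit
                    pending[executor.submit(big_calculation, args)] = args
                elif error is not None:
                    raise error
                else:
                    results[args["well_id"]] = future.result()
    return results


def main():
    calc_args = [dict(well_id=i, window=10) for i in range(4)]
    results = run_all(concurrent.futures.ProcessPoolExecutor, calc_args)
    for well_id in sorted(results):
        print(well_id, results[well_id])


if __name__ == "__main__":
    main()
```

The `if __name__ == "__main__"` guard matters with ProcessPoolExecutor because worker processes may import the main module, and the function and its arguments must be picklable. Passing `concurrent.futures.ThreadPoolExecutor` to `run_all` instead gives the threaded variant without other changes.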