wim*_*wim 289 python multithreading
如何获取foo
从线程目标返回的值?
from threading import Thread
def foo(bar):
print('hello {}'.format(bar))
return 'foo'
thread = Thread(target=foo, args=('world!',))
thread.start()
return_value = thread.join()
Run Code Online (Sandbox Code Playgroud)
如上所示,"一种显而易见的方法"不起作用:'foo'
返回'foo'
.
Jak*_*ger 262
FWIW,该multiprocessing
模块使用Pool
该类有一个很好的接口.如果你想坚持使用线程而不是进程,你可以使用multiprocessing.pool.ThreadPool
该类作为替代品.
def foo(bar, baz):
print 'hello {0}'.format(bar)
return 'foo' + baz
from multiprocessing.pool import ThreadPool
pool = ThreadPool(processes=1)
async_result = pool.apply_async(foo, ('world', 'foo')) # tuple of args for foo
# do some other stuff in the main process
return_val = async_result.get() # get the return value from your function.
Run Code Online (Sandbox Code Playgroud)
kin*_*all 216
我看到的一种方法是将一个可变对象(如列表或字典)传递给线程的构造函数,以及某种索引或其他标识符.然后,线程可以将其结果存储在该对象的专用槽中.例如:
def foo(bar, result, index):
print 'hello {0}'.format(bar)
result[index] = "foo"
from threading import Thread
threads = [None] * 10
results = [None] * 10
for i in range(len(threads)):
threads[i] = Thread(target=foo, args=('world!', results, i))
threads[i].start()
# do some other stuff
for i in range(len(threads)):
threads[i].join()
print " ".join(results) # what sound does a metasyntactic locomotive make?
Run Code Online (Sandbox Code Playgroud)
如果您确实想要join()
返回被调用函数的返回值,可以使用如下的Thread
子类来执行此操作:
from threading import Thread
def foo(bar):
print 'hello {0}'.format(bar)
return "foo"
class ThreadWithReturnValue(Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs={}, Verbose=None):
Thread.__init__(self, group, target, name, args, kwargs, Verbose)
self._return = None
def run(self):
if self._Thread__target is not None:
self._return = self._Thread__target(*self._Thread__args,
**self._Thread__kwargs)
def join(self):
Thread.join(self)
return self._return
twrv = ThreadWithReturnValue(target=foo, args=('world!',))
twrv.start()
print twrv.join() # prints foo
Run Code Online (Sandbox Code Playgroud)
由于某些名称损坏,它变得有点毛茸茸,并且它访问特定于Thread
实现的"私有"数据结构......但是它有效.
对于python3
class ThreadWithReturnValue(Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs={}, Verbose=None):
Thread.__init__(self, group, target, name, args, kwargs)
self._return = None
def run(self):
print(type(self._target))
if self._target is not None:
self._return = self._target(*self._args,
**self._kwargs)
def join(self, *args):
Thread.join(self, *args)
return self._return
Run Code Online (Sandbox Code Playgroud)
小智 174
在 Python 3.2+ 中,stdlibconcurrent.futures
模块为 提供了更高级别的 API threading
,包括将返回值或异常从工作线程传递回主线程:
import concurrent.futures
def foo(bar):
print('hello {}'.format(bar))
return 'foo'
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(foo, 'world!')
return_value = future.result()
print(return_value)
Run Code Online (Sandbox Code Playgroud)
bj0*_*bj0 81
Jake的答案很好,但是如果你不想使用线程池(你不知道你需要多少线程,但是根据需要创建它们)那么在线程之间传输信息的好方法就是内置Queue.Queue类,因为它提供线程安全性.
我创建了以下装饰器,使其以与threadpool类似的方式运行:
def threaded(f, daemon=False):
import Queue
def wrapped_f(q, *args, **kwargs):
'''this function calls the decorated function and puts the
result in a queue'''
ret = f(*args, **kwargs)
q.put(ret)
def wrap(*args, **kwargs):
'''this is the function returned from the decorator. It fires off
wrapped_f in a new thread and returns the thread object with
the result queue attached'''
q = Queue.Queue()
t = threading.Thread(target=wrapped_f, args=(q,)+args, kwargs=kwargs)
t.daemon = daemon
t.start()
t.result_queue = q
return t
return wrap
Run Code Online (Sandbox Code Playgroud)
然后你只需使用它:
@threaded
def long_task(x):
import time
x = x + 5
time.sleep(5)
return x
# does not block, returns Thread object
y = long_task(10)
print y
# this blocks, waiting for the result
result = y.result_queue.get()
print result
Run Code Online (Sandbox Code Playgroud)
每次调用时,修饰函数都会创建一个新线程,并返回一个包含将接收结果的队列的Thread对象.
UPDATE
自从我发布这个答案以来已经有一段时间了,但它仍然会得到视图,所以我想我会更新它以反映我在新版本的Python中执行此操作的方式:
Python 3.2添加在concurrent.futures
模块中,为并行任务提供高级接口.它提供了ThreadPoolExecutor
和ProcessPoolExecutor
,所以你可以使用一个线程或进程池相同的API.
这个api的一个好处是提交一个任务Executor
返回一个Future
对象,该对象将完成你提交的可调用的返回值.
这使得queue
不需要附加对象,这简化了装饰器:
_DEFAULT_POOL = ThreadPoolExecutor()
def threadpool(f, executor=None):
@wraps(f)
def wrap(*args, **kwargs):
return (executor or _DEFAULT_POOL).submit(f, *args, **kwargs)
return wrap
Run Code Online (Sandbox Code Playgroud)
如果未传入,则将使用默认模块 threadpool执行程序.
用法与以前非常相似:
@threadpool
def long_task(x):
import time
x = x + 5
time.sleep(5)
return x
# does not block, returns Future object
y = long_task(10)
print y
# this blocks, waiting for the result
result = y.result()
print result
Run Code Online (Sandbox Code Playgroud)
如果您使用的是Python 3.4+,那么使用此方法(以及一般的Future对象)的一个非常好的功能是返回的未来可以被包装以将其转换为asyncio.Future
with asyncio.wrap_future
.这使得协同程序很容易使用:
result = await asyncio.wrap_future(long_task(10))
Run Code Online (Sandbox Code Playgroud)
如果您不需要访问底层concurrent.Future
对象,可以在装饰器中包含wrap:
_DEFAULT_POOL = ThreadPoolExecutor()
def threadpool(f, executor=None):
@wraps(f)
def wrap(*args, **kwargs):
return asyncio.wrap_future((executor or _DEFAULT_POOL).submit(f, *args, **kwargs))
return wrap
Run Code Online (Sandbox Code Playgroud)
然后,每当你需要从事件循环线程中推送cpu密集或阻塞代码时,你可以将它放在一个修饰函数中:
@threadpool
def some_long_calculation():
...
# this will suspend while the function is executed on a threadpool
result = await some_long_calculation()
Run Code Online (Sandbox Code Playgroud)
Ari*_*rik 39
另一种不需要更改现有代码的解决方案:
import Queue
from threading import Thread
def foo(bar):
print 'hello {0}'.format(bar)
return 'foo'
que = Queue.Queue()
t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
t.join()
result = que.get()
print result
Run Code Online (Sandbox Code Playgroud)
它也可以轻松调整到多线程环境:
import Queue
from threading import Thread
def foo(bar):
print 'hello {0}'.format(bar)
return 'foo'
que = Queue.Queue()
threads_list = list()
t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
threads_list.append(t)
# Add more threads here
...
threads_list.append(t2)
...
threads_list.append(t3)
...
# Join all the threads
for t in threads_list:
t.join()
# Check thread's return value
while not que.empty():
result = que.get()
print result
Run Code Online (Sandbox Code Playgroud)
Guy*_*oft 21
Parris/kindall的答案 join
/return
答案移植到Python 3:
from threading import Thread
def foo(bar):
print('hello {0}'.format(bar))
return "foo"
class ThreadWithReturnValue(Thread):
def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):
Thread.__init__(self, group, target, name, args, kwargs, daemon=daemon)
self._return = None
def run(self):
if self._target is not None:
self._return = self._target(*self._args, **self._kwargs)
def join(self):
Thread.join(self)
return self._return
twrv = ThreadWithReturnValue(target=foo, args=('world!',))
twrv.start()
print(twrv.join()) # prints foo
Run Code Online (Sandbox Code Playgroud)
注意,Thread
该类在Python 3中的实现方式不同.
use*_*679 19
我偷走了所有人的答案并将其清理干净了一点点.
关键部分是添加*args和**kwargs到join()以处理超时
class threadWithReturn(Thread):
def __init__(self, *args, **kwargs):
super(threadWithReturn, self).__init__(*args, **kwargs)
self._return = None
def run(self):
if self._Thread__target is not None:
self._return = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
def join(self, *args, **kwargs):
super(threadWithReturn, self).join(*args, **kwargs)
return self._return
Run Code Online (Sandbox Code Playgroud)
更新后的答案
这是我最受欢迎的答案,因此我决定使用将在py2和py3上运行的代码进行更新.
另外,我看到这个问题的很多答案表明对Thread.join()缺乏理解.有些完全无法处理timeout
arg.但是,当你有(1)可以返回的目标函数None
和(2)你也将timeout
arg 传递给join()时,你应该知道一个角落情况.请参阅"测试4"以了解此角落情况.
与py2和py3一起使用的ThreadWithReturn类:
import sys
from threading import Thread
from builtins import super # https://stackoverflow.com/a/30159479
if sys.version_info >= (3, 0):
_thread_target_key = '_target'
_thread_args_key = '_args'
_thread_kwargs_key = '_kwargs'
else:
_thread_target_key = '_Thread__target'
_thread_args_key = '_Thread__args'
_thread_kwargs_key = '_Thread__kwargs'
class ThreadWithReturn(Thread):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._return = None
def run(self):
target = getattr(self, _thread_target_key)
if not target is None:
self._return = target(*getattr(self, _thread_args_key), **getattr(self, _thread_kwargs_key))
def join(self, *args, **kwargs):
super().join(*args, **kwargs)
return self._return
Run Code Online (Sandbox Code Playgroud)
一些样本测试如下所示:
import time, random
# TEST TARGET FUNCTION
def giveMe(arg, seconds=None):
if not seconds is None:
time.sleep(seconds)
return arg
# TEST 1
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',))
my_thread.start()
returned = my_thread.join()
# (returned == 'stringy')
# TEST 2
my_thread = ThreadWithReturn(target=giveMe, args=(None,))
my_thread.start()
returned = my_thread.join()
# (returned is None)
# TEST 3
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=2)
# (returned is None) # because join() timed out before giveMe() finished
# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))
Run Code Online (Sandbox Code Playgroud)
你能否确定我们可能遇到的TEST 4角落?
问题是我们希望giveMe()返回None(参见TEST 2),但我们也期望join()在超时时返回None.
returned is None
意味着:
(1)这就是giveMe()返回的,或者
(2)加入()超时
这个例子很简单,因为我们知道giveMe()将始终返回None.但是在现实世界中(目标可以合法地返回None或其他东西)我们想要明确地检查发生了什么.
以下是如何解决这个角落的问题:
# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))
if my_thread.isAlive():
# returned is None because join() timed out
# this also means that giveMe() is still running in the background
pass
# handle this based on your app's logic
else:
# join() is finished, and so is giveMe()
# BUT we could also be in a race condition, so we need to update returned, just in case
returned = my_thread.join()
Run Code Online (Sandbox Code Playgroud)
slo*_*ady 18
我发现的大多数答案都很长,需要熟悉其他模块或高级 python 功能,除非他们已经熟悉答案所涉及的所有内容,否则会让某人感到困惑。
简化方法的工作代码:
import threading
class ThreadWithResult(threading.Thread):
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None):
def function():
self.result = target(*args, **kwargs)
super().__init__(group=group, target=function, name=name, daemon=daemon)
Run Code Online (Sandbox Code Playgroud)
示例代码:
import time, random
def function_to_thread(n):
count = 0
while count < 3:
print(f'still running thread {n}')
count +=1
time.sleep(3)
result = random.random()
print(f'Return value of thread {n} should be: {result}')
return result
def main():
thread1 = ThreadWithResult(target=function_to_thread, args=(1,))
thread2 = ThreadWithResult(target=function_to_thread, args=(2,))
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(thread1.result)
print(thread2.result)
main()
Run Code Online (Sandbox Code Playgroud)
说明:
我想显着简化事情,所以我创建了一个ThreadWithResult
类并让它继承自threading.Thread
. 嵌套函数function
in__init__
调用我们要保存其值的线程函数,并self.result
在线程执行完毕后将该嵌套函数的结果保存为实例属性。
创建 this 的实例与创建 的实例相同threading.Thread
。将要在新线程上运行的函数传递给target
参数,将函数可能需要的任何参数传递给args
参数,并将任何关键字参数传递给kwargs
参数。
例如
my_thread = ThreadWithResult(target=my_function, args=(arg1, arg2, arg3))
Run Code Online (Sandbox Code Playgroud)
我认为这比绝大多数答案更容易理解,而且这种方法不需要额外的导入!我包含了time
和random
模块来模拟线程的行为,但不需要它们来实现原始问题中提出的功能。
我知道我是在提出问题后回答这个问题的,但我希望这可以在未来帮助更多人!
编辑:我创建了save-thread-result
PyPI 包,以允许您访问上面相同的代码并在项目中重用它(GitHub 代码在这里)。PyPI 包完全扩展了threading.Thread
类,因此您也可以设置要threading.thread
在ThreadWithResult
类上设置的任何属性!
上面的原始答案涵盖了该子类背后的主要思想,但有关更多信息,请参阅此处的更详细说明(来自模块文档字符串)。
快速使用示例:
pip3 install -U save-thread-result # MacOS/Linux
pip install -U save-thread-result # Windows
python3 # MacOS/Linux
python # Windows
Run Code Online (Sandbox Code Playgroud)
from save_thread_result import ThreadWithResult
# As of Release 0.0.3, you can also specify values for
#`group`, `name`, and `daemon` if you want to set those
# values manually.
thread = ThreadWithResult(
target = my_function,
args = (my_function_arg1, my_function_arg2, ...)
kwargs = {my_function_kwarg1: kwarg1_value, my_function_kwarg2: kwarg2_value, ...}
)
thread.start()
thread.join()
if getattr(thread, 'result', None):
print(thread.result)
else:
# thread.result attribute not set - something caused
# the thread to terminate BEFORE the thread finished
# executing the function passed in through the
# `target` argument
print('ERROR! Something went wrong while executing this thread, and the function you passed in did NOT complete!!')
# seeing help about the class and information about the threading.Thread super class methods and attributes available:
help(ThreadWithResult)
Run Code Online (Sandbox Code Playgroud)
小智 12
使用队列:
import threading, queue
def calc_square(num, out_queue1):
l = []
for x in num:
l.append(x*x)
out_queue1.put(l)
arr = [1,2,3,4,5,6,7,8,9,10]
out_queue1=queue.Queue()
t1=threading.Thread(target=calc_square, args=(arr,out_queue1))
t1.start()
t1.join()
print (out_queue1.get())
Run Code Online (Sandbox Code Playgroud)
小智 9
我发现执行此操作的最短、最简单的方法是利用 Python 类及其动态属性。您可以使用 来从生成的线程的上下文中检索当前线程threading.current_thread()
,并将返回值分配给属性。
import threading
def some_target_function():
# Your code here.
threading.current_thread().return_value = "Some return value."
your_thread = threading.Thread(target=some_target_function)
your_thread.start()
your_thread.join()
return_value = your_thread.return_value
print(return_value)
Run Code Online (Sandbox Code Playgroud)
考虑到@iman对@JakeBiesinger答案的评论,我将其重新组合为具有不同数量的线程:
from multiprocessing.pool import ThreadPool
def foo(bar, baz):
print 'hello {0}'.format(bar)
return 'foo' + baz
numOfThreads = 3
results = []
pool = ThreadPool(numOfThreads)
for i in range(0, numOfThreads):
results.append(pool.apply_async(foo, ('world', 'foo'))) # tuple of args for foo)
# do some other stuff in the main process
# ...
# ...
results = [r.get() for r in results]
print results
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)
我解决这个问题的方法是将函数和线程包装在一个类中.不需要使用池,队列或c类型变量传递.它也是非阻塞的.你检查状态.请参阅代码末尾如何使用它的示例.
import threading
class ThreadWorker():
'''
The basic idea is given a function create an object.
The object can then run the function in a thread.
It provides a wrapper to start it,check its status,and get data out the function.
'''
def __init__(self,func):
self.thread = None
self.data = None
self.func = self.save_data(func)
def save_data(self,func):
'''modify function to save its returned data'''
def new_func(*args, **kwargs):
self.data=func(*args, **kwargs)
return new_func
def start(self,params):
self.data = None
if self.thread is not None:
if self.thread.isAlive():
return 'running' #could raise exception here
#unless thread exists and is alive start or restart it
self.thread = threading.Thread(target=self.func,args=params)
self.thread.start()
return 'started'
def status(self):
if self.thread is None:
return 'not_started'
else:
if self.thread.isAlive():
return 'running'
else:
return 'finished'
def get_results(self):
if self.thread is None:
return 'not_started' #could return exception
else:
if self.thread.isAlive():
return 'running'
else:
return self.data
def add(x,y):
return x +y
add_worker = ThreadWorker(add)
print add_worker.start((1,2,))
print add_worker.status()
print add_worker.get_results()
Run Code Online (Sandbox Code Playgroud)
根据所提到的内容,这是适用于 Python3 的更通用的解决方案。
import threading
class ThreadWithReturnValue(threading.Thread):
def __init__(self, *init_args, **init_kwargs):
threading.Thread.__init__(self, *init_args, **init_kwargs)
self._return = None
def run(self):
self._return = self._target(*self._args, **self._kwargs)
def join(self):
threading.Thread.join(self)
return self._return
Run Code Online (Sandbox Code Playgroud)
用法
th = ThreadWithReturnValue(target=requests.get, args=('http://www.google.com',))
th.start()
response = th.join()
response.status_code # => 200
Run Code Online (Sandbox Code Playgroud)
我正在使用这个包装器,它可以轻松地将任何函数转换为在 a 中运行Thread
- 处理其返回值或异常。它不会增加Queue
开销。
def threading_func(f):
"""Decorator for running a function in a thread and handling its return
value or exception"""
def start(*args, **kw):
def run():
try:
th.ret = f(*args, **kw)
except:
th.exc = sys.exc_info()
def get(timeout=None):
th.join(timeout)
if th.exc:
raise th.exc[0], th.exc[1], th.exc[2] # py2
##raise th.exc[1] #py3
return th.ret
th = threading.Thread(None, run)
th.exc = None
th.get = get
th.start()
return th
return start
Run Code Online (Sandbox Code Playgroud)
def f(x):
return 2.5 * x
th = threading_func(f)(4)
print("still running?:", th.is_alive())
print("result:", th.get(timeout=1.0))
@threading_func
def th_mul(a, b):
return a * b
th = th_mul("text", 2.5)
try:
print(th.get())
except TypeError:
print("exception thrown ok.")
Run Code Online (Sandbox Code Playgroud)
threading
模块注意事项线程函数的舒适返回值和异常处理是一个频繁的“Pythonic”需求,并且确实应该已经由threading
模块提供- 可能直接在标准Thread
类中。ThreadPool
对于简单的任务有太多的开销 - 3 个管理线程,大量的官僚作风。不幸的是Thread
,它的布局最初是从 Java 复制而来的 — 例如,您可以从仍然无用的第一个 (!) 构造函数参数中看到这一点group
。