Pat*_*ins · 36 votes · Tags: python, multithreading, python-multithreading, python-3.x, concurrent.futures
I'm very confused by some code I wrote. I was surprised to discover that:
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(f, iterable))
and
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    results = list(map(lambda x: executor.submit(f, x), iterable))
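produce different results. The first produces a list of whatever type f returns, while the second produces a list of concurrent.futures.Future objects that then need to be evaluated with their result() method to get the value f returned.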
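A minimal sketch of that difference, using a hypothetical `square` function in place of `f`: `executor.map` hands back the return values directly, while `executor.submit` hands back one `Future` per call, and the value only comes out through `.result()`.

```python
import concurrent.futures

def square(x):
    # hypothetical stand-in for f
    return x * x

data = [1, 2, 3, 4]

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    # map: an iterator of return values, in input order
    mapped = list(executor.map(square, data))
    # submit: a Future per call; the values must be pulled out with .result()
    futures = [executor.submit(square, x) for x in data]
    submitted = [fut.result() for fut in futures]

print(mapped)     # [1, 4, 9, 16]
print(submitted)  # [1, 4, 9, 16]
```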
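My main concern is that this means executor.map can't take advantage of concurrent.futures.as_completed, which seems like an extremely convenient way to evaluate the results of some long-running calls to a database that I'm making.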
I'm not at all clear on how concurrent.futures.ThreadPoolExecutor objects work. Naively, I would prefer the (somewhat more verbose):
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    result_futures = list(map(lambda x: executor.submit(f, x), iterable))
    results = [fut.result() for fut in concurrent.futures.as_completed(result_futures)]
over the more concise executor.map in order to take advantage of a possible gain in performance. Am I wrong to do so?
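One thing to keep in mind with the as_completed approach: it yields futures in completion order, not input order, so the results list above no longer lines up with iterable. A common pattern (the concurrent.futures documentation examples use it) is a dict from each Future back to its input. A sketch, assuming a hypothetical `slow_query` in place of the long-running database call:

```python
import concurrent.futures
import time

def slow_query(x):
    # hypothetical stand-in for a long-running database call;
    # larger inputs sleep less, so they tend to finish first
    time.sleep(0.05 * (5 - x))
    return x * 10

inputs = [1, 2, 3, 4]
results = {}
completion_order = []

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    # remember which input produced each Future
    future_to_input = {executor.submit(slow_query, x): x for x in inputs}
    for fut in concurrent.futures.as_completed(future_to_input):
        x = future_to_input[fut]
        completion_order.append(x)
        results[x] = fut.result()

# put the results back into input order if that order matters
ordered = [results[x] for x in inputs]
print(ordered)  # [10, 20, 30, 40]
```

Each result is handled as soon as its call finishes, and the input order can still be recovered afterwards.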
Kri*_*itz · 23 votes
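The problem is that you convert the result of ThreadPoolExecutor.map to a list. If you don't do that and instead iterate over the resulting generator directly, the results are still yielded in the original order, but the loop continues before all results are ready. You can test this with this example: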
import time
import concurrent.futures

e = concurrent.futures.ThreadPoolExecutor(4)
s = range(10)
for i in e.map(time.sleep, s):
    # prints None (time.sleep's return value) as each sleep finishes, in order
    print(i)
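The order is probably kept because it is sometimes important that you get results in the same order you gave them to map. And results are probably not wrapped in Future objects because in some situations it might simply take too long to do another map over the list to get all the results if you need them. After all, in most cases it's very likely that the next value is ready before the loop has finished processing the first one. This example demonstrates it: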
import concurrent.futures

# some_huge_list, crunch_number and do_some_stuff are placeholders
executor = concurrent.futures.ThreadPoolExecutor()  # Or ProcessPoolExecutor
data = some_huge_list()
results = executor.map(crunch_number, data)
finals = []
for value in results:
    finals.append(do_some_stuff(value))
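In this example, do_some_stuff may well take longer than crunch_number, and if that is the case, you lose very little performance while keeping the simple usage of map.
Also, since the worker threads (or processes) start processing at the beginning of the list and work their way to the end of the list you submitted, the results should finish in roughly the order the iterator yields them anyway. In most cases this means executor.map is just fine, but in some cases, for example when the order in which you process the values doesn't matter and the function you pass to map has very different run times for different inputs, concurrent.futures.as_completed may be faster.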
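A small sketch of the "very different run times" case, assuming a hypothetical `task` that just sleeps and a first input that is much slower than the rest. map cannot yield anything until the first call finishes, while as_completed hands over whichever call finishes first:

```python
import concurrent.futures
import time

def task(delay):
    # hypothetical work item that takes `delay` seconds
    time.sleep(delay)
    return delay

delays = [0.5, 0.1, 0.1, 0.1]  # the first task is much slower than the rest

def first_result(use_map):
    """Return (first value seen, seconds until it was seen)."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        start = time.monotonic()
        if use_map:
            it = executor.map(task, delays)  # yields in submission order
        else:
            futures = [executor.submit(task, d) for d in delays]
            it = (fut.result() for fut in concurrent.futures.as_completed(futures))
        first = next(it)
        return first, time.monotonic() - start

via_map = first_result(use_map=True)
via_as_completed = first_result(use_map=False)
print(via_map)           # first value 0.5, after roughly 0.5 s
print(via_as_completed)  # first value 0.1, after roughly 0.1 s
```

Total run time is the same either way; only the point at which the first result can be processed differs.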
wyn*_*emo · 17 votes
If you use concurrent.futures.as_completed, you can handle the exception from each call individually.
import concurrent.futures

iterable = [1, 2, 3, 4, 6, 7, 8, 9, 10]

def f(x):
    if x == 2:
        raise Exception('x')
    return x

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    result_futures = list(map(lambda x: executor.submit(f, x), iterable))
    for future in concurrent.futures.as_completed(result_futures):
        try:
            print('result is', future.result())
        except Exception as e:
            print('e is', e, type(e))

# sample output (completion order, may differ between runs):
# result is 3
# result is 1
# result is 4
# e is x <class 'Exception'>
# result is 6
# result is 7
# result is 8
# result is 9
# result is 10
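With executor.map, if any call raises, the exception is re-raised when the loop reaches that result, the loop stops there, and calls that haven't started yet are cancelled. You need to handle exceptions inside the worker function instead.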
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    for each in executor.map(f, iterable):
        print(each)
# if f raises for any item, iterating executor.map re-raises that exception and the loop stops
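A sketch of both behaviours. `safe_call` is a hypothetical helper (not part of concurrent.futures) showing one way to "handle exceptions in the worker function": it catches the exception and returns it as a value, so map produces an item for every input.

```python
import concurrent.futures

def f(x):
    # same toy worker as in the as_completed example: fails for x == 2
    if x == 2:
        raise Exception('x')
    return x

def safe_call(x):
    # hypothetical wrapper: return the exception instead of raising it
    try:
        return f(x)
    except Exception as exc:
        return exc

iterable = [1, 2, 3, 4]

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    # plain map: the exception from f(2) is re-raised when the loop reaches it
    seen = []
    try:
        for value in executor.map(f, iterable):
            seen.append(value)
    except Exception as exc:
        print('map stopped after', seen, 'with', repr(exc))

    # wrapped map: every input yields an item, still in input order
    wrapped = list(executor.map(safe_call, iterable))

print(wrapped)  # [1, Exception('x'), 3, 4]
```

Returning the exception keeps map's ordering guarantee; the caller then checks each item with isinstance(item, Exception).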
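Here is an example of submit vs. map. Both accept the jobs immediately (submitted time minus start time). Both take the same 11 seconds to finish (last result time minus start time). However, submit (via as_completed) gives a result as soon as any of the threads in the ThreadPoolExecutor with max_workers=2 finishes, while map gives results in the order they were submitted.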
import time
import concurrent.futures

def worker(i):
    time.sleep(i)
    return i, time.time()

e = concurrent.futures.ThreadPoolExecutor(2)
arrIn = list(range(1, 7))[::-1]
print(arrIn)

# submit + as_completed: results arrive as soon as any worker finishes
futures = []
print('start submit', time.time())
for i in arrIn:
    futures.append(e.submit(worker, i))
print('submitted', time.time())
for r in concurrent.futures.as_completed(futures):
    print(r.result(), time.time())
print()

# map: results are yielded in submission order
print('start map', time.time())
results = e.map(worker, arrIn)
print('mapped', time.time())
for r in results:
    print(r, time.time())
e.shutdown()
Output:
[6, 5, 4, 3, 2, 1]
start submit 1543473934.47
submitted 1543473934.47
(5, 1543473939.473743) 1543473939.47
(6, 1543473940.471591) 1543473940.47
(3, 1543473943.473639) 1543473943.47
(4, 1543473943.474192) 1543473943.47
(1, 1543473944.474617) 1543473944.47
(2, 1543473945.477609) 1543473945.48
start map 1543473945.48
mapped 1543473945.48
(6, 1543473951.483908) 1543473951.48
(5, 1543473950.484109) 1543473951.48
(4, 1543473954.48858) 1543473954.49
(3, 1543473954.488384) 1543473954.49
(2, 1543473956.493789) 1543473956.49
(1, 1543473955.493888) 1543473956.49
In addition to the explanation in the answers here, it can be helpful to go right to the source. It reaffirms the statement from another answer here that:
.map() gives results in the order they are submitted, while iterating over Future objects with concurrent.futures.as_completed() won't guarantee this ordering, because this is the nature of as_completed().

.map() is defined in the base class, concurrent.futures._base.Executor:
class Executor(object):
    def submit(self, fn, *args, **kwargs):
        raise NotImplementedError()

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        if timeout is not None:
            end_time = timeout + time.monotonic()

        fs = [self.submit(fn, *args) for args in zip(*iterables)]  # <!!!!!!!!

        def result_iterator():
            try:
                # reverse to keep finishing order
                fs.reverse()  # <!!!!!!!!
                while fs:
                    # Careful not to keep a reference to the popped future
                    if timeout is None:
                        yield fs.pop().result()  # <!!!!!!!!
                    else:
                        yield fs.pop().result(end_time - time.monotonic())
            finally:
                for future in fs:
                    future.cancel()
        return result_iterator()
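One detail in this excerpt: end_time is computed once, when map() is called, so timeout acts as an overall deadline for the whole iteration rather than a per-item limit; once it passes, the pending result() call raises concurrent.futures.TimeoutError inside the iterator. A small sketch, assuming a hypothetical `sleepy` task:

```python
import concurrent.futures
import time

def sleepy(delay):
    # hypothetical task that just sleeps
    time.sleep(delay)
    return delay

timed_out = False
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # the deadline is fixed when map() is called: 0.2 s from now
    it = executor.map(sleepy, [0.05, 1.0], timeout=0.2)
    first = next(it)  # the 0.05 s call finishes well inside the deadline
    try:
        next(it)      # the 1.0 s call cannot finish before the deadline
    except concurrent.futures.TimeoutError:
        timed_out = True
# leaving the with block still waits for the 1.0 s call to finish

print(first, timed_out)  # 0.05 True
```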
As you mention, there is also .submit(), which is left to be defined in the child classes, namely ProcessPoolExecutor and ThreadPoolExecutor, and which returns a _base.Future instance that you need to call .result() on to actually get the function's return value.
The important lines from .map() boil down to:
fs = [self.submit(fn, *args) for args in zip(*iterables)]
fs.reverse()
while fs:
    yield fs.pop().result()
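The .reverse() plus .pop() is a way to have the result of the first submitted call (from iterables) yielded first, the result of the second submitted call yielded second, and so on. The elements of the resulting iterator are not Futures; they are the actual results themselves.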
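The same reverse-then-pop idea can be written as a stand-alone function on top of submit(). `ordered_map` below is a hypothetical, simplified sketch (no timeout or chunksize handling), but it should yield the same values in the same order as executor.map, even when later inputs finish first:

```python
import concurrent.futures
import time

def ordered_map(executor, fn, *iterables):
    # submit every call immediately, as Executor.map does
    fs = [executor.submit(fn, *args) for args in zip(*iterables)]

    def result_iterator():
        # pop() takes from the end, so reverse first to pop the first-submitted future
        fs.reverse()
        try:
            while fs:
                yield fs.pop().result()  # blocks until that particular call is done
        finally:
            for future in fs:  # cancel any calls the caller never consumed
                future.cancel()

    return result_iterator()

def work(x):
    # later inputs sleep less, so they finish first
    time.sleep(0.01 * (5 - x))
    return x * x

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    mine = list(ordered_map(executor, work, range(5)))
    builtin = list(executor.map(work, range(5)))

print(mine)  # [0, 1, 4, 9, 16]
```

Splitting it into an outer function plus an inner generator matters: the submits happen when ordered_map is called, not lazily on the first next(), which matches the eager behaviour of the real map().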