我对我编写的一些代码感到非常困惑.我惊讶地发现:
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(f, iterable))
Run Code Online (Sandbox Code Playgroud)
和
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
results = list(map(lambda x: executor.submit(f, x), iterable))
Run Code Online (Sandbox Code Playgroud)
产生不同的结果.第一个产生任何类型f返回的列表,第二个产生一个concurrent.futures.Future对象列表,然后需要使用它们的result()方法进行评估,以获得f返回的值.
我主要担心的是,这意味着executor.map无法利用concurrent.futures.as_completed,这似乎是一种非常方便的方法来评估我正在进行的数据库长期运行调用的结果.
关于concurrent.futures.ThreadPoolExecutor对象是如何工作的我一点都不清楚- 天真地,我更喜欢(稍微冗长一点):
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
result_futures = list(map(lambda x: executor.submit(f, x), iterable))
results = [f.result() for f in futures.as_completed(result_futures)]
Run Code Online (Sandbox Code Playgroud)
executor.map为了利用可能的性能提升,更简洁.我错了吗?
python multithreading python-multithreading python-3.x concurrent.futures
我是用新的闪亮试验concurrent.futures在Python 3.2中引入模块,和我注意到,几乎相同的代码,使用泳池从concurrent.futures的方式比使用慢multiprocessing.Pool.
这是使用多处理的版本:
def hard_work(n):
# Real hard work here
pass
if __name__ == '__main__':
from multiprocessing import Pool, cpu_count
try:
workers = cpu_count()
except NotImplementedError:
workers = 1
pool = Pool(processes=workers)
result = pool.map(hard_work, range(100, 1000000))
Run Code Online (Sandbox Code Playgroud)
这是使用concurrent.futures:
def hard_work(n):
# Real hard work here
pass
if __name__ == '__main__':
from concurrent.futures import ProcessPoolExecutor, wait
from multiprocessing import cpu_count
try:
workers = cpu_count()
except NotImplementedError:
workers = 1
pool = ProcessPoolExecutor(max_workers=workers)
result = pool.map(hard_work, …Run Code Online (Sandbox Code Playgroud) python concurrency future multiprocessing concurrent.futures
我正在使用新concurrent.futures模块(也有一个Python 2 backport)来做一些简单的多线程I/O. 我无法理解如何干净地杀死使用此模块启动的任务.
查看以下Python 2/3脚本,它重现了我所看到的行为:
#!/usr/bin/env python
from __future__ import print_function
import concurrent.futures
import time
def control_c_this():
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
future1 = executor.submit(wait_a_bit, name="Jack")
future2 = executor.submit(wait_a_bit, name="Jill")
for future in concurrent.futures.as_completed([future1, future2]):
future.result()
print("All done!")
def wait_a_bit(name):
print("{n} is waiting...".format(n=name))
time.sleep(100)
if __name__ == "__main__":
control_c_this()
Run Code Online (Sandbox Code Playgroud)
当这个脚本运行时,似乎无法使用常规的Control-C键盘中断干净地杀死它.我在OS X上运行.
kill从命令行求助于杀死脚本.Control-C被忽略了.我在网上找到的大多数文档都讨论了如何用旧threading模块干净地杀死线程.这似乎都不适用于此.
concurrent.futures模块中提供的所有停止内容(如Executor.shutdown()和Future.cancel())的方法仅在Futures尚未启动或完成时才起作用,这在这种情况下毫无意义.我想立即打断未来.
我的用例很简单:当用户点击Control-C时,脚本应该像任何行为良好的脚本一样立即退出.这就是我想要的.
那么在使用时获得此行为的正确方法是concurrent.futures什么?
澄清这个问题的原因:
使用两个具有相同名称的模块令人困惑.它们代表什么使它们与众不同?
什么任务可以解决另一个不能,反之亦然?
我有一个多线程函数,我想要一个使用状态栏tqdm。有没有一种简单的方法可以显示状态栏ThreadPoolExecutor?正是并行化部分使我感到困惑。
import concurrent.futures
def f(x):
return f**2
my_iter = range(1000000)
def run(f,my_iter):
with concurrent.futures.ThreadPoolExecutor() as executor:
function = list(executor.map(f, my_iter))
return results
run(f, my_iter) # wrap tqdr around this function?
Run Code Online (Sandbox Code Playgroud) 使用concurrent.futures的示例(backport for 2.7):
import concurrent.futures # line 01
def f(x): # line 02
return x * x # line 03
data = [1, 2, 3, None, 5] # line 04
with concurrent.futures.ThreadPoolExecutor(len(data)) as executor: # line 05
futures = [executor.submit(f, n) for n in data] # line 06
for future in futures: # line 07
print(future.result()) # line 08
Run Code Online (Sandbox Code Playgroud)
输出:
1
4
9
Traceback (most recent call last):
File "C:\test.py", line 8, in <module>
print future.result() # line 08
File …Run Code Online (Sandbox Code Playgroud) 当我通过JDK 7时,我发现它java.util.concurrent.RunnableFuture<V>有一个run方法.我想知道在接口中重复相同的run方法签名的重要性是什么时候它已经扩展了Runnable.
package java.util.concurrent;
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
Run Code Online (Sandbox Code Playgroud) 我正在用Python编写应用程序,我需要同时运行一些任务.模块多处理提供类Process,concurrent.futures模块提供ProcessPoolExecutor类.两者似乎都使用多个进程来执行他们的任务,但他们的API却不同.我为什么要使用一个而不是另一个?
我知道在Python 3中添加了concurrent.futures,所以我猜它会更好?
来自https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor.map
如果 func 调用引发异常,则在从迭代器检索其值时将引发该异常。
以下代码段仅输出第一个例外 (Exeption: 1),然后停止。这是否与上述说法相矛盾?我希望以下内容打印出循环中的所有异常。
def test_func(val):
raise Exception(val)
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
for r in executor.map(test_func,[1,2,3,4,5]):
try:
print r
except Exception as exc:
print 'generated an exception: %s' % (exc)
Run Code Online (Sandbox Code Playgroud) Python的futures软件包允许我们并行地享受ThreadPoolExecutor和ProcessPoolExecutor完成任务.
但是,对于调试,暂时用虚拟并行替换真正的并行性有用,它在主线程中以串行方式执行任务,而不会产生任何线程或进程.
有没有实现的DummyExecutor?
python ×9
concurrency ×4
future ×2
python-3.x ×2
debugging ×1
java ×1
module ×1
python-2.7 ×1
tqdm ×1