标签: concurrent.futures

Python ThreadPoolExecutor - 保证在与提交的func相同的线程中运行的回调吗?

在ThreadPoolExecutor(TPE)中,回调始终保证在与提交的函数相同的线程中运行吗?

例如,我使用以下代码对此进行了测试.我跑了很多次,它似乎funccallback始终在同一个线程跑.

import concurrent.futures 
import random 
import threading 
import time 

executor = concurrent.futures.ThreadPoolExecutor(max_workers=3) 

def func(x): 
    time.sleep(random.random()) 
    return threading.current_thread().name 

def callback(future): 
    time.sleep(random.random()) 
    x = future.result() 
    cur_thread = threading.current_thread().name 
    if (cur_thread != x): 
        print(cur_thread, x) 

print('main thread: %s' % threading.current_thread()) 
for i in range(10000): 
    future = executor.submit(func, i) 
    future.add_done_callback(callback) 
Run Code Online (Sandbox Code Playgroud)

但是,当我删除time.sleep(random.random())语句时,它似乎失败了,即至少有几个func函数并且callbacks 没有在同一个线程中运行.

对于我正在处理的项目,回调必须始终与提交的函数在同一个线程上运行,所以我想确保这是由TPE保证的.(而且没有随机睡眠的测试结果似乎令人费解).

我查看了执行程序源代码,看起来我们在运行回调之前没有将线程切换到主线程.但只是想确定一下.

python multithreading callback concurrent.futures

12
推荐指数
1
解决办法
5796
查看次数

多处理队列已满

我正在使用concurrent.futures来实现多处理.我得到一个队列.全错误,这是奇怪的,因为我只分配10个工作.

A_list = [np.random.rand(2000, 2000) for i in range(10)]

with ProcessPoolExecutor() as pool:
    pool.map(np.linalg.svd, A_list)
Run Code Online (Sandbox Code Playgroud)

错误:

Exception in thread Thread-9:
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/threading.py", line 921, in _bootstrap_inner
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/threading.py", line 869, in run
    self._target(*self._args, **self._kwargs)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/concurrent/futures/process.py", line 251, in _queue_management_worker
    shutdown_worker()
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/concurrent/futures/process.py", line 209, in shutdown_worker
    call_queue.put_nowait(None)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/queues.py", line 131, in put_nowait
    return self.put(obj, False)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/queues.py", line 82, in put
    raise Full
queue.Full
Run Code Online (Sandbox Code Playgroud)

python parallel-processing multiprocessing concurrent.futures

12
推荐指数
2
解决办法
6234
查看次数

Celery与ProcessPoolExecutor/ThreadPoolExecutor

我正在创建一个django webserver,允许用户在本地机器上运行一些"可执行文件"并通过网页分析它们的输出.

我之前使用过Celery任务队列,以便在类似的情况下运行"可执行文件".但是,在阅读了Python concurrent.futures之后,我开始怀疑我是否应该使用ThreadPoolExecutor,或者ProcessPoolExecutor(或者ThreadPoolExecutor在a ProcessPoolExecutor:D中)?

谷歌搜索我只能找到一个相关的问题比较芹菜和龙卷风,它转向单独使用龙卷风.

我应该使用Celery还是PoolExecutor我的简单网络服务器,为什么?

python django multiprocessing celery concurrent.futures

12
推荐指数
1
解决办法
1629
查看次数

ThreadPoolExecutor中的worker不是真正的守护进程

我无法弄清楚的是,尽管ThreadPoolExecutor使用守护进程工作者,即使主线程退出,它们仍然会运行.

我可以在python3.6.4中提供一个最小的例子:

import concurrent.futures
import time


def fn():
    while True:
        time.sleep(5)
        print("Hello")


thread_pool = concurrent.futures.ThreadPoolExecutor()
thread_pool.submit(fn)
while True:
    time.sleep(1)
    print("Wow")
Run Code Online (Sandbox Code Playgroud)

主线程和工作线程都是无限循环.所以如果我KeyboardInterrupt用来终止主线程,我希望整个程序也会终止.但实际上工作线程仍在运行,即使它是一个守护程序线程.

ThreadPoolExecutor确认工作线程是守护程序线程的源代码:

t = threading.Thread(target=_worker,
                     args=(weakref.ref(self, weakref_cb),
                           self._work_queue))
t.daemon = True
t.start()
self._threads.add(t)
Run Code Online (Sandbox Code Playgroud)

此外,如果我手动创建一个守护程序线程,它就像一个魅力:

from threading import Thread
import time


def fn():
    while True:
        time.sleep(5)
        print("Hello")


thread = Thread(target=fn)
thread.daemon = True
thread.start()
while True:
    time.sleep(1)
    print("Wow")
Run Code Online (Sandbox Code Playgroud)

所以我真的无法弄清楚这种奇怪的行为.

python multithreading daemon concurrent.futures

12
推荐指数
2
解决办法
2384
查看次数

是merge.futures是GIL的药吗?

我只是在寻找这个新的实现,我使用python 2.7,我必须安装,所以如果我使用它,我会忘记CPython上的GIL这个词?

python future gil concurrent.futures

11
推荐指数
1
解决办法
5221
查看次数

ProcessPoolExecutor中的ThreadPoolExecutor

我是期货模块的新手,有一项可以从并行化中受益的任务; 但我似乎无法弄清楚如何设置线程的功能和进程的功能.我很感激任何人都可以帮助解决这个问题.

我正在运行粒子群优化(PSO).在没有详细了解PSO本身的情况下,这里是我的代码的基本布局:

有一个Particle类,有一个getFitness(self)方法(计算一些指标并存储它self.fitness).PSO模拟具有多个粒子实例(对于某些模拟,容易超过10; 100s甚至1000s).
每隔一段时间,我就要计算粒子的适应度.目前,我在for循环中执行此操作:

for p in listOfParticles:
  p.getFitness(args)
Run Code Online (Sandbox Code Playgroud)

但是,我注意到每个粒子的适应度可以彼此独立地计算.这使得该适应度计算成为并行化的主要候选者.的确,我能做到map(lambda p: p.getFitness(args), listOfParticles).

现在,我可以轻松地做到这一点futures.ProcessPoolExecutor:

with futures.ProcessPoolExecutor() as e:
  e.map(lambda p: p.getFitness(args), listOfParticles)
Run Code Online (Sandbox Code Playgroud)

由于调用的副作用p.getFitness存储在每个粒子本身,我不必担心从中返回futures.ProcessPoolExecutor().

到现在为止还挺好.但现在我注意到ProcessPoolExecutor创建了新进程,这意味着它会复制内存,这很慢.我希望能够共享内存 - 所以我应该使用线程.这很好,直到我意识到在每个进程中运行多个线程的几个进程可能会更快,因为多个线程仍然只运行在我的甜蜜的8核机器的一个处理器上.

这就是我遇到麻烦的地方:
根据我见过的例子,ThreadPoolExecutor对a进行操作list.那样做ProcessPoolExecutor.所以我不能做任何迭代ProcessPoolExecutor到农场,ThreadPoolExecutor因为那样ThreadPoolExecutor就会得到一个单一的对象(参见我的尝试,发布在下面).
另一方面,我不能自己切片listOfParticles,因为我想做ThreadPoolExecutor自己的魔术来弄清楚需要多少线程.

所以,最重要的问题(最后):
我应该如何构建我的代码,以便我可以使用进程和线程有效地并行化以下内容:

for p in listOfParticles:
  p.getFitness()
Run Code Online (Sandbox Code Playgroud)

这是我一直在尝试的,但我不敢尝试运行它,因为我知道它不会起作用:

>>> def threadize(func, L, …
Run Code Online (Sandbox Code Playgroud)

python multithreading multiprocessing python-3.3 concurrent.futures

11
推荐指数
1
解决办法
4112
查看次数

来自Executor的concurrent.futures.Future的asyncio yield

我有一个long_task运行大量cpu绑定计算的函数,我希望通过使用新的asyncio框架使其异步.生成的long_task_async函数使用a ProcessPoolExecutor将工作卸载到不同的进程,不受GIL约束.

麻烦的是,由于某种原因,concurrent.futures.Future实例ProcessPoolExecutor.submit从投掷时返回a TypeError.这是设计的吗?那些期货与asyncio.Future班级不兼容吗?什么是解决方法?

我也注意到发电机不是可拣选的,所以提交一个couroutine ProcessPoolExecutor就会失败.这个也有任何清洁的解决方案吗?

import asyncio
from concurrent.futures import ProcessPoolExecutor

@asyncio.coroutine
def long_task():
    yield from asyncio.sleep(4)
    return "completed"

@asyncio.coroutine
def long_task_async():
    with ProcessPoolExecutor(1) as ex:
        return (yield from ex.submit(long_task)) #TypeError: 'Future' object is not iterable
                                                 # long_task is a generator, can't be pickled

loop = asyncio.get_event_loop()

@asyncio.coroutine
def main():
    n = yield from long_task_async()
    print( n )

loop.run_until_complete(main())
Run Code Online (Sandbox Code Playgroud)

python python-3.x python-asyncio concurrent.futures

11
推荐指数
2
解决办法
5844
查看次数

使用`concurrent.futures.Future`作为承诺

在Python 文档中,我看到:

concurrent.futures.Future......除了测试之外,不应该直接创建.

我想在我的代码中使用它作为一个承诺,我很惊讶不建议像这样使用它.

我用例:
我有一个单独的线程,该线程读取插座来的数据包,和我有很多回调所根据包含在数据包的一些信息调用.数据包是对消费者请求的响应,所有消费者都使用单一连接.每个使用者都会收到一个promise,并为其添加一些处理程序,这些处理程序在响应到达时会被调用.

所以我不能Executor在这里使用子类,因为我只有一个线程,但我需要创建许多Futures(promises).

Promise是非常普遍的编程技术,我认为这Future是Python的承诺实现.但是如果不建议像承诺一样使用它,那么pythonistas通常用于此目的?

注意

我使用Python 2.7 反向移植的concurrent.futures2.7

python concurrency future promise concurrent.futures

11
推荐指数
1
解决办法
6150
查看次数

在python的concurrent.futures中查找BrokenProcessPool的原因

简而言之

我将BrokenProcessPool代码并行化时遇到异常concurrent.futures.不会显示进一步的错误.我想找出错误的原因并询问如何做到这一点的想法.

完全问题

我使用concurrent.futures来并行化一些代码.

with ProcessPoolExecutor() as pool:
    mapObj = pool.map(myMethod, args)
Run Code Online (Sandbox Code Playgroud)

我最终得到(并且只有)以下异常:

concurrent.futures.process.BrokenProcessPool: A child process terminated abruptly, the process pool is not usable anymore
Run Code Online (Sandbox Code Playgroud)

不幸的是,程序很复杂,只有在程序运行30分钟后才会出现错误.因此,我无法提供一个很好的最小例子.

为了找到问题的原因,我将try-except-block并行运行的方法包装起来:

def myMethod(*args):
    try:
        ...
    except Exception as e:
        print(e)
Run Code Online (Sandbox Code Playgroud)

问题保持不变,并且从未输入过块.我得出结论,异常并非来自我的代码.

我的下一步是编写一个ProcessPoolExecutor原始子类的自定义类,ProcessPoolExecutor并允许我用cusomized替换一些方法.我复制并粘贴了方法的原始代码_process_worker并添加了一些print语句.

def _process_worker(call_queue, result_queue):
    """Evaluates calls from call_queue and places the results in result_queue.
        ...
    """
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Wake up queue management …
Run Code Online (Sandbox Code Playgroud)

python debugging concurrent.futures

11
推荐指数
2
解决办法
4423
查看次数

为什么asyncio.Future与concurrent.futures.Future不兼容?

这两个类代表了并发编程的优秀抽象,因此它们不支持相同的API有点令人不安.

具体来说,根据文档:

asyncio.Future几乎兼容concurrent.futures.Future.

区别:

  • result()并且exception()不要采用超时参数并在未来尚未完成时引发异常.
  • 注册的回调add_done_callback()总是通过事件循环调用call_soon_threadsafe().
  • 此类与包中的wait()as_completed()函数不兼容concurrent.futures.

以上列表实际上是不完整的,还有一些差异:

  • running() 方法不存在
  • result()并且exception()可能会提高InvalidStateError,如果叫太早

这些中的任何一个是由于事件循环的固有特性导致这些操作无法实现还是太麻烦而无法实现?

与之相关的差异是什么意思add_done_callback()?无论哪种方式,回调都保证在期货结束后的某个未指定的时间发生,所以这两个类之间是否完全一致?

python concurrency python-3.x python-asyncio concurrent.futures

10
推荐指数
2
解决办法
918
查看次数