相关疑难解决方法(0)

线程池类似于多处理池?

工作线程是否有Pool类,类似于多处理模块的Pool类

我喜欢例如并行化地图功能的简单方法

def long_running_func(p):
    c_func_no_gil(p)

p = multiprocessing.Pool(4)
xs = p.map(long_running_func, range(100))
Run Code Online (Sandbox Code Playgroud)

但是我想在没有创建新流程的开销的情况下这样做.

我知道GIL.但是,在我的用例中,该函数将是一个IO绑定的C函数,python包装器将在实际函数调用之前释放GIL.

我是否必须编写自己的线程池?

python multithreading missing-features

326
推荐指数
8
解决办法
27万
查看次数

如何从python中的线程获取返回值?

如何获取foo从线程目标返回的值?

from threading import Thread

def foo(bar):
    print('hello {}'.format(bar))
    return 'foo'

thread = Thread(target=foo, args=('world!',))
thread.start()
return_value = thread.join()
Run Code Online (Sandbox Code Playgroud)

如上所示,"一种显而易见的方法"不起作用:'foo'返回'foo'.

python multithreading

289
推荐指数
15
解决办法
28万
查看次数

Python多处理:是否可以在池中设置池?

我有一个模块A,它通过获取数据并将其发送到模块B,C,D等进行分析然后将它们的结果连接在一起来执行基本映射/缩减.

但似乎模块B,C,D等本身不能创建多处理池,否则我得到

AssertionError: daemonic processes are not allowed to have children
Run Code Online (Sandbox Code Playgroud)

是否有可能以其他方式并行化这些工作?

为清楚起见,这里是一个(通常是坏的)婴儿的例子.(我通常会尝试/捕捉,但你得到了要点.)

A.py:

  import B
  from multiprocessing import Pool

  def main():
    p = Pool()
    results = p.map(B.foo,range(10))
    p.close()
    p.join()
    return results


B.py:

  from multiprocessing import Pool

  def foo(x):
    p = Pool()
    results = p.map(str,x)
    p.close()
    p.join()
    return results
Run Code Online (Sandbox Code Playgroud)

python multiprocessing

16
推荐指数
1
解决办法
9243
查看次数

芹菜:守护进程不允许有孩子

在Python(2.7)中,我尝试在芹菜任务(celery 3.1.17)中创建进程(使用多处理),但它给出了错误:

daemonic processes are not allowed to have children
Run Code Online (Sandbox Code Playgroud)

谷歌搜索它,我发现最新版本的台球修复了"错误",但我有最新版本(3.3.0.20),错误仍在发生.我也试图在我的芹菜任务中实现这个解决方法,但它给出了同样的错误.

有谁知道怎么做?任何帮助表示赞赏,帕特里克

编辑:代码片段

任务:

from __future__ import absolute_import
from celery import shared_task
from embedder.models import Embedder

@shared_task
def embedder_update_task(embedder_id):
    embedder = Embedder.objects.get(pk=embedder_id)
    embedder.test()
Run Code Online (Sandbox Code Playgroud)

人工测试功能(从这里):

def sleepawhile(t):
    print("Sleeping %i seconds..." % t)
    time.sleep(t)
    return t    

def work(num_procs):
    print("Creating %i (daemon) workers and jobs in child." % num_procs)
    pool = mp.Pool(num_procs)

    result = pool.map(sleepawhile,
        [randint(1, 5) for x in range(num_procs)])

    # The following is not really needed, …
Run Code Online (Sandbox Code Playgroud)

python daemon celery python-2.7 python-multiprocessing

12
推荐指数
3
解决办法
6929
查看次数

我应该并行运行多少个进程?

我有一个并行化的任务,从多个文件中读取内容,并将信息写入多个文件.

我目前用来并行化东西的成语:

listOfProcesses = []
for fileToBeRead in listOfFilesToBeRead:
    process = multiprocessing.Process(target = somethingThatReadsFromAFileAndWritesSomeStuffOut, args = (fileToBeRead))
    process.start()
    listOfProcesses.append(process)

for process in listOfProcesses:
    process.join()
Run Code Online (Sandbox Code Playgroud)

值得注意的是,somethingThatReadsFromAFileAndWritesSomeStuffOut它本身可以并行化任务(它可能必须从其他文件中读取等等).

现在,正如您所看到的,正在创建的进程数量不依赖于我在计算机上拥有的核心数量或其他任何内容,除了需要完成的任务数量.如果需要运行十个任务,请创建十个进程,依此类推.

这是创建任务的最佳方式吗?我应该考虑一下我的处理器有多少核心,等等?

multiprocessing python-2.7

9
推荐指数
1
解决办法
1万
查看次数

python ThreadPool中每个线程的超时

我正在使用Python 2.7。

我目前正在使用ThreadPoolExecuter这样的:

params = [1,2,3,4,5,6,7,8,9,10]
with concurrent.futures.ThreadPoolExecutor(5) as executor:
    result = list(executor.map(f, params))
Run Code Online (Sandbox Code Playgroud)

问题是f有时运行时间过长。每当我运行时f,我都希望将其运行时间限制为100秒,然后终止运行。

最终,每个元素xparam,我想有与否的指示f必须被杀死,并且如果它不是-是什么返回值。即使f对于一个参数超时,我仍然希望使用下一个参数运行它。

executer.map方法确实有一个timeout参数,但是它为整个运行(从调用时间到)设置了超时executer.map,而不是分别为每个线程设置超时。

什么是获得我想要的行为的最简单方法?

python multithreading future python-2.7 concurrent.futures

6
推荐指数
1
解决办法
4865
查看次数

multiprocessing.pool 中的错误组参数目前必须为 None

下面是我的 python 脚本。

import multiprocessing
# We must import this explicitly, it is not imported by the top-level
# multiprocessing module.
import multiprocessing.pool
import time

from random import randint


class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
    Process = NoDaemonProcess

def sleepawhile(t):
    print("Sleeping %i seconds..." …
Run Code Online (Sandbox Code Playgroud)

python python-multiprocessing python-3.6 ubuntu-16.04

6
推荐指数
1
解决办法
5279
查看次数

多处理给出断言错误:不允许守护进程有孩子

我第一次尝试使用多处理。所以我想我会做一个非常简单的测试示例,它可以分解 100 个不同的数字。

from multiprocessing import Pool
from primefac import factorint
N = 10**30
L = range(N,N + 100)
pool = Pool()
pool.map(factorint, L)
Run Code Online (Sandbox Code Playgroud)

这给了我错误:

Traceback (most recent call last):
  File "test.py", line 8, in <module>
    pool.map(factorint, L)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 251, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 567, in get
    raise self._value
AssertionError: daemonic processes are not allowed to have children
Run Code Online (Sandbox Code Playgroud)

我看到Python 进程池非守护进程?讨论了这个问题,但我不明白为什么它与我的简单玩具示例相关。我究竟做错了什么?

python python-multiprocessing

5
推荐指数
1
解决办法
7245
查看次数