标签: python-multiprocessing

Keras + Tensorflow:预测多个gpus

我正在使用带有张量流的Keras作为后端.我有一个编译/训练的模型.

我的预测循环很慢,所以我想找到一种方法来并行化predict_proba调用以加快速度.我想获取批次(数据)列表,然后是每个可用的gpu,运行model.predict_proba()这些批次的子集.
实质上:

data = [ batch_0, batch_1, ... , batch_N ]
on gpu_0 => return predict_proba(batch_0)
on gpu_1 => return predict_proba(batch_1)
...
on gpu_N => return predict_proba(batch_N) 
Run Code Online (Sandbox Code Playgroud)

我知道纯Tensorflow可以将ops分配给给定的gpu(https://www.tensorflow.org/tutorials/using_gpu).但是,我不知道这是如何转化为我的情况,因为我使用Keras的api构建/编译/训练了我的模型.

我曾经想过,也许我只需要使用python的多处理模块并开始运行每个gpu的进程predict_proba(batch_n).我知道这在理论上是可能的,因为我的另一个SO帖子:Keras + Tensorflow和Python中的多处理.然而,这仍然让我不知道如何实际"选择"一个gpu来操作这个过程.

我的问题归结为:当使用Tensorflow作为Keras的后端时,如何将Keras中的一个模型的预测与多个gpus并行化?

另外,我很好奇是否只有一个gpu可以进行类似的预测并行化.

高级描述或代码示例将不胜感激!

谢谢!

python python-multiprocessing keras tensorflow

13
推荐指数
1
解决办法
7267
查看次数

Python多处理写时复制在OSX和Ubuntu之间表现不同

我正在尝试在Python中的父进程和子进程之间共享对象.为了解决这个想法,我创建了一个简单的Python脚本:

from multiprocessing import Process
from os import getpid

import psutil

shared = list(range(20000000))

def shared_printer():
    mem = psutil.Process(getpid()).memory_info().rss / (1024 ** 2)
    print(getpid(), len(shared), '{}MB'.format(mem))

if __name__ == '__main__':
    p = Process(target=shared_printer)
    p.start()
    shared_printer()
    p.join()
Run Code Online (Sandbox Code Playgroud)

代码片段使用优秀的psutil库来打印RSS(驻留集大小).当我在OSX上用Python 2.7.15运行它时,我得到以下输出:

(33101, 20000000, '1MB')
(33100, 20000000, '626MB')
Run Code Online (Sandbox Code Playgroud)

当我在Ubuntu上运行完全相同的代码片段(Linux 4.15.0-1029-aws#30-Ubuntu SMP x86_64 GNU/Linux)时,我得到以下输出:

(4077, 20000000, '632MB')
(4078, 20000000, '629MB')
Run Code Online (Sandbox Code Playgroud)

请注意,子进程'RSS在OSX上是基本的0MB,与Linux中的父进程'RSS大小相同.我曾假设写入时复制行为在Linux中的工作方式相同,并允许子进程为大多数页面引用父进程的内存(可能除了存储对象头部的内存之外).

所以我猜测2系统中的写时复制行为存在一些差异.我的问题是:在Linux中我能做些什么来获得类似OSX的写时复制行为?

python linux copy-on-write shared-memory python-multiprocessing

13
推荐指数
1
解决办法
372
查看次数

如何在multiprocessing.pool中命名进程?

如果我创建一个包含4个工作组的池并将它们设置为执行某些任务(使用pool.apply_async(..)),我可以从使用中访问每个进程的名称multiprocessing.current_process().name,但是如何从父进程设置名称(这主要用于日志记录)?

python python-multiprocessing

12
推荐指数
1
解决办法
7136
查看次数

产生python多处理池时出现意外的内存占用差异

试图为pystruct模块中的并行化做出贡献,并在讨论中试图解释我为什么要尽可能早地在执行中实例化池并尽可能长时间地保留它们,重用它们,我意识到我知道它最适合这样做,但我不完全知道为什么.

我知道*nix系统上的声明是池工作器子进程在写入时从父进程中的所有全局变量复制.总的来说情况确实如此,但我认为应该补充说,当其中一个全局变量是一个特别密集的数据结构,如numpy或scipy矩阵时,似乎任何引用都被复制到工作者中实际上是漂亮的即使整个对象没有被复制也是相当大的,因此在执行后期产生新池会导致内存问题.我发现最好的做法是尽早产生一个池,这样任何数据结构都很小.

我已经知道了一段时间并在工作中的应用程序中围绕它进行了设计,但我得到的最好的解释是我在帖子中发布的内容:

https://github.com/pystruct/pystruct/pull/129#issuecomment-68898032

从下面的python脚本看,基本上,你会期望第一次运行中创建的池中的空闲内存和第二次运行中创建的矩阵步骤基本相同,就像在最终池终止的调用中一样.但是它们从来都不是,当你首先创建池时,总会有(除非机器上还有其它东西)更多的空闲内存.在创建池时,这种影响会随着全局命名空间中数据结构的复杂性(和大小)而增加(我认为).有没有人对此有一个很好的解释?

我用bash循环和下面的R脚本来制作这个小图片来说明,在创建池和矩阵之后显示整体的可用内存,具体取决于顺序:

自由记忆趋势情节,两种方式

pool_memory_test.py:

import numpy as np
import multiprocessing as mp
import logging

def memory():
    """
    Get node total memory and memory usage
    """
    with open('/proc/meminfo', 'r') as mem:
        ret = {}
        tmp = 0
        for i in mem:
            sline = i.split()
            if str(sline[0]) == 'MemTotal:':
                ret['total'] = int(sline[1])
            elif str(sline[0]) in ('MemFree:', 'Buffers:', 'Cached:'):
                tmp += int(sline[1])
        ret['free'] = tmp
        ret['used'] = int(ret['total']) - int(ret['free'])
    return ret

if __name__ == '__main__':
    import argparse
    parser = …
Run Code Online (Sandbox Code Playgroud)

python numpy python-multiprocessing

12
推荐指数
1
解决办法
706
查看次数

Python:TypeError:出于安全原因,不允许选择AuthenticationString对象

我正在创建一个类的对象(with multiprocessing)并将其添加到一个,Manager.dict()以便我可以在其工作完成时从对象内的字典中删除该项(该项指向).

我尝试了以下代码:

from multiprocessing import Manager, Process

class My_class(Process):
    def __init__(self):
        super(My_class, self).__init__()
        print "Object", self, "created."

    def run(self):
        print "Object", self, "process started."


manager=Manager()
object_dict=manager.dict()

for x in range(2):
    object_dict[x]=My_class()
    object_dict[x].start()
Run Code Online (Sandbox Code Playgroud)

但是我收到了一个错误:

TypeError: Pickling an AuthenticationString object is disallowed
for security reasons
Run Code Online (Sandbox Code Playgroud)

为了好奇,我删除了多处理部分,并试着像:

from multiprocessing import Manager
class My_class():
    def __init__(self):
        print "Object", self, "created."

manager=Manager()
object_dict=manager.dict()

for x in range(2):
    object_dict[x]=My_class()
Run Code Online (Sandbox Code Playgroud)

它没有给我任何错误并显示两个对象的地址.

这个错误是什么以及如何让它消失?

python class multiprocessing python-multiprocessing

12
推荐指数
1
解决办法
3445
查看次数

在Python中的进程之间共享许多队列

我知道multiprocessing.Manager()它是如何用于创建共享对象的,特别是可以在worker之间共享的队列.有这个问题,这个问题,这个问题甚至是我自己的一个问题.

但是,我需要定义很多队列,每个队列都链接一对特定的进程.假设每对进程及其链接队列由变量标识key.

当我需要放置和获取数据时,我想使用字典来访问我的队列.我无法做到这一点.我尝试过很多东西.随着multiprocessing进口为mp:

for key in all_keys: DICT[key] = mp.Queue在多处理模块导入的配置文件中定义一个dict (调用它multi.py)不会返回错误,但是队列DICT[key]之间没有共享队列,每个队列似乎都有自己的队列副本,因此没有通信发生.

如果我尝试在定义DICT进程并启动它们的主多处理函数的开头定义,比如

DICT = mp.Manager().dict()    
for key in all_keys:
    DICT[key] = mp.Queue()
Run Code Online (Sandbox Code Playgroud)

我收到了错误

RuntimeError: Queue objects should only be shared between processes through
 inheritance
Run Code Online (Sandbox Code Playgroud)

改为

DICT = mp.Manager().dict()    
for key in all_keys:
    DICT[key] = mp.Manager().Queue()
Run Code Online (Sandbox Code Playgroud)

只会让事情变得更糟.multi.py在主函数的头部而不是在main函数内部尝试类似的定义会返回类似的错误.

必须有一种方法可以在进程之间共享许多队列,而无需在代码中明确命名每个队列.有任何想法吗?

编辑

这是该程序的基本架构:

1-加载第一个模块,它定义一些变量,导入multi,启动multi.main()和加载另一个模块,该模块启动一系列模块加载和代码执行.与此同时...

2- …

python queue multiprocessing python-multiprocessing

12
推荐指数
1
解决办法
1万
查看次数

psycopg2错误:DatabaseError:错误,没有来自libpq的消息

我有一个应用程序,它将csv文件中的数据解析并加载到Postgres 9.3数据库中.在串行执行中,插入语句/游标执行没有问题.

我在混合中添加了celery以添加并行解析和插入数据文件.解析工作正常.但是,我去运行插入语句,我得到:

[2015-05-13 11:30:16,464:  ERROR/Worker-1] ingest_task.work_it: Exception
    Traceback (most recent call last):
    File "ingest_tasks.py", line 86, in work_it
        rowcount = ingest_data.load_data(con=con, statements=statements)
    File "ingest_data.py", line 134, in load_data
        ingest_curs.execute(statement)
    DatabaseError: error with no message from the libpq
Run Code Online (Sandbox Code Playgroud)

python postgresql psycopg celery python-multiprocessing

12
推荐指数
1
解决办法
3244
查看次数

芹菜:守护进程不允许有孩子

在Python(2.7)中,我尝试在芹菜任务(celery 3.1.17)中创建进程(使用多处理),但它给出了错误:

daemonic processes are not allowed to have children
Run Code Online (Sandbox Code Playgroud)

谷歌搜索它,我发现最新版本的台球修复了"错误",但我有最新版本(3.3.0.20),错误仍在发生.我也试图在我的芹菜任务中实现这个解决方法,但它给出了同样的错误.

有谁知道怎么做?任何帮助表示赞赏,帕特里克

编辑:代码片段

任务:

from __future__ import absolute_import
from celery import shared_task
from embedder.models import Embedder

@shared_task
def embedder_update_task(embedder_id):
    embedder = Embedder.objects.get(pk=embedder_id)
    embedder.test()
Run Code Online (Sandbox Code Playgroud)

人工测试功能(从这里):

def sleepawhile(t):
    print("Sleeping %i seconds..." % t)
    time.sleep(t)
    return t    

def work(num_procs):
    print("Creating %i (daemon) workers and jobs in child." % num_procs)
    pool = mp.Pool(num_procs)

    result = pool.map(sleepawhile,
        [randint(1, 5) for x in range(num_procs)])

    # The following is not really needed, …
Run Code Online (Sandbox Code Playgroud)

python daemon celery python-2.7 python-multiprocessing

12
推荐指数
3
解决办法
6929
查看次数

非阻塞multiprocessing.connection.Listener?

我使用multiprocessing.connection.Listener进行进程之间的通信,它对我来说是一个魅力.现在我真的很喜欢我的mainloop在客户端的命令之间做一些其他事情.不幸的是,listener.accept()阻止执行,直到建立客户端进程的连接.

是否有一种简单的方法来管理multiprocessing.connection的非阻塞检查?超时?或者我应该使用专用线程?

    # Simplified code:

    from multiprocessing.connection import Listener

    def mainloop():
        listener = Listener(address=(localhost, 6000), authkey=b'secret')

        while True:
            conn = listener.accept() # <---  This blocks!
            msg = conn.recv() 
            print ('got message: %r' % msg)
            conn.close()
Run Code Online (Sandbox Code Playgroud)

python sockets asynchronous python-3.3 python-multiprocessing

12
推荐指数
1
解决办法
1817
查看次数

在多处理池中运行scipy.integrate.ode会导致巨大的性能损失

我正在使用python scipy.integrate来模拟29维线性微分方程组.由于我需要解决几个问题实例,我想我可以通过并行计算来加速它multiprocessing.Pool.由于线程之间没有共享数据或同步(问题是令人尴尬的并行),我认为这显然应该有效.然而,在我编写完代码之后,我得到了非常奇怪的性能测量:

  • 单线程,无jacobian:每次通话20-30 ms
  • 单线程,带雅可比:每次通话10-20毫秒
  • 多线程,没有雅可比:每次通话20-30毫秒
  • 多线程,雅可比:每次通话10-5000毫秒

什么是令人震惊的是,我认为应该是最快的设置,实际上是最慢的,而变异是两个数量级.这是一个确定性的计算; 计算机不应该以这种方式工作.什么可能导致这个?

效果似乎与系统有关

我在另一台计算机上尝试了相同的代码,但我没有看到这种效果.

两台机器都使用Ubuntu 64位,Python 2.7.6,scipy版本0.18.0和numpy版本1.8.2.我没有看到Intel(R)Core(TM)i5-5300U CPU @ 2.30GHz处理器的可变性.我确实看到了英特尔(R)Core(TM)i7-2670QM CPU @ 2.20GHz的问题.

理论

一个想法是处理器之间可能存在共享缓存,并且通过并行运行它不能适应缓存中的jacobian矩阵的两个实例,因此它们不断地相互竞争以使缓存相互减慢它们是连续运行或没有jacobian.但它不是一个百万变量系统.jacobian是一个29x29矩阵,占用6728个字节.处理器上的1级缓存为4 x 32 KB,大得多.处理器之间是否还有其他共享资源可能是罪魁祸首?我们怎么测试呢?

我注意到的另一件事是每个python进程在运行时似乎占用了几百%的CPU.这似乎意味着代码已经在某些时候并行化了(可能在低级库中).这可能意味着进一步的并行化无济于事,但我不希望这种急剧放缓.

尝试在更多机器上查看(1)其他人是否可以体验到减速并且(2)发生减速的系统的共同特征是什么是好的.该代码使用大小为2的多处理池进行两次并行计算的10次试验,为10次试验中的每次试验打印出每次scipy.ode.integrate调用的时间.

'odeint with multiprocessing variable execution time demonsrtation'

from numpy import dot as npdot
from numpy import add as npadd
from numpy import matrix as npmatrix
from scipy.integrate import ode
from multiprocessing import Pool
import time

def main():
    "main function"

    pool = Pool(2) # …
Run Code Online (Sandbox Code Playgroud)

python numpy scipy python-multiprocessing

12
推荐指数
1
解决办法
945
查看次数