标签: python-multiprocessing

在Python中的进程之间共享许多队列

我知道multiprocessing.Manager()它是如何用于创建共享对象的,特别是可以在worker之间共享的队列.有这个问题,这个问题,这个问题甚至是我自己的一个问题.

但是,我需要定义很多队列,每个队列都链接一对特定的进程.假设每对进程及其链接队列由变量标识key.

当我需要放置和获取数据时,我想使用字典来访问我的队列.我无法做到这一点.我尝试过很多东西.随着multiprocessing进口为mp:

for key in all_keys: DICT[key] = mp.Queue在多处理模块导入的配置文件中定义一个dict (调用它multi.py)不会返回错误,但是队列DICT[key]之间没有共享队列,每个队列似乎都有自己的队列副本,因此没有通信发生.

如果我尝试在定义DICT进程并启动它们的主多处理函数的开头定义,比如

DICT = mp.Manager().dict()    
for key in all_keys:
    DICT[key] = mp.Queue()
Run Code Online (Sandbox Code Playgroud)

我收到了错误

RuntimeError: Queue objects should only be shared between processes through
 inheritance
Run Code Online (Sandbox Code Playgroud)

改为

DICT = mp.Manager().dict()    
for key in all_keys:
    DICT[key] = mp.Manager().Queue()
Run Code Online (Sandbox Code Playgroud)

只会让事情变得更糟.multi.py在主函数的头部而不是在main函数内部尝试类似的定义会返回类似的错误.

必须有一种方法可以在进程之间共享许多队列,而无需在代码中明确命名每个队列.有任何想法吗?

编辑

这是该程序的基本架构:

1-加载第一个模块,它定义一些变量,导入multi,启动multi.main()和加载另一个模块,该模块启动一系列模块加载和代码执行.与此同时...

2- …

python queue multiprocessing python-multiprocessing

12
推荐指数
1
解决办法
1万
查看次数

psycopg2错误:DatabaseError:错误,没有来自libpq的消息

我有一个应用程序,它将csv文件中的数据解析并加载到Postgres 9.3数据库中.在串行执行中,插入语句/游标执行没有问题.

我在混合中添加了celery以添加并行解析和插入数据文件.解析工作正常.但是,我去运行插入语句,我得到:

[2015-05-13 11:30:16,464:  ERROR/Worker-1] ingest_task.work_it: Exception
    Traceback (most recent call last):
    File "ingest_tasks.py", line 86, in work_it
        rowcount = ingest_data.load_data(con=con, statements=statements)
    File "ingest_data.py", line 134, in load_data
        ingest_curs.execute(statement)
    DatabaseError: error with no message from the libpq
Run Code Online (Sandbox Code Playgroud)

python postgresql psycopg celery python-multiprocessing

12
推荐指数
1
解决办法
3244
查看次数

Python多处理模块:使用超时连接进程

我正在进行复杂模拟参数的优化.我使用多处理模块来增强优化算法的性能.我在http://pymotw.com/2/multiprocessing/basics.html上学到了多处理的基础知识.根据优化算法中给定的参数,复杂模拟持续不同的时间,大约1到5分钟.如果选择的参数非常糟糕,则模拟可持续30分钟或更长时间,结果无效.所以我在考虑在多处理的超时中构建,这会终止所有持续超过定义时间的模拟.这是问题的抽象版本:

import numpy as np
import time
import multiprocessing

def worker(num):

    time.sleep(np.random.random()*20)

def main():

    pnum = 10    

    procs = []
    for i in range(pnum):
        p = multiprocessing.Process(target=worker, args=(i,), name = ('process_' + str(i+1)))
        procs.append(p)
        p.start()
        print 'starting', p.name

    for p in procs:
        p.join(5)
        print 'stopping', p.name

if __name__ == "__main__":
    main()
Run Code Online (Sandbox Code Playgroud)

该行p.join(5)定义了5秒的超时.由于for循环for p in procs:,程序等待5秒直到第一个进程完成,然后再等待5秒直到第二个进程完成,依此类推,但我希望程序终止所有持续超过5秒的进程.此外,如果所有进程的持续时间都不超过5秒,则程序不得等待5秒钟.

python timeout python-multiprocessing

11
推荐指数
2
解决办法
2万
查看次数

我是否需要将multiprocessing.Queue实例变量显式传递给子进程在实例方法上执行?

在使用Python multiprocessing模块时,我几乎没有基本问题:

class Someparallelworkerclass(object) :

    def __init__(self):
       self.num_workers = 4
       self.work_queue = multiprocessing.JoinableQueue()
       self.result_queue = multiprocessing.JoinableQueue()

    def someparallellazymethod(self):
       p = multiprocessing.Process(target=self.worktobedone).start()

    def worktobedone(self):
      # get data from work_queue
      # put back result in result queue
Run Code Online (Sandbox Code Playgroud)

是否有必要通过work_queue,并result_queue作为argsProcess?答案取决于操作系统吗?更基本的问题是:子进程是否从父进程获得复制(COW)地址空间,因此知道类/类方法的定义?如果是的话,它是如何知道的队列是IPC要共享,而且它不应该做的副本work_queue,并result_queue在子进程?我尝试在线搜索,但我发现的大部分文档都很模糊,并没有详细说明底层究竟发生了什么.

python multiprocessing python-multiprocessing

11
推荐指数
2
解决办法
1542
查看次数

如何在进程之间共享一个类?

我希望拥有全局对象,所有进程都以最小的锁定进行共享和更新.

import multiprocessing

class Counter(object):
  def __init__(self):
    self.value = 0

  def update(self, value):
    self.value += value


def update(counter_proxy, thread_id):
  counter_proxy.value.update(1)
  print counter_proxy.value.value, 't%s' % thread_id, \
    multiprocessing.current_process().name
  return counter_proxy.value.value

def main():
  manager = multiprocessing.Manager()
  counter = manager.Value(Counter, Counter())
  pool = multiprocessing.Pool(multiprocessing.cpu_count())
  for i in range(10):
    pool.apply(func = update, args = (counter, i))
  pool.close()
  pool.join()

  print 'Should be 10 but is %s.' % counter.value.value

if __name__ == '__main__':
  main()
Run Code Online (Sandbox Code Playgroud)

结果是 - 不是10而是零.看起来该对象的共享值未更新.如何锁定和更新此值?

0 t0 PoolWorker-2
0 t1 PoolWorker-3
0 t2 PoolWorker-5 …
Run Code Online (Sandbox Code Playgroud)

python multiprocessing python-multiprocessing

11
推荐指数
1
解决办法
1万
查看次数

Python Chunking CSV文件多处理

我正在使用以下代码将CSV文件拆分为多个块(源自此处)

def worker(chunk):
    print len(chunk)

def keyfunc(row):
    return row[0]

def main():
    pool = mp.Pool()
    largefile = 'Counseling.csv'
    num_chunks = 10
    start_time = time.time()
    results = []
    with open(largefile) as f:
        reader = csv.reader(f)
        reader.next()
        chunks = itertools.groupby(reader, keyfunc)
        while True:
            # make a list of num_chunks chunks
            groups = [list(chunk) for key, chunk in
                      itertools.islice(chunks, num_chunks)]
            if groups:
                result = pool.map(worker, groups)
                results.extend(result)
            else:
                break
    pool.close()
    pool.join()
Run Code Online (Sandbox Code Playgroud)

但是,无论我选择使用多少个块,似乎块的数量始终保持不变.例如,无论我选择有1个还是10个块,我总是在处理样本文件时获得此输出.理想情况下,我想将文件分块以便公平分配.

请注意,我正在分块的真实文件超过1300万行,这就是为什么我要一块一块地处理它.这是必须的!

6
7
1
...
1
1
94
--- 0.101687192917 …
Run Code Online (Sandbox Code Playgroud)

python csv numpy multiprocessing python-multiprocessing

11
推荐指数
2
解决办法
1934
查看次数

Python multiprocessing.cpu_count()在4核Nvidia Jetson TK1上返回'1'

任何人都可以告诉我为什么Python的multiprocessing.cpu_count()功能会1在调用带有四个ARMv7处理器的Jetson TK1时返回?

>>> import multiprocessing
>>> multiprocessing.cpu_count()
1
Run Code Online (Sandbox Code Playgroud)

Jetson TK1开发板或多或少都是开箱即用的,没有人与cpusets混淆.在同一个Python shell中,我可以打印内容,/proc/self/status它告诉我该进程应该可以访问所有四个核心:

>>> print open('/proc/self/status').read()
----- (snip) -----
Cpus_allowed:   f
Cpus_allowed_list:      0-3
----- (snip) -----
Run Code Online (Sandbox Code Playgroud)

还有什么可能导致这种行为cpu_count()

编辑:

为了测试Klaus的假设,我使用以下代码来运行一个非常简单的实验:

import multiprocessing

def f(x):
    n = 0
    for i in xrange(10000):
        n = max(n, multiprocessing.cpu_count())
    return n

p = multiprocessing.Pool(5)
for i in range(10):
    print p.map(f, [1,2,3,4,5])
Run Code Online (Sandbox Code Playgroud)

其中产生了以下输出:

[3, 3, 3, 3, 1]
[4, 3, 3, 3, 3]
[4, 3, 3, 3, 3]
[3, 3, …
Run Code Online (Sandbox Code Playgroud)

python cpu python-multiprocessing

11
推荐指数
2
解决办法
2万
查看次数

Python多处理 - 为什么使用functools.partial比默认参数慢?

考虑以下功能:

def f(x, dummy=list(range(10000000))):
    return x
Run Code Online (Sandbox Code Playgroud)

如果我使用multiprocessing.Pool.imap,我会得到以下时间:

import time
import os
from multiprocessing import Pool

def f(x, dummy=list(range(10000000))):
    return x

start = time.time()
pool = Pool(2)
for x in pool.imap(f, range(10)):
    print("parent process, x=%s, elapsed=%s" % (x, int(time.time() - start)))

parent process, x=0, elapsed=0
parent process, x=1, elapsed=0
parent process, x=2, elapsed=0
parent process, x=3, elapsed=0
parent process, x=4, elapsed=0
parent process, x=5, elapsed=0
parent process, x=6, elapsed=0
parent process, x=7, elapsed=0
parent process, x=8, elapsed=0
parent process, …
Run Code Online (Sandbox Code Playgroud)

python python-3.x functools python-multiprocessing

11
推荐指数
1
解决办法
2796
查看次数

利用"Copy-on-Write"将数据复制到Multiprocessing.Pool()工作进程

我有multiprocessing一些看起来有点像这样的Python代码:

import time
from multiprocessing import Pool
import numpy as np

class MyClass(object):
    def __init__(self):
        self.myAttribute = np.zeros(100000000) # basically a big memory struct

    def my_multithreaded_analysis(self):
        arg_lists = [(self, i) for i in range(10)]
        pool = Pool(processes=10)
        result = pool.map(call_method, arg_lists)
        print result

    def analyze(self, i):
        time.sleep(10)
        return i ** 2

def call_method(args):
    my_instance, i = args
    return my_instance.analyze(i)


if __name__ == '__main__':
    my_instance = MyClass()
    my_instance.my_multithreaded_analysis()
Run Code Online (Sandbox Code Playgroud)

在阅读了有关内存如何在其他StackOverflow答案(例如Python多处理内存使用情况)中工作的答案后,我认为这不会使用内存与我用于多处理的进程数量成比例,因为它是写时复制和我没有修改任何属性my_instance.但是,当我运行顶部时,我确实看到了所有进程的高内存,它说我的大多数进程都使用了大量内存(这是OSX的最高输出,但我可以在Linux上复制).

我的问题基本上是,我是否正确地解释了这一点,因为我的实例MyClass实际上是在池中重复的?如果是这样,我该如何防止这种情况; 我应该不使用这样的结构吗?我的目标是减少计算分析的内存使用量.

PID   COMMAND …
Run Code Online (Sandbox Code Playgroud)

python memory-management multiprocessing python-multiprocessing

11
推荐指数
2
解决办法
2722
查看次数

在多处理模块中为每个进程重新加载Python模块

有没有办法在使用Python的多处理模块创建的进程中加载​​模块的每个进程副本?我试过这个:

def my_fn(process_args):
    import my_module
    my_func()
Run Code Online (Sandbox Code Playgroud)

...但my_module中的子导入会一劳永逸地加载和缓存.特别是,其中一个子导入读取一个配置文件,其值根据第一个进程的环境设置.如果我试试这个:

def my_fn(process_args):
    try:
        my_module = reload(my_module)
    except NameError:
        import my_module
Run Code Online (Sandbox Code Playgroud)

... my_module的子导入不会重新加载.

python python-module python-multiprocessing

11
推荐指数
1
解决办法
971
查看次数