我知道multiprocessing.Manager()它是如何用于创建共享对象的,特别是可以在worker之间共享的队列.有这个问题,这个问题,这个问题甚至是我自己的一个问题.
但是,我需要定义很多队列,每个队列都链接一对特定的进程.假设每对进程及其链接队列由变量标识key.
当我需要放置和获取数据时,我想使用字典来访问我的队列.我无法做到这一点.我尝试过很多东西.随着multiprocessing进口为mp:
for key in all_keys: DICT[key] = mp.Queue在多处理模块导入的配置文件中定义一个dict (调用它multi.py)不会返回错误,但是队列DICT[key]之间没有共享队列,每个队列似乎都有自己的队列副本,因此没有通信发生.
如果我尝试在定义DICT进程并启动它们的主多处理函数的开头定义,比如
DICT = mp.Manager().dict()
for key in all_keys:
DICT[key] = mp.Queue()
Run Code Online (Sandbox Code Playgroud)
我收到了错误
RuntimeError: Queue objects should only be shared between processes through
inheritance
Run Code Online (Sandbox Code Playgroud)
改为
DICT = mp.Manager().dict()
for key in all_keys:
DICT[key] = mp.Manager().Queue()
Run Code Online (Sandbox Code Playgroud)
只会让事情变得更糟.multi.py在主函数的头部而不是在main函数内部尝试类似的定义会返回类似的错误.
必须有一种方法可以在进程之间共享许多队列,而无需在代码中明确命名每个队列.有任何想法吗?
编辑
这是该程序的基本架构:
1-加载第一个模块,它定义一些变量,导入multi,启动multi.main()和加载另一个模块,该模块启动一系列模块加载和代码执行.与此同时...
2- …
我有一个应用程序,它将csv文件中的数据解析并加载到Postgres 9.3数据库中.在串行执行中,插入语句/游标执行没有问题.
我在混合中添加了celery以添加并行解析和插入数据文件.解析工作正常.但是,我去运行插入语句,我得到:
[2015-05-13 11:30:16,464: ERROR/Worker-1] ingest_task.work_it: Exception
Traceback (most recent call last):
File "ingest_tasks.py", line 86, in work_it
rowcount = ingest_data.load_data(con=con, statements=statements)
File "ingest_data.py", line 134, in load_data
ingest_curs.execute(statement)
DatabaseError: error with no message from the libpq
Run Code Online (Sandbox Code Playgroud) 我正在进行复杂模拟参数的优化.我使用多处理模块来增强优化算法的性能.我在http://pymotw.com/2/multiprocessing/basics.html上学到了多处理的基础知识.根据优化算法中给定的参数,复杂模拟持续不同的时间,大约1到5分钟.如果选择的参数非常糟糕,则模拟可持续30分钟或更长时间,结果无效.所以我在考虑在多处理的超时中构建,这会终止所有持续超过定义时间的模拟.这是问题的抽象版本:
import numpy as np
import time
import multiprocessing
def worker(num):
time.sleep(np.random.random()*20)
def main():
pnum = 10
procs = []
for i in range(pnum):
p = multiprocessing.Process(target=worker, args=(i,), name = ('process_' + str(i+1)))
procs.append(p)
p.start()
print 'starting', p.name
for p in procs:
p.join(5)
print 'stopping', p.name
if __name__ == "__main__":
main()
Run Code Online (Sandbox Code Playgroud)
该行p.join(5)定义了5秒的超时.由于for循环for p in procs:,程序等待5秒直到第一个进程完成,然后再等待5秒直到第二个进程完成,依此类推,但我希望程序终止所有持续超过5秒的进程.此外,如果所有进程的持续时间都不超过5秒,则程序不得等待5秒钟.
在使用Python multiprocessing模块时,我几乎没有基本问题:
class Someparallelworkerclass(object) :
def __init__(self):
self.num_workers = 4
self.work_queue = multiprocessing.JoinableQueue()
self.result_queue = multiprocessing.JoinableQueue()
def someparallellazymethod(self):
p = multiprocessing.Process(target=self.worktobedone).start()
def worktobedone(self):
# get data from work_queue
# put back result in result queue
Run Code Online (Sandbox Code Playgroud)
是否有必要通过work_queue,并result_queue作为args对Process?答案取决于操作系统吗?更基本的问题是:子进程是否从父进程获得复制(COW)地址空间,因此知道类/类方法的定义?如果是的话,它是如何知道的队列是IPC要共享,而且它不应该做的副本work_queue,并result_queue在子进程?我尝试在线搜索,但我发现的大部分文档都很模糊,并没有详细说明底层究竟发生了什么.
我希望拥有全局对象,所有进程都以最小的锁定进行共享和更新.
import multiprocessing
class Counter(object):
def __init__(self):
self.value = 0
def update(self, value):
self.value += value
def update(counter_proxy, thread_id):
counter_proxy.value.update(1)
print counter_proxy.value.value, 't%s' % thread_id, \
multiprocessing.current_process().name
return counter_proxy.value.value
def main():
manager = multiprocessing.Manager()
counter = manager.Value(Counter, Counter())
pool = multiprocessing.Pool(multiprocessing.cpu_count())
for i in range(10):
pool.apply(func = update, args = (counter, i))
pool.close()
pool.join()
print 'Should be 10 but is %s.' % counter.value.value
if __name__ == '__main__':
main()
Run Code Online (Sandbox Code Playgroud)
结果是 - 不是10而是零.看起来该对象的共享值未更新.如何锁定和更新此值?
0 t0 PoolWorker-2
0 t1 PoolWorker-3
0 t2 PoolWorker-5 …Run Code Online (Sandbox Code Playgroud) 我正在使用以下代码将CSV文件拆分为多个块(源自此处)
def worker(chunk):
print len(chunk)
def keyfunc(row):
return row[0]
def main():
pool = mp.Pool()
largefile = 'Counseling.csv'
num_chunks = 10
start_time = time.time()
results = []
with open(largefile) as f:
reader = csv.reader(f)
reader.next()
chunks = itertools.groupby(reader, keyfunc)
while True:
# make a list of num_chunks chunks
groups = [list(chunk) for key, chunk in
itertools.islice(chunks, num_chunks)]
if groups:
result = pool.map(worker, groups)
results.extend(result)
else:
break
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)
但是,无论我选择使用多少个块,似乎块的数量始终保持不变.例如,无论我选择有1个还是10个块,我总是在处理样本文件时获得此输出.理想情况下,我想将文件分块以便公平分配.
请注意,我正在分块的真实文件超过1300万行,这就是为什么我要一块一块地处理它.这是必须的!
6
7
1
...
1
1
94
--- 0.101687192917 …Run Code Online (Sandbox Code Playgroud) 任何人都可以告诉我为什么Python的multiprocessing.cpu_count()功能会1在调用带有四个ARMv7处理器的Jetson TK1时返回?
>>> import multiprocessing
>>> multiprocessing.cpu_count()
1
Run Code Online (Sandbox Code Playgroud)
Jetson TK1开发板或多或少都是开箱即用的,没有人与cpusets混淆.在同一个Python shell中,我可以打印内容,/proc/self/status它告诉我该进程应该可以访问所有四个核心:
>>> print open('/proc/self/status').read()
----- (snip) -----
Cpus_allowed: f
Cpus_allowed_list: 0-3
----- (snip) -----
Run Code Online (Sandbox Code Playgroud)
还有什么可能导致这种行为cpu_count()?
为了测试Klaus的假设,我使用以下代码来运行一个非常简单的实验:
import multiprocessing
def f(x):
n = 0
for i in xrange(10000):
n = max(n, multiprocessing.cpu_count())
return n
p = multiprocessing.Pool(5)
for i in range(10):
print p.map(f, [1,2,3,4,5])
Run Code Online (Sandbox Code Playgroud)
其中产生了以下输出:
[3, 3, 3, 3, 1]
[4, 3, 3, 3, 3]
[4, 3, 3, 3, 3]
[3, 3, …Run Code Online (Sandbox Code Playgroud) 考虑以下功能:
def f(x, dummy=list(range(10000000))):
return x
Run Code Online (Sandbox Code Playgroud)
如果我使用multiprocessing.Pool.imap,我会得到以下时间:
import time
import os
from multiprocessing import Pool
def f(x, dummy=list(range(10000000))):
return x
start = time.time()
pool = Pool(2)
for x in pool.imap(f, range(10)):
print("parent process, x=%s, elapsed=%s" % (x, int(time.time() - start)))
parent process, x=0, elapsed=0
parent process, x=1, elapsed=0
parent process, x=2, elapsed=0
parent process, x=3, elapsed=0
parent process, x=4, elapsed=0
parent process, x=5, elapsed=0
parent process, x=6, elapsed=0
parent process, x=7, elapsed=0
parent process, x=8, elapsed=0
parent process, …Run Code Online (Sandbox Code Playgroud) 我有multiprocessing一些看起来有点像这样的Python代码:
import time
from multiprocessing import Pool
import numpy as np
class MyClass(object):
def __init__(self):
self.myAttribute = np.zeros(100000000) # basically a big memory struct
def my_multithreaded_analysis(self):
arg_lists = [(self, i) for i in range(10)]
pool = Pool(processes=10)
result = pool.map(call_method, arg_lists)
print result
def analyze(self, i):
time.sleep(10)
return i ** 2
def call_method(args):
my_instance, i = args
return my_instance.analyze(i)
if __name__ == '__main__':
my_instance = MyClass()
my_instance.my_multithreaded_analysis()
Run Code Online (Sandbox Code Playgroud)
在阅读了有关内存如何在其他StackOverflow答案(例如Python多处理内存使用情况)中工作的答案后,我认为这不会使用内存与我用于多处理的进程数量成比例,因为它是写时复制和我没有修改任何属性my_instance.但是,当我运行顶部时,我确实看到了所有进程的高内存,它说我的大多数进程都使用了大量内存(这是OSX的最高输出,但我可以在Linux上复制).
我的问题基本上是,我是否正确地解释了这一点,因为我的实例MyClass实际上是在池中重复的?如果是这样,我该如何防止这种情况; 我应该不使用这样的结构吗?我的目标是减少计算分析的内存使用量.
PID COMMAND …Run Code Online (Sandbox Code Playgroud) python memory-management multiprocessing python-multiprocessing
有没有办法在使用Python的多处理模块创建的进程中加载模块的每个进程副本?我试过这个:
def my_fn(process_args):
import my_module
my_func()
Run Code Online (Sandbox Code Playgroud)
...但my_module中的子导入会一劳永逸地加载和缓存.特别是,其中一个子导入读取一个配置文件,其值根据第一个进程的环境设置.如果我试试这个:
def my_fn(process_args):
try:
my_module = reload(my_module)
except NameError:
import my_module
Run Code Online (Sandbox Code Playgroud)
... my_module的子导入不会重新加载.