我正在进行复杂模拟参数的优化.我使用多处理模块来增强优化算法的性能.我在http://pymotw.com/2/multiprocessing/basics.html上学到了多处理的基础知识.根据优化算法中给定的参数,复杂模拟持续不同的时间,大约1到5分钟.如果选择的参数非常糟糕,则模拟可持续30分钟或更长时间,结果无效.所以我在考虑在多处理的超时中构建,这会终止所有持续超过定义时间的模拟.这是问题的抽象版本:
import numpy as np
import time
import multiprocessing
def worker(num):
time.sleep(np.random.random()*20)
def main():
pnum = 10
procs = []
for i in range(pnum):
p = multiprocessing.Process(target=worker, args=(i,), name = ('process_' + str(i+1)))
procs.append(p)
p.start()
print 'starting', p.name
for p in procs:
p.join(5)
print 'stopping', p.name
if __name__ == "__main__":
main()
Run Code Online (Sandbox Code Playgroud)
该行p.join(5)定义了5秒的超时.由于for循环for p in procs:,程序等待5秒直到第一个进程完成,然后再等待5秒直到第二个进程完成,依此类推,但我希望程序终止所有持续超过5秒的进程.此外,如果所有进程的持续时间都不超过5秒,则程序不得等待5秒钟.
我希望拥有全局对象,所有进程都以最小的锁定进行共享和更新.
import multiprocessing
class Counter(object):
def __init__(self):
self.value = 0
def update(self, value):
self.value += value
def update(counter_proxy, thread_id):
counter_proxy.value.update(1)
print counter_proxy.value.value, 't%s' % thread_id, \
multiprocessing.current_process().name
return counter_proxy.value.value
def main():
manager = multiprocessing.Manager()
counter = manager.Value(Counter, Counter())
pool = multiprocessing.Pool(multiprocessing.cpu_count())
for i in range(10):
pool.apply(func = update, args = (counter, i))
pool.close()
pool.join()
print 'Should be 10 but is %s.' % counter.value.value
if __name__ == '__main__':
main()
Run Code Online (Sandbox Code Playgroud)
结果是 - 不是10而是零.看起来该对象的共享值未更新.如何锁定和更新此值?
0 t0 PoolWorker-2
0 t1 PoolWorker-3
0 t2 PoolWorker-5 …Run Code Online (Sandbox Code Playgroud) 我正在使用以下代码将CSV文件拆分为多个块(源自此处)
def worker(chunk):
print len(chunk)
def keyfunc(row):
return row[0]
def main():
pool = mp.Pool()
largefile = 'Counseling.csv'
num_chunks = 10
start_time = time.time()
results = []
with open(largefile) as f:
reader = csv.reader(f)
reader.next()
chunks = itertools.groupby(reader, keyfunc)
while True:
# make a list of num_chunks chunks
groups = [list(chunk) for key, chunk in
itertools.islice(chunks, num_chunks)]
if groups:
result = pool.map(worker, groups)
results.extend(result)
else:
break
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)
但是,无论我选择使用多少个块,似乎块的数量始终保持不变.例如,无论我选择有1个还是10个块,我总是在处理样本文件时获得此输出.理想情况下,我想将文件分块以便公平分配.
请注意,我正在分块的真实文件超过1300万行,这就是为什么我要一块一块地处理它.这是必须的!
6
7
1
...
1
1
94
--- 0.101687192917 …Run Code Online (Sandbox Code Playgroud) 任何人都可以告诉我为什么Python的multiprocessing.cpu_count()功能会1在调用带有四个ARMv7处理器的Jetson TK1时返回?
>>> import multiprocessing
>>> multiprocessing.cpu_count()
1
Run Code Online (Sandbox Code Playgroud)
Jetson TK1开发板或多或少都是开箱即用的,没有人与cpusets混淆.在同一个Python shell中,我可以打印内容,/proc/self/status它告诉我该进程应该可以访问所有四个核心:
>>> print open('/proc/self/status').read()
----- (snip) -----
Cpus_allowed: f
Cpus_allowed_list: 0-3
----- (snip) -----
Run Code Online (Sandbox Code Playgroud)
还有什么可能导致这种行为cpu_count()?
为了测试Klaus的假设,我使用以下代码来运行一个非常简单的实验:
import multiprocessing
def f(x):
n = 0
for i in xrange(10000):
n = max(n, multiprocessing.cpu_count())
return n
p = multiprocessing.Pool(5)
for i in range(10):
print p.map(f, [1,2,3,4,5])
Run Code Online (Sandbox Code Playgroud)
其中产生了以下输出:
[3, 3, 3, 3, 1]
[4, 3, 3, 3, 3]
[4, 3, 3, 3, 3]
[3, 3, …Run Code Online (Sandbox Code Playgroud) 我处于需要并行处理一个非常大的 numpy 数组 (55x117x256x256) 的情况。尝试使用通常的多处理方法传递它会产生 AssertionError,我理解这是因为数组太大而无法复制到每个进程中。因此,我想尝试将共享内存与多处理一起使用。(我对其他方法持开放态度,只要它们不太复杂)。
我看到了一些关于使用 python multiprocessing 的共享内存方法的问题,例如
import numpy as np
import multiprocessing as mp
unsharedData = np.zeros((10,))
sharedData = mp.Array('d', unsharedData)
Run Code Online (Sandbox Code Playgroud)
这似乎工作正常。但是,我还没有看到使用多维数组完成此操作的示例。
我试过只是将多维数组粘贴到mp.Array其中给我TypeError: only size-1 arrays can be converted to Python scalars.
unsharedData2 = np.zeros((10,10))
sharedData2 = mp.Array('d', unsharedData2)## Gives TypeError
Run Code Online (Sandbox Code Playgroud)
我可以展平阵列,但如果可以避免,我宁愿不这样做。
是否有一些技巧可以让多处理数组处理多维数据?
我的代码(遗传优化算法的一部分)并行运行几个进程,等待所有进程完成,读取输出,然后用不同的输入重复.当我用60次重复测试时,一切都很好.由于它有效,我决定使用更真实的重复次数,200.我收到了这个错误:
File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 504, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 302, in _handle_workers
pool._maintain_pool()
File "/usr/lib/python2.7/multiprocessing/pool.py", line 206, in _maintain_pool
self._repopulate_pool()
File "/usr/lib/python2.7/multiprocessing/pool.py", line 199, in _repopulate_pool
w.start()
File "/usr/lib/python2.7/multiprocessing/process.py", line 130, in start
self._popen = Popen(self)
File "/usr/lib/python2.7/multiprocessing/forking.py", line 120, in __init__
self.pid = os.fork()
OSError: [Errno 12] Cannot allocate memory
Run Code Online (Sandbox Code Playgroud)
这是我使用池的代码片段:
def RunMany(inputs):
from multiprocessing import cpu_count, Pool
proc=inputs[0]
pool=Pool(processes = proc)
results=[]
for arg1 in inputs[1]:
for arg2 …Run Code Online (Sandbox Code Playgroud) python memory-leaks memory-management python-2.7 python-multiprocessing
我是multiprocessingpython中的新软件包,对于那些了解更多内容的人来说,我的困惑可能很容易.我一直在阅读有关并发的内容,并且已经搜索了其他类似的问题并且一无所获.(仅供参考我不希望使用multithreading,因为GIL将我的应用程序有很多慢下来.)
我在事件的框架内思考.我希望有多个进程在运行,等待事件发生.如果事件发生,它将被分配给特定进程,该进程运行然后返回其空闲状态.可能有更好的方法来做到这一点,但我的理由是我应该生成所有进程一次并使它们无限期地打开,而不是每次事件发生时创建然后关闭进程.速度对我来说是一个问题,我的事件每秒可能发生数千次.
我想出了以下玩具示例,其意图是将偶数发送到一个进程,将奇数发送到另一个进程.两个进程都是相同的,它们只是将数字附加到列表中.
from multiprocessing import Process, Queue, Pipe
slist=['even','odd']
Q={}
Q['even'] = Queue()
Q['odd'] = Queue()
ev,od = [],[]
Q['even'].put(ev)
Q['odd'].put(od)
P={}
P['even'] = Pipe()
P['odd'] = Pipe()
def add_num(s):
""" The worker function, invoked in a process. The results are placed in
a list that's pushed to a queue."""
# while True :
if not P[s][1].recv():
print s,'- do nothing'
else:
d = Q[s].get()
print d
d.append(P[s][1].recv())
Q[s].put(d)
print Q[s].get()
P[s][0].send(False)
print …Run Code Online (Sandbox Code Playgroud) python multiprocessing while-loop python-2.7 python-multiprocessing
我有一个基于代理的模型,其中多个代理由中央进程启动,并通过另一个中央进程进行通信.每个代理和通信过程都通过zmq进行通信.但是,当我启动超过100个代理时,standard_out发送:
参数无效(src/stream_engine.cpp:143)打开的文件太多(src/ipc_listener.cpp:292)
和Mac Os提示问题报告:
使用libzmq.5.dylib插件时,Python意外退出.
在我看来,问题是打开了太多的上下文.但是如何通过多处理避免这种情况呢?
我附上以下部分代码:
class Agent(Database, Logger, Trade, Messaging, multiprocessing.Process):
def __init__(self, idn, group, _addresses, trade_logging):
multiprocessing.Process.__init__(self)
....
def run(self):
self.context = zmq.Context()
self.commands = self.context.socket(zmq.SUB)
self.commands.connect(self._addresses['command_addresse'])
self.commands.setsockopt(zmq.SUBSCRIBE, "all")
self.commands.setsockopt(zmq.SUBSCRIBE, self.name)
self.commands.setsockopt(zmq.SUBSCRIBE, group_address(self.group))
self.out = self.context.socket(zmq.PUSH)
self.out.connect(self._addresses['frontend'])
time.sleep(0.1)
self.database_connection = self.context.socket(zmq.PUSH)
self.database_connection.connect(self._addresses['database'])
time.sleep(0.1)
self.logger_connection = self.context.socket(zmq.PUSH)
self.logger_connection.connect(self._addresses['logger'])
self.messages_in = self.context.socket(zmq.DEALER)
self.messages_in.setsockopt(zmq.IDENTITY, self.name)
self.messages_in.connect(self._addresses['backend'])
self.shout = self.context.socket(zmq.SUB)
self.shout.connect(self._addresses['group_backend'])
self.shout.setsockopt(zmq.SUBSCRIBE, "all")
self.shout.setsockopt(zmq.SUBSCRIBE, self.name)
self.shout.setsockopt(zmq.SUBSCRIBE, group_address(self.group))
self.out.send_multipart(['!', '!', 'register_agent', self.name])
while True:
try:
self.commands.recv() # catches the group adress.
except …Run Code Online (Sandbox Code Playgroud) 此代码在Linux上运行良好,但在Windows下失败(预期).我知道多处理模块用于fork()生成一个新进程,因此父进程拥有的文件描述符(即打开的套接字)由子进程继承.但是,我的理解是,您可以通过多处理发送的唯一类型的数据需要是可选择的.在Windows和Linux上,套接字对象不是pickleable.
from socket import socket, AF_INET, SOCK_STREAM
import multiprocessing as mp
import pickle
sock = socket(AF_INET, SOCK_STREAM)
sock.connect(("www.python.org", 80))
sock.sendall(b"GET / HTTP/1.1\r\nHost: www.python.org\r\n\r\n")
try:
pickle.dumps(sock)
except TypeError:
print("sock is not pickleable")
def foo(obj):
print("Received: {}".format(type(obj)))
data, done = [], False
while not done:
tmp = obj.recv(1024)
done = len(tmp) < 1024
data.append(tmp)
data = b"".join(data)
print(data.decode())
proc = mp.Process(target=foo, args=(sock,))
proc.start()
proc.join()
Run Code Online (Sandbox Code Playgroud)
我的问题是为什么一个socket对象,一个明显不可拾取的对象,可以通过多处理传递?是不是像Windows那样使用泡菜?
我一直很难使用大型字典(~86GB,17.5亿个密钥)来处理使用Python中的多处理的大数据集(2TB).
上下文:将字符串映射到字符串的字典从pickle文件加载到内存中.加载后,将创建工作进程(理想情况下> 32),该进程必须在字典中查找值,但不能修改其内容,以便处理~2TB数据集.数据集需要并行处理,否则任务将花费一个月的时间.
这里有2 3 4 5 6 7 8 9办法(全部失败),我曾尝试:
将字典存储为Python程序中的全局变量,然后派生~32个工作进程.理论上这个方法可能有效,因为字典没有被修改,因此forkLinux上的COW机制意味着数据结构将被共享而不是在进程之间复制.然而,当我尝试这一点,我的程序崩溃的os.fork()内部multiprocessing.Pool.map从OSError: [Errno 12] Cannot allocate memory.我确信这是因为内核配置为永远不会过度使用内存(/proc/sys/vm/overcommit_memory设置为2,我无法在机器上配置此设置,因为我没有root访问权限).
将字典加载到共享内存字典中multiprocessing.Manager.dict.通过这种方法,我能够在不崩溃的情况下分叉32个工作进程,但后续数据处理比不需要字典的任务的另一个版本慢了几个数量级(唯一的区别是没有字典查找).我认为这是因为包含字典的管理器进程与每个工作进程之间的进程间通信,这是每次单个字典查找所必需的.虽然字典没有被修改,但它被访问了很多次,通常是由许多进程同时访问.
将字典复制到C++中std::map并依赖Linux的COW机制来防止它被复制(如方法#1,除了C++中的字典).通过这种方法,花了很长的时间来加载到字典std::map,并随后从坠毁ENOMEM在os.fork()像以前一样.
将字典复制到pyshmht.复制字典需要太长时间pyshmht.
尝试使用SNAP的HashTable.C++中的底层实现允许在共享内存中制作和使用它.不幸的是,Python API不提供此功能.
使用PyPy.崩溃仍然发生在#1中.
在python中实现我自己的共享内存哈希表multiprocessing.Array.这种方法仍导致#1中出现内存不足错误.
将字典转储到dbm.在尝试将字典转储到dbm数据库中四天并看到"33天"的ETA之后,我放弃了这种方法.
将字典转储到Redis中.当我尝试将字典(86GB dict从1024个较小的dicts中加载)转储到Redis时,redis.mset我通过对等错误重置连接.当我尝试使用循环转储键值对时,需要很长时间.
如何在不需要进程间通信的情况下有效地并行处理此数据集,以便在此字典中查找值.我欢迎任何解决这个问题的建议!
我在Ubuntu上使用Anaconda的Python 3.6.3在具有1TB RAM的机器上.
编辑:最终有效:
我能够使用Redis来实现这一点.为了解决#9中发布的问题,我不得不将大的键值插入和查询查询分成"一口大小"的块,这样它仍然可以批量处理,但是没有超出查询的超时时间.这样做允许在45分钟内插入86GB字典(128个线程和一些负载平衡),并且后续处理不会受到Redis查询查询(在2天内完成)的性能影响.
谢谢大家的帮助和建议.
python ×9
numpy ×2
python-2.7 ×2
bigdata ×1
cpu ×1
csv ×1
dictionary ×1
linux ×1
memory-leaks ×1
sockets ×1
timeout ×1
while-loop ×1
zeromq ×1