Jon*_*ton 10 python dictionary bigdata python-multiprocessing
我一直很难使用大型字典(~86GB,17.5亿个密钥)来处理使用Python中的多处理的大数据集(2TB).
上下文:将字符串映射到字符串的字典从pickle文件加载到内存中.加载后,将创建工作进程(理想情况下> 32),该进程必须在字典中查找值,但不能修改其内容,以便处理~2TB数据集.数据集需要并行处理,否则任务将花费一个月的时间.
这里有2 3 4 5 6 7 8 9办法(全部失败),我曾尝试:
将字典存储为Python程序中的全局变量,然后派生~32个工作进程.理论上这个方法可能有效,因为字典没有被修改,因此fork
Linux上的COW机制意味着数据结构将被共享而不是在进程之间复制.然而,当我尝试这一点,我的程序崩溃的os.fork()
内部multiprocessing.Pool.map
从OSError: [Errno 12] Cannot allocate memory
.我确信这是因为内核配置为永远不会过度使用内存(/proc/sys/vm/overcommit_memory
设置为2
,我无法在机器上配置此设置,因为我没有root访问权限).
将字典加载到共享内存字典中multiprocessing.Manager.dict
.通过这种方法,我能够在不崩溃的情况下分叉32个工作进程,但后续数据处理比不需要字典的任务的另一个版本慢了几个数量级(唯一的区别是没有字典查找).我认为这是因为包含字典的管理器进程与每个工作进程之间的进程间通信,这是每次单个字典查找所必需的.虽然字典没有被修改,但它被访问了很多次,通常是由许多进程同时访问.
将字典复制到C++中std::map
并依赖Linux的COW机制来防止它被复制(如方法#1,除了C++中的字典).通过这种方法,花了很长的时间来加载到字典std::map
,并随后从坠毁ENOMEM
在os.fork()
像以前一样.
将字典复制到pyshmht
.复制字典需要太长时间pyshmht
.
尝试使用SNAP
的HashTable.C++中的底层实现允许在共享内存中制作和使用它.不幸的是,Python API不提供此功能.
使用PyPy.崩溃仍然发生在#1中.
在python中实现我自己的共享内存哈希表multiprocessing.Array
.这种方法仍导致#1中出现内存不足错误.
将字典转储到dbm
.在尝试将字典转储到dbm
数据库中四天并看到"33天"的ETA之后,我放弃了这种方法.
将字典转储到Redis中.当我尝试将字典(86GB dict从1024个较小的dicts中加载)转储到Redis时,redis.mset
我通过对等错误重置连接.当我尝试使用循环转储键值对时,需要很长时间.
如何在不需要进程间通信的情况下有效地并行处理此数据集,以便在此字典中查找值.我欢迎任何解决这个问题的建议!
我在Ubuntu上使用Anaconda的Python 3.6.3在具有1TB RAM的机器上.
编辑:最终有效:
我能够使用Redis来实现这一点.为了解决#9中发布的问题,我不得不将大的键值插入和查询查询分成"一口大小"的块,这样它仍然可以批量处理,但是没有超出查询的超时时间.这样做允许在45分钟内插入86GB字典(128个线程和一些负载平衡),并且后续处理不会受到Redis查询查询(在2天内完成)的性能影响.
谢谢大家的帮助和建议.
您应该使用一个旨在与许多不同进程共享大量数据的系统 - 比如数据库.
获取您的巨型数据集并为其创建架构并将其转储到数据库中.你甚至可以将它放在一台单独的机器上.
然后,根据需要在所需数量的主机上启动任意数量的进程,以并行处理数据.几乎任何现代数据库都能够处理负载.
如果您可以在第 1 点成功将该数据加载到单个进程中,您很可能可以通过使用https://bugs.python.org/issue31558 中gc.freeze
引入的方法来解决分叉复制的问题
您必须使用 python 3.7+ 并在 fork 之前调用该函数。(或在对进程池进行映射之前)
由于这需要整个内存的虚拟副本才能让 CoW 工作,因此您需要确保您的过量使用设置允许您这样做。
正如这里大多数人已经提到的:
不要使用那么大的字典,而是将其转储到数据库上!
将数据转储到数据库后,使用索引将有助于减少数据检索时间。这里有
一个关于 PostgreSQL 数据库的很好的索引解释。您可以进一步优化您的数据库(我给出了一个 PostgreSQL 示例,因为这是我最常使用的,但这些概念几乎适用于每个数据库)
假设您执行了上述操作(或者如果您想以任何一种方式使用字典...),您可以使用 Python 实现并行和异步处理例程asyncio
(需要 Python 版本 >= 3.4)。
基本思想是创建一个映射方法,将异步任务分配(映射)到可迭代的每个项目,并将每个任务注册到 asyncio 的event_loop
.
最后,我们将收集所有这些承诺,asyncio.gather
并等待收到所有结果。
这个想法的框架代码示例:
import asyncio
async def my_processing(value):
do stuff with the value...
return processed_value
def my_async_map(my_coroutine, my_iterable):
my_loop = asyncio.get_event_loop()
my_future = asyncio.gather(
*(my_coroutine(val) for val in my_iterable)
)
return my_loop.run_until_complete(my_future)
my_async_map(my_processing, my_ginormous_iterable)
Run Code Online (Sandbox Code Playgroud)
gevent
asyncio 代替,但请记住 asyncio 是标准库的一部分。
Gevent 实施:
import gevent
from gevent.pool import Group
def my_processing(value):
do stuff with the value...
return processed_value
def my_async_map(my_coroutine, my_iterable):
my_group = Group()
return my_group.map(my_coroutine, my_iterable)
my_async_map(my_processing, my_ginormous_iterable)
Run Code Online (Sandbox Code Playgroud)
不要使用字典,而是使用压缩数据但仍然具有快速查找的数据结构。
例如:
keyvi:https : //github.com/cliqz-oss/keyvi keyvi 是一种基于 FSA 的键值数据结构,针对空间和查找速度进行了优化。从 keyvi 读取的多个进程将重用内存,因为 keyvi 结构是内存映射的,它使用共享内存。由于您的工作进程不需要修改数据结构,我认为这将是您最好的选择。
marisa trie:https : //github.com/pytries/marisa-trie Python 的静态 trie 结构,基于 marisa-trie C++ 库。与 keyvi 一样,marisa-trie 也使用内存映射。使用同一个trie 的多个进程将使用相同的内存。
编辑:
要将 keyvi 用于此任务,您可以先使用pip install pykeyvi
. 然后像这样使用它:
from pykeyvi import StringDictionaryCompiler, Dictionary
# Create the dictionary
compiler = StringDictionaryCompiler()
compiler.Add('foo', 'bar')
compiler.Add('key', 'value')
compiler.Compile()
compiler.WriteToFile('test.keyvi')
# Use the dictionary
dct = Dictionary('test.keyvi')
dct['foo'].GetValue()
> 'bar'
dct['key'].GetValue()
> 'value'
Run Code Online (Sandbox Code Playgroud)
marisa trie 只是一个 trie,所以它不能作为一个开箱即用的映射,但是例如我们可以使用一个分隔符字符来将键与值分开。