Python: optimizing search for substrings in a list of strings

Alo*_*pex 17 python substring string-matching

I have a particular problem where I want to search for many substrings in a list of many strings. The following is the gist of what I am trying to do:

listStrings = ['ACDE', 'CDDE', 'BPLL', ... ]

listSubstrings = ['ACD', 'BPI', 'KLJ', ...]

The above entries are just examples. len(listStrings) is ~60,000, len(listSubstrings) is ~50,000-300,000, and len(listStrings[i]) is anywhere from 10 to 30,000.

My current Python attempt is:

for i in listSubstrings:
    for j in listStrings:
        if i in j:
            w.write(i + j)

Or something along these lines. While this works for my task, it's horribly slow, using one core and taking on the order of 40 minutes to complete. Is there a way to speed this up?

I don't believe that I can make a dict out of listStrings:listSubstrings because there is the possibility of duplicate entries which need to be stored on both ends (although I may try this if I can find a way to append a unique tag to each one, since dicts are so much faster). Similarly, I don't think I can pre-compute all possible substrings. I don't even know if searching dict keys is faster than searching a list (since dict.get() takes a specific input and doesn't look for sub-inputs). Is searching lists in memory just relatively slow?
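For what it's worth, dict membership tests are exact hashed-key lookups, so they can't help with substring containment in either direction; a quick illustration (the entries here are arbitrary placeholders):

# A dict finds only exact keys; lookup hashes the whole probe string,
# so it cannot tell that 'ACD' is a substring of 'ACDE'.
d = {'ACD': 0, 'BPI': 1}
print('ACD' in d)    # True: exact key match, average O(1)
print('ACDE' in d)   # False: containment is invisible to a hash table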

Sha*_*ger 14

For the sort of thing you're trying (searching for a fixed set of a whole bunch of strings in a whole bunch of other strings), parallelizing and minor tweaks won't help. You need algorithmic improvements.

First, I'd suggest using the Aho-Corasick string matching algorithm. Basically, in exchange for some precompute work to build a matcher object from your fixed set of strings, you can scan another string for all of those fixed strings at once, in a single pass.

So instead of scanning each of the 60K strings 50K+ times each (three BILLION scans?!?), you scan each of them once, at a cost only slightly higher than a normal single scan, and get all the hits.

Best part is, you're not writing it yourself. PyPI (the Python Package Index) already has the pyahocorasick package written for you. So try it out.
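Installation is the usual one-liner (the package name on PyPI is pyahocorasick):

pip install pyahocorasick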

Example usage:

import ahocorasick

listStrings = ['ACDE', 'CDDE', 'BPLL', ...]
listSubstrings = ['ACD', 'BPI', 'KLJ', ...]

# Build the automaton once, up front, from the fixed set of needles
auto = ahocorasick.Automaton()
for substr in listSubstrings:
    auto.add_word(substr, substr)
auto.make_automaton()

...

for astr in listStrings:
    # iter() yields an (end_index, value) pair for every needle found in astr
    for end_ind, found in auto.iter(astr):
        w.write(found+astr)

This will write multiple times if a substring ("needle") is found in a searched string ("haystack") more than once. You could change the loop to only write on the first hit for a given needle in a given haystack by using a set to dedupe:

for astr in listStrings:
    seen = set()
    for end_ind, found in auto.iter(astr):
        if found not in seen:
            seen.add(found)
            w.write(found+astr)

You could further tweak this to output the needles for a given haystack in the same order they appeared in listSubstrings (and uniquify them while you're at it), by storing the index of each word as or with its value so you can sort the hits (presumably small numbers, so sort overhead is trivial):

from future_builtins import map  # Only on Py2, for more efficient generator based map
from itertools import groupby
from operator import itemgetter

auto = ahocorasick.Automaton()
for i, substr in enumerate(listSubstrings):
    # Store index and substr so we can recover original ordering
    auto.add_word(substr, (i, substr))
auto.make_automaton()

...

for astr in listStrings:
    # Gets all hits, sorting by the index in listSubstrings, so we output hits
    # in the same order we theoretically searched for them
    allfound = sorted(map(itemgetter(1), auto.iter(astr)))
    # Using groupby dedups already sorted inputs cheaply; the map throws away
    # the index since we don't need it
    for found, _ in groupby(map(itemgetter(1), allfound)):
        w.write(found+astr)

For performance comparisons, I used a variant on mgc's answer that is more likely to contain matches, as well as enlarging the haystacks. First, the setup code:

>>> import ahocorasick
>>> from operator import itemgetter
>>> from random import choice, randint
>>> from string import ascii_uppercase as uppercase
>>> # 5000 haystacks, each 1000-5000 characters long
>>> listStrings = [''.join([choice(uppercase) for i in range(randint(1000, 5000))]) for j in range(5000)]
>>> # ~1000 needles (might be slightly less for dups), each 3-12 characters long
>>> listSubstrings = tuple({''.join([choice(uppercase) for i in range(randint(3, 12))]) for j in range(1000)})
>>> auto = ahocorasick.Automaton()
>>> for needle in listSubstrings:
...     auto.add_word(needle, needle)
...
>>> auto.make_automaton()

Now to actually test it (using ipython's %timeit magic for microbenchmarks):

>>> sum(needle in haystack for haystack in listStrings for needle in listSubstrings)
80279  # Will differ depending on random seed
>>> sum(len(set(map(itemgetter(1), auto.iter(haystack)))) for haystack in listStrings)
80279  # Same behavior after uniquifying results
>>> %timeit -r5 sum(needle in haystack for haystack in listStrings for needle in listSubstrings)
1 loops, best of 5: 9.79 s per loop
>>> %timeit -r5 sum(len(set(map(itemgetter(1), auto.iter(haystack)))) for haystack in listStrings)
1 loops, best of 5: 460 ms per loop

So for checking for ~1000 smallish strings in each of 5000 moderate-size strings, pyahocorasick beats individual membership tests by a factor of ~21x on my machine. It scales well as the size of listSubstrings increases too; when I initialized it the same way, but with 10,000 smallish strings instead of 1000, the total time required increased from ~460 ms to ~852 ms, a 1.85x time multiplier to perform 10x as many logical searches.

For the record, the time to build the automatons is trivial in this sort of context. You pay it once up front, not once per haystack, and testing shows the ~1000 string automaton took ~1.4 ms to build and occupied ~277 KB of memory (above and beyond the strings themselves); the ~10000 string automaton took ~21 ms to build, and occupied ~2.45 MB of memory.
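A minimal sketch of how such numbers can be reproduced, assuming a pyahocorasick version that exposes Automaton.get_stats() (its 'total_size' entry is a rough byte count for the trie's internal nodes):

import time
import ahocorasick

def build_automaton(needles):
    # Same construction as above: add each needle, then finalize the trie
    auto = ahocorasick.Automaton()
    for needle in needles:
        auto.add_word(needle, needle)
    auto.make_automaton()
    return auto

start = time.perf_counter()
auto = build_automaton(listSubstrings)
print('build time: %.2f ms' % ((time.perf_counter() - start) * 1000))
# get_stats() is assumed available here; 'total_size' approximates the
# automaton's memory footprint, excluding the stored values themselves
print('approx. size: %d bytes' % auto.get_stats()['total_size'])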


mgc*_*mgc 2

Maybe you can try to chunk one of the two lists (the largest? although intuitively I would cut listStrings) into smaller ones, then use threading to run these searches in parallel (the Pool class of multiprocessing offers a convenient way to do this)? I had some significant speed-up using something like:

from multiprocessing import Pool
from itertools import chain, islice

# The function to be run in parallel :
def my_func(strings):
    return [j+i for i in strings for j in listSubstrings if i.find(j)>-1]

# A small recipe from itertools to chunk an iterable :
def chunk(it, size):
    it = iter(it)
    return iter(lambda: tuple(islice(it, size)), ())

# Generating some fake & random value :
from random import randint
listStrings = \
    [''.join([chr(randint(65, 90)) for i in range(randint(1, 500))]) for j in range(10000)]
listSubstrings = \
    [''.join([chr(randint(65, 90)) for i in range(randint(1, 100))]) for j in range(1000)]

# You have to prepare the searches to be performed:
prep = [strings for strings in chunk(listStrings, round(len(listStrings) / 8))]
with Pool(4) as mp_pool:
    # multiprocessing.map is a parallel version of map()
    res = mp_pool.map(my_func, prep)
# The `res` variable is a list of list, so now you concatenate them
# in order to have a flat result list
result = list(chain.from_iterable(res))

Then you can write the whole result variable (instead of writing it line by line):

with open('result_file', 'w') as f:
    f.write('\n'.join(result))
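If holding the entire result list in memory is a concern, a streaming variant is also possible (a sketch reusing my_func and prep from above; Pool.imap yields each chunk's result as soon as it is ready, in order):

from multiprocessing import Pool

with Pool(4) as mp_pool, open('result_file', 'w') as f:
    # imap is lazy: each chunk's matches are written as they arrive,
    # so the full flattened list never has to exist at once
    for chunk_result in mp_pool.imap(my_func, prep):
        f.writelines(line + '\n' for line in chunk_result)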

Edit 01/05/18: flatten the result using itertools.chain.from_iterable instead of an ugly workaround using map side-effects, following ShadowRanger's advice.