如何使用多处理删除非常大的列表中的重复项?

Cha*_*.AY 7 python optimization duplicates multiprocessing python-multiprocessing

假设我有一个包含随机数的巨大列表,例如

L = [random.randrange(0,25000000000) for _ in range(1000000000)]
Run Code Online (Sandbox Code Playgroud)

我需要删除此列表中的重复项

我为包含较少元素的列表编写了此代码

def remove_duplicates(list_to_deduplicate):
seen = set()
result=[]
for i in list_to_deduplicate:
    if i not in seen:
        result.append(i)
        seen.add(i)
return result
Run Code Online (Sandbox Code Playgroud)

在上面的代码中,我创建了一个集合,这样我就可以记住哪些数字已经出现在我正在处理的列表中,如果该数字不在集合中,那么我将它添加到我需要返回的结果列表中并将其保存在设置以便它不会再次添加到结果列表中

现在,对于列表中的 1000000 个数字,一切都很好,我可以快速得到结果,但对于比 1000000000 个问题出现的数字,我需要使用机器上的不同内核来尝试解决问题,然后将多个结果结合起来流程

我的第一个猜测是让所有进程都可以访问一个集合,但会出现许多复杂情况一个进程如何读取而另一个进程正在添加到集合中,我什至不知道是否可以在我知道的进程之间共享一个集合我们可以使用队列或管道,但我不确定如何使用它

有人可以就解决这个问题的最佳方法给我建议吗?我对任何新想法持开放态度

Dar*_*aut 6

我怀疑即使你最大的列表也足够大,以至于多处理会改善时间。使用 numpy 和多线程可能是你最好的机会。

多处理引入了相当多的开销并增加了内存消耗,就像前面提到的@Frank Merrow 一样。但是,对于多线程而言,情况并非如此(就此而言)。不要混淆这些术语很重要,因为进程和线程是不一样的。同一个进程中的线程共享它们的内存,不同的进程不共享。

在 Python 中使用多核的问题是GIL,它不允许多个线程(在同一进程中)并行执行 Python 字节码。一些像 numpy 这样的 C 扩展可以释放 GIL,这可以从多线程的多核并行中获利。这是您通过使用 numpy.

from multiprocessing.dummy import Pool  # .dummy uses threads
import numpy as np

r = np.random.RandomState(42).randint(0, 25000000000, 100_000_000)
n_threads = 8

result = np.unique(np.concatenate(
    Pool(n_threads).map(np.unique, np.array_split(r, n_threads)))
).tolist()
Run Code Online (Sandbox Code Playgroud)

使用 numpy 和线程池,拆分数组,使子数组在单独的线程中唯一,然后连接子数组并使重新组合的数组再次唯一。最后删除重组阵列的重复项是必要的,因为在子阵列内只能识别本地重复项。

对于低熵数据(许多重复),使用pandas.unique代替numpy.unique可以快得多。与numpy.unique它不同的是,它还保留了外观的顺序。

请注意,使用线程池一样,如果numpy的功能是上述品牌只是感觉没有多线程的 引擎盖下通过调用低级别的数学库。因此,请始终进行测试,看看它是否真的提高了性能,不要认为这是理所当然的。


使用范围内的 100M 随机生成的整数进行测试:

  • 高熵:0 - 25_000_000_000(199560 次重复)
  • 低熵:0 - 1000

代码

import time
import timeit
from multiprocessing.dummy import Pool  # .dummy uses threads

import numpy as np
import pandas as pd


def time_stmt(stmt, title=None):
    t = timeit.repeat(
        stmt=stmt,
        timer=time.perf_counter_ns, repeat=3, number=1, globals=globals()
    )
    print(f"\t{title or stmt}")
    print(f"\t\t{min(t) / 1e9:.2f} s")


if __name__ == '__main__':

    n_threads = 8  # machine with 8 cores (4 physical cores)

    stmt_np_unique_pool = \
"""
np.unique(np.concatenate(
    Pool(n_threads).map(np.unique, np.array_split(r, n_threads)))
).tolist()
"""

    stmt_pd_unique_pool = \
"""
pd.unique(np.concatenate(
    Pool(n_threads).map(pd.unique, np.array_split(r, n_threads)))
).tolist()
"""
    # -------------------------------------------------------------------------

    print(f"\nhigh entropy (few duplicates) {'-' * 30}\n")
    r = np.random.RandomState(42).randint(0, 25000000000, 100_000_000)

    r = list(r)
    time_stmt("list(set(r))")

    r = np.asarray(r)
    # numpy.unique
    time_stmt("np.unique(r).tolist()")
    # pandas.unique
    time_stmt("pd.unique(r).tolist()")    
    # numpy.unique & Pool
    time_stmt(stmt_np_unique_pool, "numpy.unique() & Pool")
    # pandas.unique & Pool
    time_stmt(stmt_pd_unique_pool, "pandas.unique() & Pool")

    # ---
    print(f"\nlow entropy (many duplicates) {'-' * 30}\n")
    r = np.random.RandomState(42).randint(0, 1000, 100_000_000)

    r = list(r)
    time_stmt("list(set(r))")

    r = np.asarray(r)
    # numpy.unique
    time_stmt("np.unique(r).tolist()")
    # pandas.unique
    time_stmt("pd.unique(r).tolist()")
    # numpy.unique & Pool
    time_stmt(stmt_np_unique_pool, "numpy.unique() & Pool")
    # pandas.unique() & Pool
    time_stmt(stmt_pd_unique_pool, "pandas.unique() & Pool")
Run Code Online (Sandbox Code Playgroud)

就像您在下面的时序中看到的那样,仅使用没有多线程的 numpy 已经占到最大的性能改进。还要注意pandas.unique()numpy.unique()(仅)许多重复项要快。

from multiprocessing.dummy import Pool  # .dummy uses threads
import numpy as np

r = np.random.RandomState(42).randint(0, 25000000000, 100_000_000)
n_threads = 8

result = np.unique(np.concatenate(
    Pool(n_threads).map(np.unique, np.array_split(r, n_threads)))
).tolist()
Run Code Online (Sandbox Code Playgroud)