在 Spark 上实现 Trie(或类似的数据结构)

Kay*_*K11 5 python python-2.7 apache-spark pyspark

我是一名实习生,我的任务是在 Spark 集群上实现电话号码的快速搜索算法,使用tries(前缀树),并在几个此类尝试中执行诸如内部联接之类的操作

我设法让它工作于大约 500 万个数字(2 次尝试,每次 250 万个数字),我的任务是将其扩展到 10-2000 万个。但如果我尝试超越我得到的Java.outofmemory错误

现在我的方法是这样的我的代码, - 从spark数据库创建电话号码的数据框, - 使用collect()将250万个数字加载到python列表中的内存(JVM的内存)中 - 将该列表转换为trie - 清除列表 - 在 trie 中搜索要搜索的 number_to_be_searched - 如果找到则返回 true - 否则加载接下来的 250 万个数字,然后重复步骤 3,依此类推

from collections import defaultdict


class Trie:
    # Implement a trie with insert, search.

    def __init__(self):
        self.root = defaultdict()

    def insert(self, word):
        current = self.root
        for letter in word:
            current = current.setdefault(letter, {})
        current.setdefault("_end")

    def search(self, word):
        current = self.root
        for letter in word:
            if letter not in current:
                return False
            current = current[letter]
        if "_end" in current:
            return True
        return False

# these are the inner join and merge functions

def ijoin_util(root1, root2, str):

    for k in root1:
        if k == '_end':
            ijoin_util.join.append(str)
            return
        found = root2.get(k)
        if found != None:
            ijoin_util(root1[k], found, str + k)


def inner_join(root1, root2):
    str = ""
    ijoin_util.join = []
    ijoin_util(root1.root, root2.root, str)
    return ijoin_util.join


def merge_util(root1, root2):
    for k in root1:
        found = root2.get(k)
        if found != None:
            merge_util(root1[k], found)
        else:
            root2.update({k: root1[k]})
            return root2

def merge(root1, root2):
    merge_util(root1.root, root2.root)

Run Code Online (Sandbox Code Playgroud)

我知道对于这个问题来说这是一个非常糟糕的实现,我想知道我是否可以以一种不必将 trie 存储在内存中的方式来实现它,(我的意思是如果我将它存储为嵌套映射的 RDD ),或任何其他可能帮助我进一步扩展规模的方法