在Spark中广播“烦人”对象(针对最近的邻居)?

xen*_*yon 2 python nearest-neighbor knn apache-spark pyspark

由于Spark的mllib不具有最近邻居功能,因此我尝试将Annoy用于近似最近邻居。我尝试广播Annoy对象并将其传递给工作人员。但是,它没有按预期运行。

以下是可再现性的代码(将在PySpark中运行)。在将Annoy与不带Spark搭配使用时,看到的差异突出了问题。

from annoy import AnnoyIndex
import random
random.seed(42)

f = 40
t = AnnoyIndex(f)  # Length of item vector that will be indexed
allvectors = []
for i in xrange(20):
    v = [random.gauss(0, 1) for z in xrange(f)]
    t.add_item(i, v)
    allvectors.append((i, v))
t.build(10) # 10 trees

# Use Annoy with Spark
sparkvectors = sc.parallelize(allvectors)
bct = sc.broadcast(t)
x = sparkvectors.map(lambda x: bct.value.get_nns_by_vector(vector=x[1], n=5))
print "Five closest neighbors for first vector with Spark:",
print x.first()

# Use Annoy without Spark
print "Five closest neighbors for first vector without Spark:",
print(t.get_nns_by_vector(vector=allvectors[0][1], n=5))
Run Code Online (Sandbox Code Playgroud)

看到的输出:

Spark的第一个向量的五个最近邻居:无

没有火花的第一个向量的五个最近邻居:[0,13,12,6,4]

zer*_*323 5

我从未使用过Annoy,但是我很确定软件包说明可以解释这里发生的事情:

它还会创建大型的基于文件的只读数据结构,这些数据结构被映射到内存中,以便许多进程可以共享相同的数据。

由于它在序列化并将其传递给工作程序时使用的是内存映射索引,因此所有数据都会丢失。

尝试这样的事情:

from pyspark import SparkFiles

t.save("index.ann")
sc.addPyFile("index.ann")

def find_neighbors(iter):
    t = AnnoyIndex(f)
    t.load(SparkFiles.get("index.ann"))
    return (t.get_nns_by_vector(vector=x[1], n=5) for x in iter)

sparkvectors.mapPartitions(find_neighbors).first()
## [0, 13, 12, 6, 4]
Run Code Online (Sandbox Code Playgroud)


joe*_*joe 5

以防万一其他人像我一样跟随这里,您需要在函数中导入 Annoy mapPartitions,否则您仍然会遇到酸洗错误。这是我根据上述内容完成的示例:

from annoy import AnnoyIndex

from pyspark import SparkFiles
from pyspark import SparkContext
from pyspark import SparkConf

import random
random.seed(42)

f = 1024
t = AnnoyIndex(f)
allvectors = []
for i in range(100):
    v = [random.gauss(0, 1) for z in range(f)]
    t.add_item(i, v)
    allvectors.append((i, v))

t.build(10)
t.save("index.ann")

def find_neighbors(i):
    from annoy import AnnoyIndex
    ai = AnnoyIndex(f)
    ai.load(SparkFiles.get("index.ann"))
    return (ai.get_nns_by_vector(vector=x[1], n=5) for x in i)

with SparkContext(conf=SparkConf().setAppName("myannoy")) as sc:
  sc.addFile("index.ann")
  sparkvectors = sc.parallelize(allvectors)
  sparkvectors.mapPartitions(find_neighbors).first()
Run Code Online (Sandbox Code Playgroud)