使Python字典可用于所有spark分区

ano*_*428 4 distributed-computing nearest-neighbor apache-spark pyspark

我正在尝试在pyspark中开发一个算法,我正在使用linalg.SparseVector类.我需要创建一个键值对的字典作为每个SparseVector对象的输入.这里的键必须是整数,因为它们表示整数(在我的例子中代表用户ID).我有一个单独的方法读取输入文件并返回一个字典,其中每个用户ID(字符串)映射到一个整数索引.当我再次浏览文件并执行操作时

FileRdd.map(lambda x:userid_idx [x [0]]).我收到一个KeyError.我想这是因为我的dict对所有分区都不可用.有没有办法让所有分区都可以使用userid_idx dict,类似于MapReduce中的分布式地图?我也为这个烂摊子道歉.我是用手机发布的.将在我的笔记本电脑上更新一段时间.

承诺的代码:

from pyspark.mllib.linalg import SparseVector
from pyspark import SparkContext
import glob
import sys
import time
"""We create user and item indices starting from 0 to #users and 0 to #items respectively. This is done to store them in sparseVectors as dicts."""
def create_indices(inputdir):
    items=dict()
    user_id_to_idx=dict()
    user_idx_to_id=dict()
    item_idx_to_id=dict()
    item_id_to_idx=dict()
    item_idx=0
    user_idx=0
    for inputfile in glob.glob(inputdir+"/*.txt"):
        print inputfile
        with open(inputfile) as f:
            for line in f:
                toks=line.strip().split("\t")
                try:
                    user_id_to_idx[toks[1].strip()]
                except KeyError:
                    user_id_to_idx[toks[1].strip()]=user_idx
                    user_idx_to_id[user_idx]=toks[1].strip()
                    user_idx+=1
                try:
                    item_id_to_idx[toks[0].strip()]
                except KeyError:
                    item_id_to_idx[toks[0].strip()]=item_idx
                    item_idx_to_id[item_idx]=toks[0].strip()
                    item_idx+=1
    return user_idx_to_id,user_id_to_idx,item_idx_to_id,item_id_to_idx,user_idx,item_idx

# pass in the hdfs path to the input files and the spark context.
def runKNN(inputdir,sc,user_id_to_idx,item_id_to_idx):
    rdd_text=sc.textFile(inputdir)
    try:

        new_rdd = rdd_text.map(lambda x: (item_id_to_idx[str(x.strip().split("\t")[0])],{user_id_to_idx[str(x.strip().split("\t")[1])]:1})).reduceByKey(lambda x,y: x.update(y))
    except KeyError:
        sys.exit(1)
    new_rdd.saveAsTextFile("hdfs:path_to_output/user/hadoop/knn/output")

if __name__=="__main__":
    sc = SparkContext()
    u_idx_to_id,u_id_to_idx,i_idx_to_id,i_id_to_idx,u_idx,i_idx=create_indices(sys.argv[1])
    u_idx_to_id_b=sc.broadcast(u_idx_to_id)
    u_id_to_idx_b=sc.broadcast(u_id_to_idx)
    i_idx_to_idx_b=sc.broadcast(i_idx_to_id)
    i_id_to_idx_b=sc.broadcast(i_id_to_idx)
    num_users=sc.broadcast(u_idx)
    num_items=sc.broadcast(i_idx)
    runKNN(sys.argv[1],sc,u_id_to_idx_b.value,i_id_to_idx_b.value)
Run Code Online (Sandbox Code Playgroud)

dpe*_*ock 5

在Spark中,您可以在所有任务中使用该字典.例如:

dictionary = {1:"red", 2:"blue"}
rdd = sc.parallelize([1,2])
rdd.map(lambda x: dictionary[x]).collect()
# Prints ['red', 'blue']
Run Code Online (Sandbox Code Playgroud)

你可能会发现你的问题实际上是你的字典中没有你正在查找的密钥!

Spark文档:

通常,当在远程集群节点上执行传递给Spark操作(例如map或reduce)的函数时,它将在函数中使用的所有变量的单独副本上工作.这些变量将复制到每台计算机,并且远程计算机上的变量的更新不会传播回驱动程序.

引用的局部变量的副本将与任务一起发送到节点.

广播变量在这里没有帮助,它们只是一个通过每个节点发送一次而不是每个任务一次来提高性能的工具.