PySpark UDF optimization challenge

Rah*_*twa 3 amazon-emr apache-spark pyspark

I'm trying to optimize the code below. Running it on 1000 rows of data takes about 12 minutes to complete. Our use case needs to handle roughly 25K - 50K rows, which would make this implementation completely infeasible.

import pyspark.sql.types as Types
import numpy
import spacy
from pyspark.sql.functions import udf

inputPath = "s3://myData/part-*.parquet"
df = spark.read.parquet(inputPath)

test_df = df.select('uid', 'content').limit(1000).repartition(10)

# print(df.rdd.getNumPartitions()) -> 4
# print(test_df.rdd.getNumPartitions()) -> 1

def load_glove(fn):
    vector_dict = {}
    count = 0
    with open(fn) as inf:
        for line in inf:
            count += 1
            eles = line.strip().split()
            token = eles[0]
            try:
                vector_dict[token] = numpy.array([float(x) for x in eles[1:]])
                assert len(vector_dict[token]) == 300
            except:
                print("Exception in load_glove")
                pass
    return vector_dict

# Returning an array of floats from the UDF
@udf(returnType=Types.ArrayType(Types.FloatType()))
def generateVectorRepresentation(text):
  # TODO: move the load function out if possible, and remove unused modules
  # nlp = spacy.load('en', disable=['parser', 'tagger'])
  nlp = spacy.load('en', max_length=6000000)
  gloveEmbeddingsPath = "/home/hadoop/short_glove_1000.300d.txt"
  glove_embeddings_dict = load_glove(gloveEmbeddingsPath)
  spacy_doc = nlp(text)
  doc_vec = numpy.array([0.0] * 300)
  doc_vec = numpy.float32(doc_vec)
  wordcount = 0
  for sentence_id, sentence in enumerate(spacy_doc.sents):
      for word in sentence:
          if word.text in glove_embeddings_dict:
              # Pre-convert to glove dictionary to float32 representations
              doc_vec += numpy.float32(glove_embeddings_dict[word.text])
              wordcount += 1

  # Document Vector is the average of all word vectors in the document
  doc_vec = doc_vec/(1.0 * wordcount)
  return doc_vec.tolist()

spark.udf.register("generateVectorRepresentation", generateVectorRepresentation)

document_vector_df = test_df.withColumn("Glove Document Vector", generateVectorRepresentation('content'))

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_document_vector_df = document_vector_df.toPandas()

# print(pandas_document_vector_df)
pandas_document_vector_df.head()

I was wondering if you could help answer the following questions:

  • Are the spacy.load() and load_glove() calls executed on every iteration? Is there a way to prepare the load_glove() data once per worker node instead of once per row of data?
  • The load_glove() method returns a dictionary object that can be as large as 5 GB. Is there a way to prepare it on the master node and then pass it to the UDF as a parameter?

Any suggestions would be appreciated. Thanks in advance!

Ray*_*Ral 5

Yes, in the current implementation all of the model-loading code is executed every time the function runs, which is far from optimal. There is no way to pass the loaded objects directly from the driver to the workers, but there is a similar approach: initialize the model on each worker node, and only once. To do that you have to use a lazily-initialized function, which is only executed when the actual result is needed, that is, on the workers.

Try something like this:

# Here the model is not loaded at import time; only the worker code invokes
# this routine and gets the spacy object. This means a spacy model is loaded
# once on each executor.
SPACY_MODEL = None
def get_spacy_model():
    global SPACY_MODEL
    if not SPACY_MODEL:
       _model = spacy.load('en', max_length=6000000)
    SPACY_MODEL = _model
    return SPACY_MODEL

@udf(returnType=Types.ArrayType(Types.FloatType()))
def generateVectorRepresentation(text):
  # TODO: move the load function out if possible, and remove unused modules
  # nlp = spacy.load('en', disable=['parser', 'tagger'])
  nlp = get_spacy_model()
  # your further processing

I think you can try putting the GloVe loading code into a similar function.
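
A minimal sketch of what that could look like, reusing the load_glove() function and the GloVe file path from the question; the names GLOVE_EMBEDDINGS and get_glove_embeddings are only illustrative:

# Same lazy-initialization pattern for the GloVe dictionary: it is built only
# once per executor process and reused by every subsequent UDF call.
GLOVE_EMBEDDINGS = None
def get_glove_embeddings(fn="/home/hadoop/short_glove_1000.300d.txt"):
    global GLOVE_EMBEDDINGS
    if GLOVE_EMBEDDINGS is None:
        GLOVE_EMBEDDINGS = load_glove(fn)  # load_glove() as defined in the question
    return GLOVE_EMBEDDINGS

@udf(returnType=Types.ArrayType(Types.FloatType()))
def generateVectorRepresentation(text):
    nlp = get_spacy_model()
    glove_embeddings_dict = get_glove_embeddings()
    # ... rest of the per-row processing from the question ...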

You can read more about this here: https://haridas.in/run-spacy-jobs-on-apache-spark.html (it's not my blog, I just found it while trying to do the same thing you need with a spaCy model).

  • Rai, thank you very much for your reply; this worked for loading both the spaCy model and the GloVe dictionary and brought a huge performance improvement! One small correction to the pattern above: the SPACY_MODEL = _model line needs to be inside the if statement (see the sketch below). (2 upvotes)
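
For reference, here is a minimal sketch of the lazy-loading helper with that correction applied (same names as in the answer's snippet; the only change is that the assignment happens inside the if):

SPACY_MODEL = None
def get_spacy_model():
    global SPACY_MODEL
    if SPACY_MODEL is None:
        # Load the model only on the first call in each executor process;
        # every later call reuses the cached object.
        SPACY_MODEL = spacy.load('en', max_length=6000000)
    return SPACY_MODEL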