使用PySpark进行高效的文本预处理(清除,标记化,停用词,词干,过滤器)

vnd*_*hol 7 python text-processing apache-spark apache-spark-sql pyspark

最近,我开始在《学习火花》一书中学习火花。从理论上讲,一切都是清楚的,在实践中,我面对的事实是,我首先需要对文本进行预处理,但是没有关于此主题的实际提示。

我考虑到的第一件事是,现在最好使用数据框而不是RDD,因此我对数据框进行了预处理。

所需的操作:

  1. 清除标点符号中的文本(regexp_replace)
  2. 令牌化(令牌生成器)
  3. 删除停用词(StopWordsRemover)
  4. 灭菌(SnowballStemmer)
  5. 过滤短词(udf)

我的代码是:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, lower, regexp_replace
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from nltk.stem.snowball import SnowballStemmer

spark = SparkSession.builder \
    .config("spark.executor.memory", "3g") \
    .config("spark.driver.cores", "4") \
    .getOrCreate()
df = spark.read.json('datasets/entitiesFull/full').select('id', 'text')

# Clean text
df_clean = df.select('id', (lower(regexp_replace('text', "[^a-zA-Z\\s]", "")).alias('text')))

# Tokenize text
tokenizer = Tokenizer(inputCol='text', outputCol='words_token')
df_words_token = tokenizer.transform(df_clean).select('id', 'words_token')

# Remove stop words
remover = StopWordsRemover(inputCol='words_token', outputCol='words_clean')
df_words_no_stopw = remover.transform(df_words_token).select('id', 'words_clean')

# Stem text
stemmer = SnowballStemmer(language='english')
stemmer_udf = udf(lambda tokens: [stemmer.stem(token) for token in tokens], ArrayType(StringType()))
df_stemmed = df_words_no_stopw.withColumn("words_stemmed", stemmer_udf("words_clean")).select('id', 'words_stemmed')

# Filter length word > 3
filter_length_udf = udf(lambda row: [x for x in row if len(x) >= 3], ArrayType(StringType()))
df_final_words = df_stemmed.withColumn('words', filter_length_udf(col('words_stemmed')))
Run Code Online (Sandbox Code Playgroud)

处理需要很长时间,整个文档的大小为60 GB。使用RDD是否有意义?缓存会有所帮助吗?如何优化预处理?

首先,我在本地计算机上测试了实现,然后将在集群上进行尝试。本地计算机-Ubuntu RAM 6Gb,4个CPU。也欢迎任何替代解决方案。谢谢!

Pow*_*ers 1

JSON 通常是 Spark 分析中最差的文件格式,尤其是单个 60GB JSON 文件时。Spark 可以很好地处理 1GB Parquet 文件。一些预处理会有很大帮助:

temp_df = spark.read.json('datasets/entitiesFull/full').select('id', 'text').repartition(60)
temp_df.write.parquet('some/other/path')
df = spark.read.parquet('some/other/path')
# ... continue the rest of the analysis
Run Code Online (Sandbox Code Playgroud)

从性能的角度来看,将其包装SnowballStemmer在 UDF 中并不是最好的,但是是最现实的,除非您习惯用低级 Java 字节码编写算法。我还使用 UDF在ceja中创建了一个 Porter Stemming 算法。

以下是Spark 函数的本机实现示例。实施是可能的,但并不容易。