Spark中数据清理的方法

Sac*_*rma 0 scala nltk python-3.x apache-spark pyspark

我是数据工程/机器学习和自学的新学生。在处理示例问题时,我遇到了以下数据清理任务

1. Remove extra whitespaces (keep one whitespace in between word but remove more 
than one whitespaces) and punctuations

2. Turn all the words to lower case and remove stop words (list from NLTK)

3. Remove duplicate words in ASSEMBLY_NAME column
Run Code Online (Sandbox Code Playgroud)

尽管我在大学作业期间一直在编写代码来执行这些任务,但我从未在任何项目中使用过一段代码来完成这些任务,并且我正在寻求专家的指导,他们可以通过指出来帮助我寻求完成任务的最佳方法(in python or scala)

目前已完成的工作:

1.从parquet文件中读取数据

partFitmentDF = sqlContext.read.parquet("/mnt/blob/devdatasciencesto/pga-parts-forecast/raw/parts-fits/")

display(partFitmentDF)
Run Code Online (Sandbox Code Playgroud)

在此输入图像描述 2.从DF创建表

partFitmentDF.createOrReplaceTempView("partsFits")
partFitmentDF.write.mode("overwrite").format("delta").saveAsTable("partsFitsTable")
Run Code Online (Sandbox Code Playgroud)

3. 重新排列表中的 Fits_Assembly_name 数据,以便每个不同的 Itemno 的所有 Fits_Assembly_Name 和 Fits_Assembly_ID 都滚动到单行

%sql

select itemno, concat_ws(' | ' , collect_set(cast(fits_assembly_id as int))) as fits_assembly_id, concat_ws(' | ' ,collect_set(fits_assembly_name)) as fits_assembly_name 
from partsFitsTable 
WHERE itemno = 1014584
group by itemno
Run Code Online (Sandbox Code Playgroud)

在此输入图像描述

聚苯乙烯

partFitmentDF 选定列的示例数据

itemno  fits_assembly_id        fits_assembly_name
0450056 44011           OIL PUMP ASSEMBLY - A01EA09CA (4999202399920239A06)
0450056 135502          OIL PUMP ASSEMBLY - A02EA09CA/CB/CC (4999202399920239A06)
0450056 37884           OIL PUMP ASSEMBLY - A01EA05CA (4999202399920239A06)
0450056 19618           OIL PUMP ASSEMBLY - A06FA09CA/CB/CC (4999202399920239A06)
0450056 135021          OIL PUMP ASSEMBLY - A02EA05CA (4999202399920239A06)
0450056 4147            OIL PUMP ASSEMBLY - A04KA05CA (4999202359920235A06)
0450056 12003           OIL PUMP ASSEMBLY - A05FA09CA/CB/CC (4999202399920239A06)
Run Code Online (Sandbox Code Playgroud)

现在,我需要将这些多行按项目编号滚动到一行(属于一个项目编号的所有程序集名称和 id 应该在一行中),然后我需要执行任务#1、2 和 3,如最前面列出的那样top 来清理 Fits_Assembly_name 列,并将处理后的数据保存到带有 itemno、Fits_Assembly_id 和 Fits_Assembly_name 列的最终数据帧或表中,但我不确定如何开始在 Python 中执行此操作。您能否通过建议方法(和代码提示)来帮助我,以便我可以进一步完成这项任务?

jxc*_*jxc 5

检查以下内容是否适合您。我假设这df是您已经运行的 groupby 和collect_set 之后的数据框:

from pyspark.ml.feature import StopWordsRemover, RegexTokenizer
from pyspark.sql.functions import expr
Run Code Online (Sandbox Code Playgroud)

任务 1:使用RegexTokenizer

使用pattern(?:\p{Punct}|\s)+分割字符串,将结果保存到temp1列中。生成的字符串数组将包含所有小写项目,前导/尾随空格也将被删除。

tk = RegexTokenizer(pattern=r'(?:\p{Punct}|\s)+', inputCol='fits_assembly_name', outputCol='temp1')

df1 = tk.transform(df)
Run Code Online (Sandbox Code Playgroud)

任务 2:使用StopWordsRemover

删除停用词并将结果保存到temp2列中:

sw = StopWordsRemover(inputCol='temp1', outputCol='temp2')

df2 = sw.transform(df1)
Run Code Online (Sandbox Code Playgroud)

您可以通过键入 来检查所有当前停用词sw.getStopWords(),检查loadDefaultStopWords(language)以切换到另一种语言设置,或通过以下方式附加您自己的停用词:

mylist = sw.getStopWords() + ['my', 'black', 'list']
# then adjust the transformer to the following
sw = StopWordsRemover(inputCol='temp1', outputCol='temp2', stopWords=mylist)
Run Code Online (Sandbox Code Playgroud)

此时,您应该有一个temp2删除了停用词的字符串列数组。

任务3:

使用array_distinct()删除重复项,使用 concat_ws() 将数组转换为字符串,然后删除两个临时列:

df_new = df2.withColumn('fits_assembly_name', expr('concat_ws(" ", array_distinct(temp2))')) \
            .drop('temp1', 'temp2')
Run Code Online (Sandbox Code Playgroud)

如果您对上述代码有任何问题,请告诉我。