我有一个 pyspark 数据框,包含三列:user_id、follower_count 和 tweet,其中 tweet 是字符串类型。
首先,我需要执行以下预处理步骤: - 小写所有文本 - 删除标点符号(以及任何其他非 ascii 字符) - 标记单词(用 ' ' 分割)
然后我需要汇总所有推文值的这些结果: - 查找每个单词出现的次数 - 按频率排序 - 提取前 n 个单词及其各自的计数
我在 GitHub 上找到了以下资源 wordcount.py;但是,我不明白代码在做什么;因此,我在笔记本中调整它时遇到一些困难。
https://github.com/apache/spark/blob/master/examples/src/main/python/wordcount.py
lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
counts = lines.flatMap(lambda x: x.split(' ')) \
.map(lambda x: (x, 1)) \
.reduceByKey(add)
output = counts.collect()
for (word, count) in output:
print("%s: %i" % (word, count))
Run Code Online (Sandbox Code Playgroud)
编辑1:我认为我没有明确表示我正在尝试将此分析应用于专栏推文。
编辑2:我更改了上面的代码,插入 df.tweet 作为传递到第一行代码的参数并触发了错误。所以我认为列不能传递到这个工作流程中;我不知道如何解决这个问题。
TypeError: Column is not iterable
Run Code Online (Sandbox Code Playgroud)
我按照建议添加了一些调整。奇迹般有效!我不知道我可以将用户定义的函数发送到 lambda 函数中。事实证明,这是将此步骤添加到工作流程中的简单方法。
import re
def process_text(text):
text = text.lower()
text = re.sub(pattern='[^A-z ^\s]',repl='',string=text).split(' ')
return [word for word in text if word != '']
process_text('hi 343')
>>>> ['hi']
count_rdd = df.select("tweet").rdd.flatMap(lambda x: process_text(x[0])) \
.map(lambda x: (x, 1)).reduceByKey(lambda x,y: x+y)
count_rdd.collect()
Run Code Online (Sandbox Code Playgroud)
不确定该错误是由于for (word, count) in output:列上的 RDD 操作引起的。
但是,您可以简单地使用:
count_rdd = df.select("tweets").rdd.flatMap(lambda x: x[0].split(' ')) \
.map(lambda x: (x, 1)).reduceByKey(lambda x,y: x+y)
Run Code Online (Sandbox Code Playgroud)
您想要做的是对pyspark.sql.column.Column对象进行 RDD 操作。上面是该列中所有单词的简单字数统计。
如果您想在列本身上进行操作,可以使用explode()来完成此操作:
import pyspark.sql.functions as F
count_df = df.withColumn('word', F.explode(F.split(F.col('tweets'), ' ')))\
.groupBy('word')\
.count()\
.sort('count', ascending=False)
Run Code Online (Sandbox Code Playgroud)
您将能够使用regexp_replace()和lower()frompyspark.sql.functions来执行预处理步骤。