Hri*_*rma 2 python apache-spark apache-spark-sql pyspark
How can I find the number of words in a Spark DataFrame column without using SQL's REPLACE() function? Below is the code and input I am working with, but the replace() function does not work.
from pyspark.sql import SparkSession

my_spark = SparkSession \
    .builder \
    .appName("Python Spark SQL example") \
    .enableHiveSupport() \
    .getOrCreate()

parqFileName = 'gs://caserta-pyspark-eval/train.pqt'
tuesdayDF = my_spark.read.parquet(parqFileName)
tuesdayDF.createOrReplaceTempView("parquetFile")
tuesdaycrimes = my_spark.sql("SELECT LENGTH(Address) - LENGTH(REPLACE(Address, ' ', '')) + 1 FROM parquetFile")
tuesdaycrimes.show()
+-------------------+--------------+--------------------+---------+----------+--------------+--------------------+-----------+---------+
| Dates| Category| Descript|DayOfWeek|PdDistrict| Resolution| Address| X| Y|
+-------------------+--------------+--------------------+---------+----------+--------------+--------------------+-----------+---------+
|2015-05-14 03:53:00| WARRANTS| WARRANT ARREST|Wednesday| NORTHERN|ARREST, BOOKED| OAK ST / LAGUNA ST| -122.42589|37.774597|
|2015-05-14 03:53:00|OTHER OFFENSES|TRAFFIC VIOLATION...|Wednesday| NORTHERN|ARREST, BOOKED| OAK ST / LAGUNA ST| -122.42589|37.774597|
|2015-05-14 03:33:00|OTHER OFFENSES|TRAFFIC VIOLATION...|Wednesday| NORTHERN|ARREST, BOOKED|VANNESS AV / GREE...| -122.42436|37.800415|
+-------------------+--------------+--------------------+---------+----------+--------------+--------------------+-----------+---------+
pau*_*ult 13
There are a number of ways to count words with pyspark DataFrame functions, depending on what you are looking for.
Create example data
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.getOrCreate()

data = [
    ("2015-05-14 03:53:00", "WARRANT ARREST"),
    ("2015-05-14 03:53:00", "TRAFFIC VIOLATION"),
    ("2015-05-14 03:33:00", "TRAFFIC VIOLATION")
]
df = spark.createDataFrame(data, ["Dates", "Description"])
df.show()
In this example, we will count the words in the Description column.
Count words in each row
If you want the number of words in a given column for each row, you can create a new column with withColumn() and use:

- pyspark.sql.functions.split() to break the string into a list
- pyspark.sql.functions.size() to count the length of that list

For example:
df = df.withColumn('wordCount', f.size(f.split(f.col('Description'), ' ')))
df.show()
#+-------------------+-----------------+---------+
#| Dates| Description|wordCount|
#+-------------------+-----------------+---------+
#|2015-05-14 03:53:00| WARRANT ARREST| 2|
#|2015-05-14 03:53:00|TRAFFIC VIOLATION| 2|
#|2015-05-14 03:33:00|TRAFFIC VIOLATION| 2|
#+-------------------+-----------------+---------+
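One caveat (my addition, not part of the original answer): splitting on a single space over-counts when a value contains consecutive spaces or leading/trailing whitespace. A minimal sketch that guards against this by trimming and splitting on the regex \s+:

import pyspark.sql.functions as f

# trim() drops leading/trailing whitespace; splitting on the regex '\s+'
# treats any run of whitespace as a single separator.
df = df.withColumn('wordCount', f.size(f.split(f.trim(f.col('Description')), r'\s+')))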
Sum the word count over all rows
If you want to count the total number of words in a column across the whole DataFrame, you can use pyspark.sql.functions.sum():
df.select(f.sum('wordCount')).collect()
#[Row(sum(wordCount)=6)]
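Equivalently (my variant, not from the original answer), you can aggregate and pull the total out as a plain Python value, assuming the wordCount column built above:

# agg() with an alias keeps the result column name predictable.
total = df.agg(f.sum('wordCount').alias('totalWords')).first()['totalWords']
print(total)
#6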
Count the occurrences of each word
If you want the count of each word across the entire DataFrame, you can use split() and pyspark.sql.functions.explode(), followed by a groupBy and count().
df.withColumn('word', f.explode(f.split(f.col('Description'), ' ')))\
.groupBy('word')\
.count()\
.sort('count', ascending=False)\
.show()
#+---------+-----+
#| word|count|
#+---------+-----+
#| TRAFFIC| 2|
#|VIOLATION| 2|
#| WARRANT| 1|
#| ARREST| 1|
#+---------+-----+
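If your data mixes upper and lower case (an assumption; the sample data here is all upper case), you may want to normalize before grouping. A small variant using pyspark.sql.functions.lower():

# Lower-case the string before splitting so 'Traffic' and 'TRAFFIC'
# land in the same group.
df.withColumn('word', f.explode(f.split(f.lower(f.col('Description')), ' ')))\
    .groupBy('word')\
    .count()\
    .sort('count', ascending=False)\
    .show()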
You can define a udf function as
from pyspark.sql import functions as F

def splitAndCountUdf(x):
    return len(x.split(" "))

countWords = F.udf(splitAndCountUdf, 'int')
and call it with the withColumn function:
tuesdayDF.withColumn("wordCount", countWords(tuesdayDF.Address)).show()
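As a quick check (my usage sketch), applying the same udf to the small example DataFrame from the first answer gives the same counts as the split()/size() approach:

df.withColumn("wordCount", countWords(df.Description)).show()
# wordCount is 2 for every row of this sample data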
If you want a count of distinct words instead, you can change the udf to wrap the split in a set:
from pyspark.sql import functions as F

def splitAndCountUdf(x):
    return len(set(x.split(" ")))

countWords = F.udf(splitAndCountUdf, 'int')
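As a udf-free alternative (my addition; it assumes Spark 2.4+, where array_distinct() is available), the same distinct-word count can be expressed with built-in functions, which generally outperform a Python udf:

import pyspark.sql.functions as f

# size() of the de-duplicated split array = number of distinct words per row.
df.withColumn('distinctWordCount',
              f.size(f.array_distinct(f.split(f.col('Description'), ' ')))).show()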