Count the number of words in a Spark dataframe

Hri*_*rma 2 python apache-spark apache-spark-sql pyspark

How can I find the number of words in a Spark dataframe column without using SQL's REPLACE() function? Below are the code and input I am working with, but the replace() function does not work.

from pyspark.sql import SparkSession
my_spark = SparkSession \
    .builder \
    .appName("Python Spark SQL example") \
    .enableHiveSupport() \
    .getOrCreate()

parqFileName = 'gs://caserta-pyspark-eval/train.pqt'
tuesdayDF = my_spark.read.parquet(parqFileName)

tuesdayDF.createOrReplaceTempView("parquetFile")
tuesdaycrimes = my_spark.sql("SELECT LENGTH(Address) - LENGTH(REPLACE(Address, ' ', ''))+1 FROM parquetFile")

tuesdaycrimes.show()


+-------------------+--------------+--------------------+---------+----------+--------------+--------------------+-----------+---------+
|              Dates|      Category|            Descript|DayOfWeek|PdDistrict|    Resolution|             Address|          X|        Y|
+-------------------+--------------+--------------------+---------+----------+--------------+--------------------+-----------+---------+
|2015-05-14 03:53:00|      WARRANTS|      WARRANT ARREST|Wednesday|  NORTHERN|ARREST, BOOKED|  OAK ST / LAGUNA ST| -122.42589|37.774597|
|2015-05-14 03:53:00|OTHER OFFENSES|TRAFFIC VIOLATION...|Wednesday|  NORTHERN|ARREST, BOOKED|  OAK ST / LAGUNA ST| -122.42589|37.774597|
|2015-05-14 03:33:00|OTHER OFFENSES|TRAFFIC VIOLATION...|Wednesday|  NORTHERN|ARREST, BOOKED|VANNESS AV / GREE...| -122.42436|37.800415|
+-------------------+--------------+--------------------+---------+----------+--------------+--------------------+-----------+---------+

pau*_*ult 13

There are a number of ways to count words using pyspark DataFrame functions, depending on what you are looking for.

Create example data

import pyspark.sql.functions as f
data = [
    ("2015-05-14 03:53:00", "WARRANT ARREST"),
    ("2015-05-14 03:53:00", "TRAFFIC VIOLATION"),
    ("2015-05-14 03:33:00", "TRAFFIC VIOLATION")
]

# assumes an active SparkSession; here we reuse the my_spark session from the question
df = my_spark.createDataFrame(data, ["Dates", "Description"])
df.show()
#+-------------------+-----------------+
#|              Dates|      Description|
#+-------------------+-----------------+
#|2015-05-14 03:53:00|   WARRANT ARREST|
#|2015-05-14 03:53:00|TRAFFIC VIOLATION|
#|2015-05-14 03:33:00|TRAFFIC VIOLATION|
#+-------------------+-----------------+

In these examples, we will count the words in the Description column.

Count words in each row

If you want the number of words in the specified column for each row, you can create a new column using withColumn(). For example:

df = df.withColumn('wordCount', f.size(f.split(f.col('Description'), ' ')))
df.show()
#+-------------------+-----------------+---------+
#|              Dates|      Description|wordCount|
#+-------------------+-----------------+---------+
#|2015-05-14 03:53:00|   WARRANT ARREST|        2|
#|2015-05-14 03:53:00|TRAFFIC VIOLATION|        2|
#|2015-05-14 03:33:00|TRAFFIC VIOLATION|        2|
#+-------------------+-----------------+---------+
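
Note that split() with a single-space pattern treats every space as a separator, so repeated spaces produce empty tokens that inflate the count. A minimal sketch of a more tolerant variant, assuming the same df as above: trim first, then split on a run of whitespace.

# trim, then split on any run of whitespace (f.split takes a regex pattern)
# an empty string would still count as 1, since split always returns
# at least one element
df.withColumn('wordCount', f.size(f.split(f.trim(f.col('Description')), r'\s+'))).show()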

Sum the word count over all rows

If you want to count the total number of words in the column across the entire DataFrame, you can use pyspark.sql.functions.sum():

df.select(f.sum('wordCount')).collect() 
#[Row(sum(wordCount)=6)]
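
The same total can also be computed in a single select, without materializing the wordCount column first (a sketch using the same df):

# aggregate the per-row size expression directly
df.select(f.sum(f.size(f.split(f.col('Description'), ' '))).alias('totalWords')).collect()
#[Row(totalWords=6)]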

Count the occurrences of each word

If you want the count of each word across the entire DataFrame, you can use split() and pyspark.sql.functions.explode(), followed by groupBy() and count().

df.withColumn('word', f.explode(f.split(f.col('Description'), ' ')))\
    .groupBy('word')\
    .count()\
    .sort('count', ascending=False)\
    .show()
#+---------+-----+
#|     word|count|
#+---------+-----+
#|  TRAFFIC|    2|
#|VIOLATION|    2|
#|  WARRANT|    1|
#|   ARREST|    1|
#+---------+-----+
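
If the text mixes upper and lower case and the counts should be case-insensitive, you can normalize with lower() before exploding (a sketch, assuming the same df; with this all-uppercase sample the only visible effect is lower-cased words):

# lower-case the column before splitting so e.g. 'Traffic' and 'TRAFFIC' merge
df.withColumn('word', f.explode(f.split(f.lower(f.col('Description')), ' ')))\
    .groupBy('word')\
    .count()\
    .sort('count', ascending=False)\
    .show()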


Ram*_*jan 1

You can define a udf function as

from pyspark.sql import functions as F

def splitAndCountUdf(x):
    return len(x.split(" "))

countWords = F.udf(splitAndCountUdf, 'int')

and call it using the .withColumn function:

tuesdayDF.withColumn("wordCount", countWords(tuesdayDF.Address))

And if you want the count of distinct words, you can change the udf function to include a set:

from pyspark.sql import functions as F

def splitAndCountUdf(x):
    # a set keeps each word only once, so repeated words are not counted
    return len(set(x.split(" ")))

countWords = F.udf(splitAndCountUdf, 'int')
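
Note that Python udfs add serialization overhead compared with built-in column functions. On Spark 2.4+, the distinct-word count can be expressed without a udf via array_distinct (a sketch, assuming the Address column from the question):

from pyspark.sql import functions as F

# split, drop duplicate words, then count: a built-in equivalent of the udf
tuesdayDF.withColumn("wordCount", F.size(F.array_distinct(F.split(F.col("Address"), " "))))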