将PySpark数据帧列的聚合值存储到变量中

Sid*_*Sid 2 apache-spark pyspark

我在这里使用PySpark数据帧."test1"是我的PySpark数据帧,event_date是TimestampType.因此,当我尝试获取event_date的非常计数时,结果是一个整数变量,但是当我尝试获取同一列的最大值时,结果是一个数据帧.我想了解数据框架和变量中的操作结果.我还想知道如何将事件日期的最大值存储为变量

导致整数类型的代码:

loop_cnt=test1.select('event_date').distinct().count()
type(loop_cnt)
Run Code Online (Sandbox Code Playgroud)

导致数据框类型的代码:

last_processed_dt=test1.select([max('event_date')])
type(last_processed_dt)
Run Code Online (Sandbox Code Playgroud)

编辑添加可重现的示例:

schema = StructType([StructField("event_date", TimestampType(), True)])

df = sqlContext.createDataFrame([(datetime(2015, 8, 10, 2, 44, 15),),(datetime(2015, 8, 10, 3, 44, 15),)], schema)
Run Code Online (Sandbox Code Playgroud)

返回数据帧的代码:

last_processed_dt=df.select([max('event_date')])
type(last_processed_dt)
Run Code Online (Sandbox Code Playgroud)

返回变量的代码:

loop_cnt=df.select('event_date').distinct().count()
type(loop_cnt) 
Run Code Online (Sandbox Code Playgroud)

Gio*_*ous 7

使用collect()

import pyspark.sql.functions as sf


distinct_count = df.agg(sf.countDistinct('column_name')).collect()[0][0]
Run Code Online (Sandbox Code Playgroud)

使用first()

import pyspark.sql.functions as sf


distinct_count = df.agg(sf.countDistinct('column_name')).first()[0]
Run Code Online (Sandbox Code Playgroud)


小智 6

您无法直接访问数据框中的值.Dataframe返回一个Row Object.相反,Dataframe为您提供了将其转换为python字典的选项.通过以下示例,我将计算平均wordcount:

wordsDF = sqlContext.createDataFrame([('cat',), ('elephant',), ('rat',), ('rat',), ('cat', )], ['word'])
wordCountsDF = wordsDF.groupBy(wordsDF['word']).count()
wordCountsDF.show()
Run Code Online (Sandbox Code Playgroud)

以下是单词计数结果:

+--------+-----+
|    word|count|
+--------+-----+
|     cat|    2|
|     rat|    2|
|elephant|    1|
+--------+-----+
Run Code Online (Sandbox Code Playgroud)

现在我计算count列的平均值应用collect()操作.记住collect()返回一个列表.这里列表只包含一个元素.

averageCount = wordCountsDF.groupBy().avg('count').collect()
Run Code Online (Sandbox Code Playgroud)

结果看起来像这样.

[Row(avg(count)=1.6666666666666667)]
Run Code Online (Sandbox Code Playgroud)

您不能使用某个python变量直接访问平均值.您必须将其转换为字典才能访问它.

results={}
for i in averageCount:
  results.update(i.asDict())
print results
Run Code Online (Sandbox Code Playgroud)

我们的最终结果如下:

{'avg(count)': 1.6666666666666667}
Run Code Online (Sandbox Code Playgroud)

最后你可以使用以下方法访问平均值

print results['avg(count)']

1.66666666667
Run Code Online (Sandbox Code Playgroud)


Kat*_*ler 5

我很确定df.select([max('event_date')])返回一个 DataFrame,因为可能有多于一行在该列中具有最大值。在您的特定用例中,该列中没有两行可能具有相同的值,但很容易想象多行可以具有相同 max 的情况event_date

df.select('event_date').distinct().count()返回一个整数,因为它告诉您该特定列中有多少个不同的值。它不会告诉您哪个值最大。

如果您希望代码获取最大值event_date并将其存储为变量,请尝试以下操作max_date = df.select([max('event_date')]).distinct().collect()