Mar*_*kus 7 python apache-spark apache-spark-sql pyspark
我调用的PySpark DataFrame(不是pandas)df非常大collect().因此,下面给出的代码效率不高.它使用的是少量数据,但现在却失败了.
import numpy as np
myList = df.collect()
total = []
for product,nb in myList:
    for p2,score in nb:
            total.append(score)
mean = np.mean(total)
std = np.std(total)
有没有办法通过使用或类似获得mean和std作为两个变量pyspark.sql.functions?
from pyspark.sql.functions import mean as mean_, std as std_
withColumn但是,我可以使用这种方法逐行应用计算,并且它不返回单个变量.
更新:
样本内容df:
+----------+------------------+
|product_PK|          products|
+----------+------------------+
|       680|[[691,1], [692,5]]|
|       685|[[691,2], [692,2]]|
|       684|[[691,1], [692,3]]|
我应计算的平均值和标准偏差score值,例如值1中[691,1]的分数之一.
pau*_*ult 18
您可以使用内置函数来获取聚合统计信息.以下是获得均值和标准差的方法.
from pyspark.sql.functions import mean as _mean, stddev as _stddev, col
df_stats = df.select(
    _mean(col('columnName')).alias('mean'),
    _stddev(col('columnName')).alias('std')
).collect()
mean = df_stats[0]['mean']
std = df_stats[0]['std']
请注意,有三种不同的标准偏差函数.从文档中我使用的(stddev)返回以下内容:
聚合函数:返回组中表达式的无偏样本标准差
你也可以使用这个describe()方法:
df.describe().show()
有关更多信息,请参阅此链接:pyspark.sql.functions
更新:这是您可以处理嵌套数据的方法.
用于explode将值提取到单独的行中,然后调用mean并stddev如上所示.
这是一个MWE:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import explode, col, udf, mean as _mean, stddev as _stddev
# mock up sample dataframe
df = sqlCtx.createDataFrame(
    [(680, [[691,1], [692,5]]), (685, [[691,2], [692,2]]), (684, [[691,1], [692,3]])],
    ["product_PK", "products"]
)
# udf to get the "score" value - returns the item at index 1
get_score = udf(lambda x: x[1], IntegerType())
# explode column and get stats
df_stats = df.withColumn('exploded', explode(col('products')))\
    .withColumn('score', get_score(col('exploded')))\
    .select(
        _mean(col('score')).alias('mean'),
        _stddev(col('score')).alias('std')
    )\
    .collect()
mean = df_stats[0]['mean']
std = df_stats[0]['std']
print([mean, std])
哪个输出:
[2.3333333333333335, 1.505545305418162]
您可以使用以下方法验证这些值是否正确numpy:
vals = [1,5,2,2,1,3]
print([np.mean(vals), np.std(vals, ddof=1)])
说明:您的"products"列是一个list的list秒.调用explode将为外部的每个元素创建一个新行list.然后"score"从每个爆炸行中获取值,这些行已定义为2元素中的第二个元素list.最后,在这个新列上调用聚合函数.
| 归档时间: | 
 | 
| 查看次数: | 25445 次 | 
| 最近记录: |