Jas*_*ald 2 python standard-deviation apache-spark rdd pyspark
我有一个RDD,我想找到standard deviation数据中的一个列RDD.我目前的代码是:
def extract(line):
# line[11] is the column in which I want to find standard deviation
return (line[1],line[2],line[5],line[6],line[8],line[10],line[11])
inputfile1 = sc.textFile('file1.csv').zipWithIndex().filter(lambda (line,rownum): rownum>=0).map(lambda (line, rownum): line)
data = (inputfile1
.map(lambda line: line.split(";"))
.filter(lambda line: len(line) >1 )
.map(extract)) # Map to tuples
Run Code Online (Sandbox Code Playgroud)
data是RDD在我的最后一列(列6)具有其中我想找到的值standard deviation.我该怎么找到它?
更新:我目前的代码:
def extract(line):
# last column is numeric but in string format
return ((float(line[-1])))
input = sc.textFile('file1.csv').zipWithIndex().filter(lambda (line,rownum): rownum>=0).map(lambda (line, rownum): line)
Data = (input
.map(lambda line: line.split(";"))
.filter(lambda line: len(line) >1 )
.map(extract)) # Map to tuples
row = Row("val")
df = Data.map(row).toDF()
df.map(lambda r: r.x).stdev()
Run Code Online (Sandbox Code Playgroud)
当我运行这个时,我得到错误:AttributeError: xat df.map(lambda r: r.x).stdev().注意:我的数据中的某些值为负数
在Spark <1.6.0中,您有几个选择:
转换为RDD并使用stdev方法:
from pyspark.sql import Row
import numpy as np
row = Row("x")
df = sc.parallelize([row(float(x)) for x in np.random.randn(100)]).toDF()
df.rdd.map(lambda r: r.x).stdev()
Run Code Online (Sandbox Code Playgroud)使用以下公式(这里是 Scala版本):
from pyspark.sql.functions import avg, pow, col, sqrt, lit
sd = sqrt(
avg(col("x") * col("x")) - pow(avg(col("x")), lit(2))).alias("stdev")
df.select(sd)
Run Code Online (Sandbox Code Playgroud)Hive UDF:
df.registerTempTable("df")
sqlContext.sql("SELECT stddev(x) AS sd FROM df")
Run Code Online (Sandbox Code Playgroud)星火1.6.0介绍stddev,stddev_samp和stddev_pop功能.
| 归档时间: |
|
| 查看次数: |
5061 次 |
| 最近记录: |