小编Bg1*_*850的帖子

如何在Spark SQL中为每个组创建z-score

我有一个看起来像这样的数据框

        dSc     TranAmount
 1: 100021      79.64
 2: 100021      79.64
 3: 100021       0.16
 4: 100022      11.65
 5: 100022       0.36
 6: 100022       0.47
 7: 100025       0.17
 8: 100037       0.27
 9: 100056       0.27
10: 100063       0.13
11: 100079       0.13
12: 100091       0.15
13: 100101       0.22
14: 100108       0.14
15: 100109       0.04
Run Code Online (Sandbox Code Playgroud)

现在我想创建一个第三列,每个列的z分数TranAmount都是

(TranAmount-mean(TranAmount))/StdDev(TranAmount)
Run Code Online (Sandbox Code Playgroud)

这里的平均值和标准偏差将基于每个dSc的组

现在我可以计算Spark SQL中的均值和标准差.

(datafromdb
  .groupBy("dSc")
  .agg(datafromdb.dSc, func.avg("TranAmount") ,func.stddev_pop("TranAmount")))
Run Code Online (Sandbox Code Playgroud)

但我对如何在数据框中获得z-score的第三列感到茫然.我希望任何指向正确的方法来实现这个/

python apache-spark apache-spark-sql pyspark

6
推荐指数
1
解决办法
3743
查看次数

如何使用火花数据框评估火花Dstream对象

我正在编写一个spark应用程序,我需要根据历史数据来评估流数据,这些数据位于sql server数据库中

现在的想法是,spark将从数据库中获取历史数据并将其保留在内存中,并将根据它评估流数据.

现在我正在获取流数据

import re
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext,functions as func,Row


sc = SparkContext("local[2]", "realtimeApp")
ssc = StreamingContext(sc,10)
files = ssc.textFileStream("hdfs://RealTimeInputFolder/")

########Lets get the data from the db which is relavant for streaming ###

driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
dataurl = "jdbc:sqlserver://myserver:1433"
db = "mydb"
table = "stream_helper"
credential = "my_credentials"

########basic data for evaluation purpose ########



files_count = files.flatMap(lambda file: file.split( ))

pattern =  '(TranAmount=Decimal.{2})(.[0-9]*.[0-9]*)(\\S+ )(TranDescription=u.)([a-zA-z\\s]+)([\\S\\s]+ )(dSc=u.)([A-Z]{2}.[0-9]+)'


tranfiles = "wasb://myserver.blob.core.windows.net/RealTimeInputFolder01/"

def getSqlContextInstance(sparkContext): …
Run Code Online (Sandbox Code Playgroud)

apache-spark spark-streaming pyspark

5
推荐指数
1
解决办法
606
查看次数

为什么python UDF返回意外的datetime对象,而在RDD上应用的同一函数给出正确的datetime对象

我不确定自己是否做错了什么,请原谅我,如果我的问题可以通过以下数据重现

from pyspark.sql import Row
df = sc.parallelize([Row(C3=u'Dec  1 2013 12:00AM'),
 Row(C3=u'Dec  1 2013 12:00AM'),
 Row(C3=u'Dec  5 2013 12:00AM')]).toDF()
Run Code Online (Sandbox Code Playgroud)

我创建了一个函数将此日期字符串解析为datetime对象,以进行进一步处理

from datetime import datetime
def date_convert(date_str):
   date_format = '%b %d %Y %I:%M%p'
   try:
    dt=datetime.strptime(date_str,date_format)
   except ValueError,v:
    if len(v.args) > 0 and v.args[0].startswith('unconverted data remains: '):
      dt = dt[:-(len(v.args[0])-26)]
      dt=datetime.strptime(dt,date_format)
    else:
      raise v
   return dt
Run Code Online (Sandbox Code Playgroud)

现在,如果我以此做为UDF并应用于我的数据框,则会得到意外的数据

from pyspark.sql.functions import udf
date_convert_udf = udf(date_convert)
df.select(date_convert_udf(df.C3).alias("datetime")).take(2)
Run Code Online (Sandbox Code Playgroud)

结果如下

Out[40]: 
[Row(datetime=u'java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id="Etc/UTC",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2013,MONTH=11,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=1,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=0,SECOND=0,MILLISECOND=0,ZONE_OFFSET=?,DST_OFFSET=?]'),
 Row(datetime=u'java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id="Etc/UTC",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2013,MONTH=11,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=1,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=0,SECOND=0,MILLISECOND=0,ZONE_OFFSET=?,DST_OFFSET=?]')]
Run Code Online (Sandbox Code Playgroud)

但是如果我在将数据帧设为RDD之后使用它,那么它将返回一个pythond datetime对象

df.rdd.map(lambda row:date_convert(row.C3)).collect()
(1) Spark Jobs
Out[42]: 
[datetime.datetime(2013, 12, 1, 0, 0), …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark spark-dataframe

3
推荐指数
1
解决办法
1927
查看次数

stl .series中的错误不是周期性的

我很确定我错过了一些非常简单的东西,但仍然无法弄清楚为什么会出现这个错误.我所拥有的数据是2013年4月至2014年3月的每个月末数据.现在我想了解12个月期间的趋势.

xx <- structure(c(41.52, 41.52, 41.52, 41.68, 41.68, 41.68, 41.84, 
41.84, 41.84, 42.05, 42.05, 42.05), .Tsp = c(2013.25, 2014.16666666667, 
12), class = "ts");
Run Code Online (Sandbox Code Playgroud)

是我的时间序列数据.现在我用的时候

 stl(xx,s.window ="periodic")
Run Code Online (Sandbox Code Playgroud)

我收到错误:

Error in stl(xx, s.window = "periodic") : 
  series is not periodic or has less than two periods
Run Code Online (Sandbox Code Playgroud)

我不知道出了什么问题,因为据我所知这个系列有12个时期.请协助

r time-series forecasting trend

2
推荐指数
1
解决办法
3995
查看次数

Erlang - 两个列表中的常用项目

Erlang新手在这里.假设我有两个这样的列表.

   L1= [{'Lady in the Water',2.5},
        {'Snakes on a Plane',3.5},
        {'Just My Luck',3.0},
        {'Superman Returns',3.5},
        {'You, Me and Dupree',2.5},
        {'The Night Listener',3.0}]
Run Code Online (Sandbox Code Playgroud)

L2 = [{'Lady in the Water',3.0},
      {'Snakes on a Plane',3.5},
      {'Just My Luck',1.5},
      {'Superman Returns',5.0},
      {'You, Me and Dupree',3.5}]
Run Code Online (Sandbox Code Playgroud)

我希望在元组列表中的常见评级

[{2.5,3.0},{3.5,3.5},{3.0,1.5},{3.5,5.0},{2.5,3.5}]
Run Code Online (Sandbox Code Playgroud)

我的代码是这样的

common_rating(R1,R2)->common_rating(R1,R2,0,0).

common_rating(_X,[],M,N)  ->{M,N};



common_rating(X,[Y|Y1],A,B)->
  {M,R}=X,
  {N,K }= Y,
  case M=/=N of
    true -> common_rating(X,Y1,A,B);
    false -> common_rating(X,[],A+R,B+K)
  end.

common_rating_final([],_R2,J) ->J;
common_rating_final([X|X1],R2,J)->
  common_rating_final(X1,R2,J++[common_rating(X,R2)]).
Run Code Online (Sandbox Code Playgroud)

为了更好地理解代码 common_rating函数,需要一个元组{movie,rating}并从另一个列表(B)中找到相同的电影和评级并返回{rating,rating_B}

现在common_rating_final递归遍历列表,让我们说A,并使用common_rating为A和B中常见的所有电影找到{rating_A,rating_B}

但是当我运行我的代码时

my_module:common_rating_final(L1,L2,[]).
Run Code Online (Sandbox Code Playgroud)

它回报了我

[{2.5,3.0},{3.5,3.5},{3.0,1.5},{3.5,5.0},{2.5,3.5},{0,0}] …
Run Code Online (Sandbox Code Playgroud)

erlang list

1
推荐指数
1
解决办法
614
查看次数