我有一个看起来像这样的数据框
dSc TranAmount
1: 100021 79.64
2: 100021 79.64
3: 100021 0.16
4: 100022 11.65
5: 100022 0.36
6: 100022 0.47
7: 100025 0.17
8: 100037 0.27
9: 100056 0.27
10: 100063 0.13
11: 100079 0.13
12: 100091 0.15
13: 100101 0.22
14: 100108 0.14
15: 100109 0.04
Run Code Online (Sandbox Code Playgroud)
现在我想创建一个第三列,每个列的z分数TranAmount都是
(TranAmount-mean(TranAmount))/StdDev(TranAmount)
Run Code Online (Sandbox Code Playgroud)
这里的平均值和标准偏差将基于每个dSc的组
现在我可以计算Spark SQL中的均值和标准差.
(datafromdb
.groupBy("dSc")
.agg(datafromdb.dSc, func.avg("TranAmount") ,func.stddev_pop("TranAmount")))
Run Code Online (Sandbox Code Playgroud)
但我对如何在数据框中获得z-score的第三列感到茫然.我希望任何指向正确的方法来实现这个/
我正在编写一个spark应用程序,我需要根据历史数据来评估流数据,这些数据位于sql server数据库中
现在的想法是,spark将从数据库中获取历史数据并将其保留在内存中,并将根据它评估流数据.
现在我正在获取流数据
import re
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext,functions as func,Row
sc = SparkContext("local[2]", "realtimeApp")
ssc = StreamingContext(sc,10)
files = ssc.textFileStream("hdfs://RealTimeInputFolder/")
########Lets get the data from the db which is relavant for streaming ###
driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
dataurl = "jdbc:sqlserver://myserver:1433"
db = "mydb"
table = "stream_helper"
credential = "my_credentials"
########basic data for evaluation purpose ########
files_count = files.flatMap(lambda file: file.split( ))
pattern = '(TranAmount=Decimal.{2})(.[0-9]*.[0-9]*)(\\S+ )(TranDescription=u.)([a-zA-z\\s]+)([\\S\\s]+ )(dSc=u.)([A-Z]{2}.[0-9]+)'
tranfiles = "wasb://myserver.blob.core.windows.net/RealTimeInputFolder01/"
def getSqlContextInstance(sparkContext): …Run Code Online (Sandbox Code Playgroud) 我不确定自己是否做错了什么,请原谅我,如果我的问题可以通过以下数据重现
from pyspark.sql import Row
df = sc.parallelize([Row(C3=u'Dec 1 2013 12:00AM'),
Row(C3=u'Dec 1 2013 12:00AM'),
Row(C3=u'Dec 5 2013 12:00AM')]).toDF()
Run Code Online (Sandbox Code Playgroud)
我创建了一个函数将此日期字符串解析为datetime对象,以进行进一步处理
from datetime import datetime
def date_convert(date_str):
date_format = '%b %d %Y %I:%M%p'
try:
dt=datetime.strptime(date_str,date_format)
except ValueError,v:
if len(v.args) > 0 and v.args[0].startswith('unconverted data remains: '):
dt = dt[:-(len(v.args[0])-26)]
dt=datetime.strptime(dt,date_format)
else:
raise v
return dt
Run Code Online (Sandbox Code Playgroud)
现在,如果我以此做为UDF并应用于我的数据框,则会得到意外的数据
from pyspark.sql.functions import udf
date_convert_udf = udf(date_convert)
df.select(date_convert_udf(df.C3).alias("datetime")).take(2)
Run Code Online (Sandbox Code Playgroud)
结果如下
Out[40]:
[Row(datetime=u'java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id="Etc/UTC",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2013,MONTH=11,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=1,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=0,SECOND=0,MILLISECOND=0,ZONE_OFFSET=?,DST_OFFSET=?]'),
Row(datetime=u'java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id="Etc/UTC",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2013,MONTH=11,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=1,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=0,SECOND=0,MILLISECOND=0,ZONE_OFFSET=?,DST_OFFSET=?]')]
Run Code Online (Sandbox Code Playgroud)
但是如果我在将数据帧设为RDD之后使用它,那么它将返回一个pythond datetime对象
df.rdd.map(lambda row:date_convert(row.C3)).collect()
(1) Spark Jobs
Out[42]:
[datetime.datetime(2013, 12, 1, 0, 0), …Run Code Online (Sandbox Code Playgroud) 我很确定我错过了一些非常简单的东西,但仍然无法弄清楚为什么会出现这个错误.我所拥有的数据是2013年4月至2014年3月的每个月末数据.现在我想了解12个月期间的趋势.
xx <- structure(c(41.52, 41.52, 41.52, 41.68, 41.68, 41.68, 41.84,
41.84, 41.84, 42.05, 42.05, 42.05), .Tsp = c(2013.25, 2014.16666666667,
12), class = "ts");
Run Code Online (Sandbox Code Playgroud)
是我的时间序列数据.现在我用的时候
stl(xx,s.window ="periodic")
Run Code Online (Sandbox Code Playgroud)
我收到错误:
Error in stl(xx, s.window = "periodic") :
series is not periodic or has less than two periods
Run Code Online (Sandbox Code Playgroud)
我不知道出了什么问题,因为据我所知这个系列有12个时期.请协助
Erlang新手在这里.假设我有两个这样的列表.
L1= [{'Lady in the Water',2.5},
{'Snakes on a Plane',3.5},
{'Just My Luck',3.0},
{'Superman Returns',3.5},
{'You, Me and Dupree',2.5},
{'The Night Listener',3.0}]
Run Code Online (Sandbox Code Playgroud)
和
L2 = [{'Lady in the Water',3.0},
{'Snakes on a Plane',3.5},
{'Just My Luck',1.5},
{'Superman Returns',5.0},
{'You, Me and Dupree',3.5}]
Run Code Online (Sandbox Code Playgroud)
我希望在元组列表中的常见评级
[{2.5,3.0},{3.5,3.5},{3.0,1.5},{3.5,5.0},{2.5,3.5}]
Run Code Online (Sandbox Code Playgroud)
我的代码是这样的
common_rating(R1,R2)->common_rating(R1,R2,0,0).
common_rating(_X,[],M,N) ->{M,N};
common_rating(X,[Y|Y1],A,B)->
{M,R}=X,
{N,K }= Y,
case M=/=N of
true -> common_rating(X,Y1,A,B);
false -> common_rating(X,[],A+R,B+K)
end.
common_rating_final([],_R2,J) ->J;
common_rating_final([X|X1],R2,J)->
common_rating_final(X1,R2,J++[common_rating(X,R2)]).
Run Code Online (Sandbox Code Playgroud)
为了更好地理解代码
common_rating函数,需要一个元组{movie,rating}并从另一个列表(B)中找到相同的电影和评级并返回{rating,rating_B}
现在common_rating_final递归遍历列表,让我们说A,并使用common_rating为A和B中常见的所有电影找到{rating_A,rating_B}
但是当我运行我的代码时
my_module:common_rating_final(L1,L2,[]).
Run Code Online (Sandbox Code Playgroud)
它回报了我
[{2.5,3.0},{3.5,3.5},{3.0,1.5},{3.5,5.0},{2.5,3.5},{0,0}] …Run Code Online (Sandbox Code Playgroud) apache-spark ×3
pyspark ×3
erlang ×1
forecasting ×1
list ×1
python ×1
r ×1
time-series ×1
trend ×1