我有大量相当大的日常文件存储在博客存储引擎(S3,Azure datalake exc .. exc ..)中data1900-01-01.csv, data1900-01-02.csv,....,data2017-04-27.csv.我的目标是预先形成滚动的N天线性回归,但我遇到了数据加载方面的问题.我不确定如何在没有嵌套RDD's.
的情况下执行此操作.每个.csv文件的架构都是相同的.
换句话说,对于每个日期d_t,我都需要数据x_t并加入数据(x_t-1, x_t-2,... x_t-N).
如何使用PySpark加载这些日常文件的N日窗口?我能找到的所有PySpark示例似乎都是从一个非常大的文件或数据集加载的.
这是我当前代码的一个示例:
dates = [('1995-01-03', '1995-01-04', '1995-01-05'), ('1995-01-04', '1995-01-05', '1995-01-06')]
p = sc.parallelize(dates)
def test_run(date_range):
dt0 = date_range[-1] #get the latest date
s = '/daily/data{}.csv'
df0 = spark.read.csv(s.format(dt0), header=True, mode='DROPMALFORM')
file_list = [s.format(dt) for dt in date_range[:-1]] # Get a window of trailing dates
df1 = spark.read.csv(file_list, header=True, mode='DROPMALFORM')
return 1
p.filter(test_run)
p.map(test_run) #fails with same error as …Run Code Online (Sandbox Code Playgroud) 我正在尝试在 Spark 中实现 K-最近邻算法。我想知道是否可以使用嵌套的 RDD。这会让我的生活轻松很多。考虑以下代码片段。
public static void main (String[] args){
//blah blah code
JavaRDD<Double> temp1 = testData.map(
new Function<Vector,Double>(){
public Double call(final Vector z) throws Exception{
JavaRDD<Double> temp2 = trainData.map(
new Function<Vector, Double>() {
public Double call(Vector vector) throws Exception {
return (double) vector.length();
}
}
);
return (double)z.length();
}
}
);
}
Run Code Online (Sandbox Code Playgroud)
目前我遇到了这个嵌套设置的错误(我可以在这里发布完整的日志)。是否允许在拳头位置?谢谢