相关疑难解决方法(0)

如何使用PySpark从日常文件加载滚动窗口?

我有大量相当大的日常文件存储在博客存储引擎(S3,Azure datalake exc .. exc ..)中data1900-01-01.csv, data1900-01-02.csv,....,data2017-04-27.csv.我的目标是预先形成滚动的N天线性回归,但我遇到了数据加载方面的问题.我不确定如何在没有嵌套RDD's. 的情况下执行此操作.每个.csv文件的架构都是相同的.

换句话说,对于每个日期d_t,我都需要数据x_t并加入数据(x_t-1, x_t-2,... x_t-N).

如何使用PySpark加载这些日常文件的N日窗口?我能找到的所有PySpark示例似乎都是从一个非常大的文件或数据集加载的.

这是我当前代码的一个示例:

dates = [('1995-01-03', '1995-01-04', '1995-01-05'), ('1995-01-04', '1995-01-05', '1995-01-06')]

p = sc.parallelize(dates)
def test_run(date_range):
    dt0 = date_range[-1] #get the latest date
    s = '/daily/data{}.csv'
    df0 = spark.read.csv(s.format(dt0), header=True, mode='DROPMALFORM')
    file_list = [s.format(dt) for dt in date_range[:-1]] # Get a window of trailing dates
    df1 = spark.read.csv(file_list, header=True, mode='DROPMALFORM')
    return 1

p.filter(test_run) 

p.map(test_run) #fails with same error as …
Run Code Online (Sandbox Code Playgroud)

csv pandas apache-spark pyspark

6
推荐指数
1
解决办法
401
查看次数

是否可以在 Apache Spark 中创建嵌套的 RDD?

我正在尝试在 Spark 中实现 K-最近邻算法。我想知道是否可以使用嵌套的 RDD。这会让我的生活轻松很多。考虑以下代码片段。

public static void main (String[] args){
//blah blah code
JavaRDD<Double> temp1 = testData.map(
    new Function<Vector,Double>(){
        public Double call(final Vector z) throws Exception{
            JavaRDD<Double> temp2 = trainData.map(
                    new Function<Vector, Double>() {
                        public Double call(Vector vector) throws Exception {
                            return (double) vector.length();
                        }
                    }
            );
            return (double)z.length();
        }    
    }
);
}
Run Code Online (Sandbox Code Playgroud)

目前我遇到了这个嵌套设置的错误(我可以在这里发布完整的日志)。是否允许在拳头位置?谢谢

java nested apache-spark rdd

5
推荐指数
1
解决办法
3566
查看次数

标签 统计

apache-spark ×2

csv ×1

java ×1

nested ×1

pandas ×1

pyspark ×1

rdd ×1