如何使用PySpark从日常文件加载滚动窗口?

pyC*_*hon 6 csv pandas apache-spark pyspark

我有大量相当大的日常文件存储在博客存储引擎(S3,Azure datalake exc .. exc ..)中data1900-01-01.csv, data1900-01-02.csv,....,data2017-04-27.csv.我的目标是预先形成滚动的N天线性回归,但我遇到了数据加载方面的问题.我不确定如何在没有嵌套RDD's. 的情况下执行此操作.每个.csv文件的架构都是相同的.

换句话说,对于每个日期d_t,我都需要数据x_t并加入数据(x_t-1, x_t-2,... x_t-N).

如何使用PySpark加载这些日常文件的N日窗口?我能找到的所有PySpark示例似乎都是从一个非常大的文件或数据集加载的.

这是我当前代码的一个示例:

dates = [('1995-01-03', '1995-01-04', '1995-01-05'), ('1995-01-04', '1995-01-05', '1995-01-06')]

p = sc.parallelize(dates)
def test_run(date_range):
    dt0 = date_range[-1] #get the latest date
    s = '/daily/data{}.csv'
    df0 = spark.read.csv(s.format(dt0), header=True, mode='DROPMALFORM')
    file_list = [s.format(dt) for dt in date_range[:-1]] # Get a window of trailing dates
    df1 = spark.read.csv(file_list, header=True, mode='DROPMALFORM')
    return 1

p.filter(test_run) 

p.map(test_run) #fails with same error as p.filter
Run Code Online (Sandbox Code Playgroud)

我在使用PySpark版本 '2.1.0'

我在Azure HDInsight集群jupyter笔记本上运行它.

spark 这是类型 <class 'pyspark.sql.session.SparkSession'>

更小,更可重复的例子如下:

p = sc.parallelize([1, 2, 3])
def foo(date_range):
    df = spark.createDataFrame([(1, 0, 3)], ["a", "b", "c"])
    return 1
p.filter(foo).count()
Run Code Online (Sandbox Code Playgroud)

Pus*_*hkr 2

你最好使用 usingDataframes而不是RDD. Dataframe 的read.csvapi 接受路径列表,例如 -

pathList = ['/path/to/data1900-01-01.csv','/path/to/data1900-01-02.csv']
df = spark.read.csv(pathList)
Run Code Online (Sandbox Code Playgroud)

查看 read.csv 的文档

您可以通过在 N 天的窗口中执行一些日期操作来形成数据文件的日期文件路径列表"path/to/data"+datetime.today().strftime("%Y-%m-%d"))+.csv"(这只会为您提供今天的文件名,但不难计算出 N 天的日期计算)

但请记住,所有日期 csv 的架构应该相同,才能使上述工作正常进行。

编辑:当您并行化日期列表时p,即,每个日期都会由不同的执行程序单独处理,因此 test_run2 的输入实际上并不是日期列表,它是一个单独的字符串,例如1995-01-01

试试这个,看看是否有效。

# Get the list of dates 
date_range = window(dates, N) 
s = '/daily/data{}.csv'

dt0 = date_range[-1] # most recent file
df0 = spark.read.csv(s.format(dt0), header=True, mode='DROPMALFORM') 

# read previous files
file_list = [s.format(dt) for dt in date_range[:-1]]
df1 = spark.read.csv(file_list, header=True, mode='DROPMALFORM')

r, resid = computeLinearRegression(df0,df1)
r.write.save('daily/r{}.csv'.format(dt0))
resid.write.save('/daily/resid{}.csv'.format(dt0))
Run Code Online (Sandbox Code Playgroud)