pyC*_*hon 6 csv pandas apache-spark pyspark
我有大量相当大的日常文件存储在博客存储引擎(S3,Azure datalake exc .. exc ..)中data1900-01-01.csv, data1900-01-02.csv,....,data2017-04-27.csv.我的目标是预先形成滚动的N天线性回归,但我遇到了数据加载方面的问题.我不确定如何在没有嵌套RDD's.
的情况下执行此操作.每个.csv文件的架构都是相同的.
换句话说,对于每个日期d_t,我都需要数据x_t并加入数据(x_t-1, x_t-2,... x_t-N).
如何使用PySpark加载这些日常文件的N日窗口?我能找到的所有PySpark示例似乎都是从一个非常大的文件或数据集加载的.
这是我当前代码的一个示例:
dates = [('1995-01-03', '1995-01-04', '1995-01-05'), ('1995-01-04', '1995-01-05', '1995-01-06')]
p = sc.parallelize(dates)
def test_run(date_range):
dt0 = date_range[-1] #get the latest date
s = '/daily/data{}.csv'
df0 = spark.read.csv(s.format(dt0), header=True, mode='DROPMALFORM')
file_list = [s.format(dt) for dt in date_range[:-1]] # Get a window of trailing dates
df1 = spark.read.csv(file_list, header=True, mode='DROPMALFORM')
return 1
p.filter(test_run)
p.map(test_run) #fails with same error as p.filter
Run Code Online (Sandbox Code Playgroud)
我在使用PySpark版本 '2.1.0'
我在Azure HDInsight集群jupyter笔记本上运行它.
spark 这是类型 <class 'pyspark.sql.session.SparkSession'>
更小,更可重复的例子如下:
p = sc.parallelize([1, 2, 3])
def foo(date_range):
df = spark.createDataFrame([(1, 0, 3)], ["a", "b", "c"])
return 1
p.filter(foo).count()
Run Code Online (Sandbox Code Playgroud)
你最好使用 usingDataframes而不是RDD. Dataframe 的read.csvapi 接受路径列表,例如 -
pathList = ['/path/to/data1900-01-01.csv','/path/to/data1900-01-02.csv']
df = spark.read.csv(pathList)
Run Code Online (Sandbox Code Playgroud)
您可以通过在 N 天的窗口中执行一些日期操作来形成数据文件的日期文件路径列表"path/to/data"+datetime.today().strftime("%Y-%m-%d"))+.csv"(这只会为您提供今天的文件名,但不难计算出 N 天的日期计算)
但请记住,所有日期 csv 的架构应该相同,才能使上述工作正常进行。
编辑:当您并行化日期列表时p,即,每个日期都会由不同的执行程序单独处理,因此 test_run2 的输入实际上并不是日期列表,它是一个单独的字符串,例如1995-01-01
试试这个,看看是否有效。
# Get the list of dates
date_range = window(dates, N)
s = '/daily/data{}.csv'
dt0 = date_range[-1] # most recent file
df0 = spark.read.csv(s.format(dt0), header=True, mode='DROPMALFORM')
# read previous files
file_list = [s.format(dt) for dt in date_range[:-1]]
df1 = spark.read.csv(file_list, header=True, mode='DROPMALFORM')
r, resid = computeLinearRegression(df0,df1)
r.write.save('daily/r{}.csv'.format(dt0))
resid.write.save('/daily/resid{}.csv'.format(dt0))
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
401 次 |
| 最近记录: |