V. *_*mma 21 scala amazon-s3 apache-spark apache-spark-sql aws-sdk
我有一个应用程序,它将数据发送到AWS Kinesis Firehose,并将数据写入我的S3存储桶.Firehose使用"yyyy/MM/dd/HH"格式来编写文件.
就像在这个示例S3路径中一样:
s3://mybucket/2016/07/29/12
Run Code Online (Sandbox Code Playgroud)
现在我有一个用Scala编写的Spark应用程序,我需要从特定时间段读取数据.我有开始和结束日期.数据采用JSON格式,这就是我sqlContext.read.json()不使用的原因sc.textFile().
如何快速有效地读取数据?
通配符 - 我可以选择特定日期或特定月份所有日期的所有小时数据,例如:
val df = sqlContext.read.json("s3://mybucket/2016/07/29/*")
val df = sqlContext.read.json("s3://mybucket/2016/07/*/*")
Run Code Online (Sandbox Code Playgroud)
但是,如果我必须从几天的日期读取数据,例如2016-07-29 - 2016-07-30我不能以相同的方式使用通配符方法.
这让我想到了下一点......
sc.textFile()sqlContext.read.json()联盟 - 云的前一个链接的第二个解决方案建议分别读取每个目录,然后将它们合并在一起.虽然他建议联合RDD-s,但也可以选择联合DataFrames.如果我手动生成给定日期时间段的日期字符串,那么我可能会创建一个不存在的路径,而不是忽略它,整个读取失败.相反,我可以使用AWS SDK并使用listObjectsAmazonS3Client中的函数来获取上一个链接中iMKanchwala解决方案中的所有密钥.
唯一的问题是我的数据不断变化.如果read.json()函数将所有数据作为单个参数获取,它将读取所有必需的数据,并且足够智能从数据中推断出json模式.如果我分别读取2个目录并且它们的模式不匹配,那么我认为联合这两个数据帧会成为一个问题.
Glob(?)语法 - nhahtdh的这个解决方案比选项1和2好一点,因为它们提供了更详细地指定日期和目录的选项,并作为单个"路径",因此它也适用于.read.json()
但同样,关于丢失的目录会出现一个熟悉的问题.假设我想要从20.07到30.07的所有数据,我可以这样声明:
val df = sqlContext.read.json("s3://mybucket/2016/07/[20-30]/*")
Run Code Online (Sandbox Code Playgroud)
但是,如果我从7月25日开始丢失数据,那么路径..16/07/25/就不存在了,整个功能都失败了.
显然,当请求的时间段是25.11.2015-12.02.2016时,它会变得更加困难,那么我需要以编程方式(在我的Scala脚本中)创建一个类似这样的字符串路径:
"s3://mybucket/{2015/11/[25-30],2015/12/*,2016/01/*,2016/02/[01-12]}/*"
Run Code Online (Sandbox Code Playgroud)
通过创建它,我会以某种方式确定这些25-30和01-12间隔都有相应的路径,如果缺少一个,它会再次失败.(幸运的是,Asterisk会处理丢失的目录,因为它会读取存在的所有内容)
如何从一个目录路径中一次性读取所有必要的数据,而不会因为某个日期间隔之间缺少目录而失败?
Sim*_*Sim 13
有一个更简单的解决方案.如果您查看DataFrameReader API,您会注意到有一种.json(paths: String*)方法.只需构建一个你想要的路径的集合,然后根据你的喜好,然后调用方法,例如,
val paths: Seq[String] = ...
val df = sqlContext.read.json(paths: _*)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
13568 次 |
| 最近记录: |