Spark - 使用Firehose从分区文件夹中读取JSON

Kur*_*ile 8 apache-spark apache-spark-sql databricks spark-structured-streaming

Kinesis firehose将文件的持久性(在本例中为时间序列JSON)管理到由YYYY/MM/DD/HH划分的文件夹层次结构(直到24编号中的小时)......很棒.

如何使用Spark 2.0然后我可以读取这些嵌套的子文件夹并从所有叶子json文件创建一个静态Dataframe?数据框阅读器是否有"选项"?

我的下一个目标是将其作为流式DF,其中Firehose持久保存到s3中的新文件自然会成为使用Spark 2.0中新结构化流媒体的流数据帧的一部分.我知道这都是实验性的 - 希望有人之前使用S3作为流媒体文件源,其中数据被分成如上所述的文件夹.当然更喜欢直接使用Kinesis流,但是这个连接器上没有2.0的日期,所以Firehose-> S3是临时的.

ND:我正在使用databricks,它将S3安装到DBFS中,但当然可以很容易地成为EMR或其他Spark提供商.很高兴看到一个笔记本电脑,如果一个人可以分享给出一个例子.

干杯!

mrs*_*vas 6

我可以读取嵌套的子文件夹并从所有叶子JSON文件创建静态DataFrame吗?DataFrame阅读器有选项吗?

是的,因为您的目录结构是常规(YYYY/MM/DD/HH),您可以使用下面的通配符字符给路径节点直到叶节点

val spark: SparkSession = SparkSession.builder.master("local").getOrCreate

val jsonDf = spark.read.format("json").json("base/path/*/*/*/*/*.json")
// Here */*/*/*/*.json maps to YYYY/MM/DD/HH/filename.json 
Run Code Online (Sandbox Code Playgroud)

当然,更喜欢直接使用Kinesis流,但是这个连接器上没有2.0的日期,因此Firehose-> S3是临时的.

我可以看到Kinesis与Spark Streaming集成的库.因此,您可以直接读取流数据并在其上执行SQL操作,而无需从S3读取.

groupId = org.apache.spark
artifactId = spark-streaming-kinesis-asl_2.11
version = 2.0.0
Run Code Online (Sandbox Code Playgroud)

Spark Streaming和SQL的示例代码

import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.kinesis._
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

val kinesisStream = KinesisUtils.createStream(
 streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
 [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)

kinesisStream.foreachRDD { rdd =>

  // Get the singleton instance of SparkSession
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._

  // Convert RDD[String] to DataFrame
  val jsonDf = rdd.toDF() // or rdd.toDF("specify schema/columns here")

  // Create a temporary view with DataFrame
  jsonDf.createOrReplaceTempView("json_data_tbl")

  //As we have DataFrame and SparkSession object we can perform most 
  //of the Spark SQL stuff here
}
Run Code Online (Sandbox Code Playgroud)