Shu*_*tal 5 scala apache-spark spark-structured-streaming
我开始学习Spark,并且很难理解Spark中Structured Streaming背后的合理性.结构化流处理作为无界输入表到达的所有数据,其中数据流中的每个新项被视为表中的新行.我有以下代码来读入传入的文件csvFolder.
val spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
val csvSchema = new StructType().add("street", "string").add("city", "string")
.add("zip", "string").add("state", "string").add("beds", "string")
.add("baths", "string").add("sq__ft", "string").add("type", "string")
.add("sale_date", "string").add("price", "string").add("latitude", "string")
.add("longitude", "string")
val streamingDF = spark.readStream.schema(csvSchema).csv("./csvFolder/")
val query = streamingDF.writeStream
.format("console")
.start()
Run Code Online (Sandbox Code Playgroud)
如果我将1GB文件转储到该文件夹会发生什么.根据规范,流媒体作业每隔几毫秒触发一次.如果Spark在下一个瞬间遇到如此庞大的文件,在尝试加载文件时不会耗尽内存.还是自动批量处理?如果是,该批处理参数是否可配置?
关键思想是将任何数据流视为无界表:添加到流中的新记录就像添加到表中的行一样.
这允许我们将批处理和流数据视为表.由于表和DataFrames/Datasets在语义上是同义的,因此可以对批处理和流数据应用相同的类似批处理的DataFrame/Dataset查询.
问题:如果Spark在下一个瞬间遇到如此庞大的文件,在尝试加载文件时不会耗尽内存.还是自动批量处理?如果是,该批处理参数是否可配置?
答: OOM没有任何意义,因为RDD(DF/DS)是懒洋洋地初始化的.当然,您需要在处理之前重新分区,以确保均匀分布在执行程序中的相同数量的分区和数据...
| 归档时间: |
|
| 查看次数: |
913 次 |
| 最近记录: |