BdE*_*eer 12 apache-spark parquet spark-streaming apache-spark-sql
I have a scenario in my project , where I am reading the kafka topic messages using spark-sql-2.4.1 version. I am able to process the day using structured streaming. Once the data is received and after processed I need to save the data into respective parquet files in hdfs store.
I am able to store and read parquet files, I kept a trigger time of 15 seconds to 1 minutes. These files are very small in size hence resulting into many files.
These parquet files need to be read latter by hive queries.
So 1) Is this strategy works in production environment ? or does it lead to any small file problem later ?
2)处理/设计这种场景的最佳实践是什么,即行业标准?
3)这些事情在生产中一般是如何处理的?
谢谢你。
小智 5
我们也有类似的问题。经过大量的谷歌搜索后,似乎普遍接受的方法是编写另一个作业,经常将许多小文件聚合起来,并将它们写入其他地方的更大的合并文件中。这就是我们现在所做的。
顺便说一句:无论如何,您在这里可以做的事情是有限制的,因为您拥有的并行度越高,文件的数量就越大,因为每个执行程序线程都会写入自己的文件。他们从不写入共享文件。这似乎是并行处理的野兽的本性。
我知道这个问题太老了。我遇到了类似的问题,我使用了 Spark 结构化流查询侦听器来解决这个问题。
我的用例是从 kafka 获取数据并存储在具有年、月、日和小时分区的 hdfs 中。
下面的代码将获取前一小时的分区数据,应用重新分区并覆盖现有分区中的数据。
val session = SparkSession.builder().master("local[2]").enableHiveSupport().getOrCreate()
session.streams.addListener(AppListener(config,session))
class AppListener(config: Config,spark: SparkSession) extends StreamingQueryListener {
override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}
override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
this.synchronized {AppListener.mergeFiles(event.progress.timestamp,spark,config)}
}
override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
}
object AppListener {
def mergeFiles(currentTs: String,spark: SparkSession,config:Config):Unit = {
val configs = config.kafka(config.key.get)
if(currentTs.datetime.isAfter(Processed.ts.plusMinutes(5))) {
println(
s"""
|Current Timestamp : ${currentTs}
|Merge Files : ${Processed.ts.minusHours(1)}
|
|""".stripMargin)
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val ts = Processed.ts.minusHours(1)
val hdfsPath = s"${configs.hdfsLocation}/year=${ts.getYear}/month=${ts.getMonthOfYear}/day=${ts.getDayOfMonth}/hour=${ts.getHourOfDay}"
val path = new Path(hdfsPath)
if(fs.exists(path)) {
val hdfsFiles = fs.listLocatedStatus(path)
.filter(lfs => lfs.isFile && !lfs.getPath.getName.contains("_SUCCESS"))
.map(_.getPath).toList
println(
s"""
|Total files in HDFS location : ${hdfsFiles.length}
| ${hdfsFiles.length > 1}
|""".stripMargin)
if(hdfsFiles.length > 1) {
println(
s"""
|Merge Small Files
|==============================================
|HDFS Path : ${hdfsPath}
|Total Available files : ${hdfsFiles.length}
|Status : Running
|
|""".stripMargin)
val df = spark.read.format(configs.writeFormat).load(hdfsPath).cache()
df.repartition(1)
.write
.format(configs.writeFormat)
.mode("overwrite")
.save(s"/tmp${hdfsPath}")
df.cache().unpersist()
spark
.read
.format(configs.writeFormat)
.load(s"/tmp${hdfsPath}")
.write
.format(configs.writeFormat)
.mode("overwrite")
.save(hdfsPath)
Processed.ts = Processed.ts.plusHours(1).toDateTime("yyyy-MM-dd'T'HH:00:00")
println(
s"""
|Merge Small Files
|==============================================
|HDFS Path : ${hdfsPath}
|Total files : ${hdfsFiles.length}
|Status : Completed
|
|""".stripMargin)
}
}
}
}
def apply(config: Config,spark: SparkSession): AppListener = new AppListener(config,spark)
}
object Processed {
var ts: DateTime = DateTime.now(DateTimeZone.forID("UTC")).toDateTime("yyyy-MM-dd'T'HH:00:00")
}
Run Code Online (Sandbox Code Playgroud)
有时数据很大,我使用以下逻辑将数据分成多个文件。文件大小约为 160 MB
val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes
val dataSize = bytes.toLong
val numPartitions = (bytes.toLong./(1024.0)./(1024.0)./(10240)).ceil.toInt
df.repartition(if(numPartitions == 0) 1 else numPartitions)
.[...]
Run Code Online (Sandbox Code Playgroud)
编辑-1
使用这个 - spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes 我们可以在加载到内存后获得实际数据帧的大小,例如你可以检查下面的代码。
scala> val df = spark.read.format("orc").load("/tmp/srinivas/")
df: org.apache.spark.sql.DataFrame = [channelGrouping: string, clientId: string ... 75 more fields]
scala> import org.apache.commons.io.FileUtils
import org.apache.commons.io.FileUtils
scala> val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes
bytes: BigInt = 763275709
scala> FileUtils.byteCountToDisplaySize(bytes.toLong)
res5: String = 727 MB
scala> import sys.process._
import sys.process._
scala> "hdfs dfs -ls -h /tmp/srinivas/".!
Found 2 items
-rw-r----- 3 svcmxns hdfs 0 2020-04-20 01:46 /tmp/srinivas/_SUCCESS
-rw-r----- 3 svcmxns hdfs 727.4 M 2020-04-20 01:46 /tmp/srinivas/part-00000-9d0b72ea-f617-4092-ae27-d36400c17917-c000.snappy.orc
res6: Int = 0
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
3695 次 |
最近记录: |