vij*_*jay 4 performance apache-spark spark-streaming apache-spark-sql
我有一个火花工作,我需要在每个微批处理中编写SQL查询的输出。写入是一项非常昂贵的操作,并且会导致批处理执行时间超过批处理间隔。
我正在寻找提高写入性能的方法。
像下面显示的那样,在单独的线程中异步执行写操作是一个好选择吗?
因为Spark本身以分布式方式执行,这会引起任何副作用吗?
还有其他/更好的方法来加快写入速度吗?
// Create a fixed thread pool to execute asynchronous tasks
val executorService = Executors.newFixedThreadPool(2)
dstream.foreachRDD { rdd =>
import org.apache.spark.sql._
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate
import spark.implicits._
import spark.sql
val records = rdd.toDF("record")
records.createOrReplaceTempView("records")
val result = spark.sql("select * from records")
// Submit a asynchronous task to write
executorService.submit {
new Runnable {
override def run(): Unit = {
result.write.parquet(output)
}
}
}
}
Run Code Online (Sandbox Code Playgroud)1-是否像下面所示的那样异步地在单独的线程中执行写操作,这是一个好选择吗?
否。在这里理解问题的关键是问“谁在写”。写入是通过为群集中的执行程序上的工作分配的资源完成的。将write命令放在异步线程池上就像将新的Office经理添加到具有固定人员的办公室一样。如果两个经理必须共享相同的员工,他们将能够完成比单独一个经理更多的工作吗?好吧,一个合理的答案是“仅当第一任经理没有给他们足够的工作时,才有一定的自由度”。
回到我们的集群,我们正在处理大量IO的写操作。并行写入作业将导致争用IO资源,从而使每个独立作业都更长。最初,我们的工作可能看起来比“单经理版”更好,但是麻烦最终将打击我们。我制作了一张图表,试图说明它是如何工作的。请注意,并行作业将花费更长的时间,与它们在时间轴上并发的时间成正比。
一旦达到工作开始延迟的程度,我们就会有不稳定的工作,最终将失败。
2-因为Spark本身以分布式方式执行,这会引起任何副作用吗?
我能想到的一些效果:
3-还有其他/更好的方法来加快写入速度吗? (从便宜到昂贵订购)