如何在Spark Streaming应用程序中异步写入行以加快批处理执行速度?

vij*_*jay 4 performance apache-spark spark-streaming apache-spark-sql

我有一个火花工作,我需要在每个微批处理中编写SQL查询的输出。写入是一项非常昂贵的操作,并且会导致批处理执行时间超过批处理间隔。

我正在寻找提高写入性能的方法。

  1. 像下面显示的那样,在单独的线程中异步执行写操作是一个好选择吗?

  2. 因为Spark本身以分布式方式执行,这会引起任何副作用吗?

  3. 还有其他/更好的方法来加快写入速度吗?

    // Create a fixed thread pool to execute asynchronous tasks
    val executorService = Executors.newFixedThreadPool(2)
    dstream.foreachRDD { rdd =>
      import org.apache.spark.sql._
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate
      import spark.implicits._
      import spark.sql
    
      val records = rdd.toDF("record")
      records.createOrReplaceTempView("records")
      val result = spark.sql("select * from records")
    
      // Submit a asynchronous task to write
      executorService.submit {
        new Runnable {
          override def run(): Unit = {
            result.write.parquet(output)
          }
        }
      }
    }
    
    Run Code Online (Sandbox Code Playgroud)

maa*_*asg 5

1-是否像下面所示的那样异步地在单独的线程中执行写操作,这是一个好选择吗?

否。在这里理解问题的关键是问“谁在写”。写入是通过为群集中的执行程序上的工作分配的资源完成的。将write命令放在异步线程池上就像将新的Office经理添加到具有固定人员的办公室一样。如果两个经理必须共享相同的员工,他们将能够完成比单独一个经理更多的工作吗?好吧,一个合理的答案是“仅当第一任经理没有给他们足够的工作时,才有一定的自由度”。

回到我们的集群,我们正在处理大量IO的写操作。并行写入作业将导致争用IO资源,从而使每个独立作业都更长。最初,我们的工作可能看起来比“单经理版”更好,但是麻烦最终将打击我们。我制作了一张图表,试图说明它是如何工作的。请注意,并行作业将花费更长的时间,与它们在时间轴上并发的时间成正比。

Spark Streaming中的顺序作业与并行作业

一旦达到工作开始延迟的程度,我们就会有不稳定的工作,最终将失败。

2-因为Spark本身以分布式方式执行,这会引起任何副作用吗?

我能想到的一些效果:

  • 群集负载和IO竞争可能更高。
  • 作业正在线程池队列上排队,而不是在Spark Streaming队列上排队。我们延迟了通过Spark UI和监视API监视工作的能力,因为延迟是“隐藏的”,并且从Spark Streaming的角度来看一切都很好。

3-还有其他/更好的方法来加快写入速度吗? (从便宜到昂贵订购)

  • 如果要追加到拼花地板文件中,请经常创建一个新文件。随着时间的流逝,追加费用变得昂贵。
  • 增加您的批处理间隔或使用Window操作编写更大的Parquet块。实木复合地板喜欢大文件
  • 调整数据的分区和分布=>确保Spark可以并行执行写操作
  • 增加集群资源,必要时添加更多节点
  • 使用更快的存储空间