最小的 Spark 会话/配置以获得最佳的单元测试性能?

Unc*_*air 7 performance unit-testing apache-spark

Spark 2.1.0 和 Scala 2.11。

我们的项目有数百个单元测试,它们执行相对简单的操作,例如创建 3 或 4 个对象的数据集并对它们执行简单的转换。其中许多测试需要长达 5-10 秒的时间才能运行,数百个测试加起来需要几分钟,并且正在成为我们 CI 构建的一个问题。操作非常简单,我想知道是否有我们可以使用的 Spark 配置来加快速度。

例如,简单地创建一个这样的数据集:

val histData = Seq(
  FooType(id = "id1", code = "code1",orgId = 1l),
  FooType(id = "id2", code = "code2",orgId = 1l),
  FooType(id = "id3", code = "code3",orgId = 1l)
).toDS()
Run Code Online (Sandbox Code Playgroud)

需要 800 毫秒(FooType 是一个案例类)。在创建 2 或 3 个这样的数据集和一些过滤器/映射/连接操作后(我真的不认为这些细节很重要,但如果你让我知道,我会发布它们),collect()需要 1000-2000 毫秒。将一些这样的操作加起来,测试可能需要 5-10 秒。

对于单元测试我们只关心测试的功能方面,我们不需要线程化、缩放、缓存、磁盘存储等。测试数据很小(通常小于1KB)并且是在内存中创建的(不从磁盘或任何外部源读取),并且对内存中的转换对象执行断言。我知道在幕后 Spark 可能会调用 DAGScheduler、代码生成器等,我想知道是否有办法在没有该功能的情况下执行作业。或者,如果确实必须这样做,请在单元测试套件开始时进行一次并在整个过程中使用它。

会话是用这样的东西创建的:

session = SparkSession.builder.config("spark.sql.shuffle.partitions","10").getOrCreate()
Run Code Online (Sandbox Code Playgroud)

并且在每个单元测试中使用相同的会话。我们直接从单元测试中调用 Spark API 方法,因此没有 Spark 提交或单独的进程或作业,它们都在 IDE 或调用单元测试的 gradle 创建的 JVM 中运行。

在我看来,这些操作每个应该花费几毫秒,我正在寻找一种方法来减少 Spark 配置,以便它以最快的方式评估内存中的所有内容。感谢您提供任何提示或想法。

Oma*_*QUI 1

根据我个人使用 Scala 的经验。

  1. 创建测试包装对象
object SparkSessionTestWrapper {
  lazy val spark: SparkSession = {
    SparkSession.builder()
      .appName("Unit Test")
      .master("local[*]")
      .getOrCreate()
  }
}
Run Code Online (Sandbox Code Playgroud)

优化性能的一个好方法是在创建 Spark 会话时禁用 UI

object SparkSessionTestWrapper {
  lazy val spark: SparkSession = {
    SparkSession.builder()
      .appName("Unit Test")
      .master("local[*]")
      .config("spark.ui.enabled", "false")  
      .getOrCreate()
  }
}
Run Code Online (Sandbox Code Playgroud)

惰性 val Spark 确保 Spark 会话仅创建一次并被使用。

  1. 然后在每个 UT 类中调用 SparkSessionTestWrapper,它将使用相同的“共享”spark 会话,因此无需为每个 UT 创建一个会话:
class UnitTest1 {
  import SparkSessionTestWrapper._

  test("Test case 1") {
    import spark.implicits._

    val histData = Seq(
      FooType(id = "id1", code = "code1",orgId = 1l),
      FooType(id = "id2", code = "code2",orgId = 1l),
      FooType(id = "id3", code = "code1",orgId = 1l)
    ).toDS()

    val result = histData.filter($"orgId " === 1l).select("code").distinct().collect()

    assert(result.length == 2)
    //other assertions ...
  }
}
Run Code Online (Sandbox Code Playgroud)