如何使用 scala 模拟 Spark DataFrameReader?

dyt*_*iak 5 unit-testing scala mocking apache-spark

我想对使用 .RDBMS 读取 DataFrame 的代码进行单元测试sparkSession.read.jdbc(...)。但我没有找到如何模拟 DataFrameReader 以返回虚拟 DataFrame 进行测试的方法。

代码示例:

object ConfigurationLoader {

def readTable(tableName: String)(implicit spark: SparkSession): DataFrame = {
    spark.read
      .format("jdbc")
      .option("url", s"$postgresUrl/$postgresDatabase")
      .option("dbtable", tableName)
      .option("user", postgresUsername)
      .option("password", postgresPassword)
      .option("driver", postgresDriver)
      .load()
  }

def loadUsingFilter(dummyFilter: String*)(implicit spark: SparkSession): DataFrame = {
    readTable(postgresFilesTableName)
      .where(col("column").isin(fileTypes: _*))
  }
}
Run Code Online (Sandbox Code Playgroud)

第二个问题 - 要模拟 scala 对象,看起来我需要使用其他方法来创建此类服务。

Oli*_*Oli 5

在我看来,单元测试并不是为了测试数据库连接。这应该在集成测试中完成,以检查所有部分是否协同工作。单元测试只是为了测试你的功能逻辑,而不是 Spark 从数据库读取的能力。

这就是为什么我会稍微不同地设计你的代码并这样做,而不关心数据库。

/** This, I don't test. I trust spark.read */
def readTable(tableName: String)(implicit spark: SparkSession): DataFrame = {
    spark.read
    .option(...)
    ...
    .load()
    // Nothing more
}

/** This I test, this is my logic. */
def transform(df : DataFrame, dummyFilter: String*): DataFrame = {
    df
      .where(col("column").isin(fileTypes: _*))
}
Run Code Online (Sandbox Code Playgroud)

然后我在生产中以这种方式使用代码。

val source = readTable("...")
val result = transform(source, filter)
Run Code Online (Sandbox Code Playgroud)

现在transform,包含我的逻辑,很容易测试。如果您想知道如何创建虚拟数据框,我喜欢的一种方法是:

val df = Seq((1, Some("a"), true), (2, Some("b"), false), 
      (3, None, true)).toDF("x", "y", "z")
// and the test
val result = transform(df, filter)
result should be ...
Run Code Online (Sandbox Code Playgroud)