感谢您提出这个悬而未决的问题。出于某种原因,在谈到Spark时,每个人都被分析深深吸引,以至于他们忘记了过去15年左右出现的出色软件工程实践。这就是为什么我们要在课程中讨论测试和持续集成(以及诸如DevOps之类的东西)的原因。
术语快速入门
在继续之前,我必须表达对KnolX演示文稿@himanshuIIITian引用的轻微异议。一个真正的单元测试意味着你有过在测试每个组件的完全控制。不能与数据库,REST调用,文件系统甚至系统时钟进行交互;就像Gerard Mezaros将其放入xUnit测试模式一样,所有内容都必须“加倍”(例如,被嘲笑,存根等等)。我知道这看起来像语义,但这确实很重要。未能理解这一点是您在持续集成中看到间歇性测试失败的主要原因之一。
我们仍然可以进行单元测试
因此,有了这种了解,RDD就不可能进行单元测试了。但是,在开发分析时仍然存在进行单元测试的地方。
(注意:我将在示例中使用Scala,但是概念超越了语言和框架。)
考虑一个简单的操作:
rdd.map(foo).map(bar)
Run Code Online (Sandbox Code Playgroud)
这里foo和bar是简单的功能。可以按常规方式对它们进行单元测试,并且应该在尽可能多的情况下使用它们。毕竟,他们为什么要关心从测试夹具还是测试夹具获得输入的地方RDD?
不要忘记火花壳
这本身并不是测试,但是在这些早期阶段,您还应该在Spark Shell中进行实验,以找出您的转换,尤其是方法的后果。例如,您可以检查物理和逻辑查询计划,分区策略和保存,以及您的数据中包含许多不同的功能状态toDebugString,explain,glom,show,printSchema,等。我会让你探索那些。
您还可以local[2]在Spark shell和测试中将master设置为,以识别仅在开始分发工作后才可能出现的任何问题。
Spark集成测试
现在来看看有趣的东西。
为了在对辅助函数和/ 转换逻辑的质量充满信心之后对Spark 进行集成测试,至关重要的是做一些事情(无论构建工具和测试框架如何):RDDDataFrame
SparkContext在所有测试之前初始化,在所有测试之后停止。最后一种方法有几种。一个可以从@Pushkr和@himanshuIIITian链接的KnolX演示文稿引用的火花测试库中获得。
贷款方式
另一种方法是使用贷款模式。
例如(使用ScalaTest):
class MySpec extends WordSpec with Matchers with SparkContextSetup {
"My analytics" should {
"calculate the right thing" in withSparkContext { (sparkContext) =>
val data = Seq(...)
val rdd = sparkContext.parallelize(data)
val total = rdd.map(...).filter(...).map(...).reduce(_ + _)
total shouldBe 1000
}
}
}
trait SparkContextSetup {
def withSparkContext(testMethod: (SparkContext) => Any) {
val conf = new SparkConf()
.setMaster("local")
.setAppName("Spark test")
val sparkContext = new SparkContext(conf)
try {
testMethod(sparkContext)
}
finally sparkContext.stop()
}
}
Run Code Online (Sandbox Code Playgroud)
如您所见,“贷款模式”利用高阶函数来“贷款” SparkContext测试,然后在测试完成后将其处置。
痛苦编程(感谢Nathan)
这完全是一个优先事项,但是我更喜欢使用贷款模式并尽可能自行整理,然后再引入另一个框架。除了试图保持轻量级之外,框架有时还会添加很多“魔术”,这使得调试测试失败难以推理。因此,我采用了一种面向痛苦的编程方法-在这种情况下,我避免添加新框架,直到没有它的痛苦实在难以承受。但是,这完全取决于您。
现在,基于spark-testing-base的真正亮点之一是基于Hadoop的帮助程序,例如HDFSClusterLikeand YARNClusterLike。混合这些特征确实可以为您省去很多设置上的麻烦。另一个亮点是类似Scalacheck的属性和生成器。但同样,我个人会推迟使用它,直到我的分析和测试达到这种复杂程度为止。
Spark流的集成测试
最后,我只想展示一个带有内存值的SparkStreaming集成测试设置的摘要:
val sparkContext: SparkContext = ...
val data: Seq[(String, String)] = Seq(("a", "1"), ("b", "2"), ("c", "3"))
val rdd: RDD[(String, String)] = sparkContext.parallelize(data)
val strings: mutable.Queue[RDD[(String, String)]] = mutable.Queue.empty[RDD[(String, String)]]
val streamingContext = new StreamingContext(sparkContext, Seconds(1))
val dStream: InputDStream = streamingContext.queueStream(strings)
strings += rdd
Run Code Online (Sandbox Code Playgroud)
这比看起来简单。实际上,它只是将一系列数据转换为队列以馈送到DStream。实际上,大多数只是与Spark API一起使用的样板设置。
这可能是我最长的帖子,所以我将其留在这里。我希望其他人能提出其他想法,以帮助改进所有其他应用程序开发的敏捷软件工程实践来提高我们的分析质量。
并为无耻的插件道歉,您可以查看我们的Apache Spark分析课程,我们在其中解决了很多这些想法,甚至更多。我们希望尽快有一个在线版本。
有两种测试 Spark RDD/应用程序的方法。它们如下:
例如:
测试单元:
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
class WordCount {
def get(url: String, sc: SparkContext): RDD[(String, Int)] = {
val lines = sc.textFile(url) lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
}
}
Run Code Online (Sandbox Code Playgroud)
现在测试方法1如下:
import org.scalatest.{ BeforeAndAfterAll, FunSuite }
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
class WordCountTest extends FunSuite with BeforeAndAfterAll {
private var sparkConf: SparkConf = _
private var sc: SparkContext = _
override def beforeAll() {
sparkConf = new SparkConf().setAppName("unit-testing").setMaster("local")
sc = new SparkContext(sparkConf)
}
private val wordCount = new WordCount
test("get word count rdd") {
val result = wordCount.get("file.txt", sc)
assert(result.take(10).length === 10)
}
override def afterAll() {
sc.stop()
}
}
Run Code Online (Sandbox Code Playgroud)
在方法 1 中,我们并不是在嘲笑 RDD。我们只是检查我们班级的行为WordCount。但这里我们必须自己管理 SparkContext 的创建和销毁。因此,如果您不想为此编写额外的代码,那么您可以使用spark-testing-base,如下所示:
方法二:
import org.scalatest.FunSuite
import com.holdenkarau.spark.testing.SharedSparkContext
class WordCountTest extends FunSuite with SharedSparkContext {
private val wordCount = new WordCount
test("get word count rdd") {
val result = wordCount.get("file.txt", sc)
assert(result.take(10).length === 10)
}
}
Run Code Online (Sandbox Code Playgroud)
或者
import org.scalatest.FunSuite
import com.holdenkarau.spark.testing.SharedSparkContext
import com.holdenkarau.spark.testing.RDDComparisons
class WordCountTest extends FunSuite with SharedSparkContext with RDDComparisons {
private val wordCount = new WordCount
test("get word count rdd with comparison") {
val expected = sc.textFile("file.txt")
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
val result = wordCount.get("file.txt", sc)
assert(compareRDD(expected, result).isEmpty)
}
}
Run Code Online (Sandbox Code Playgroud)
有关 Spark RDD 测试的更多详细信息,请参阅此 - KnolX:Spark 应用程序的单元测试
| 归档时间: |
|
| 查看次数: |
2190 次 |
| 最近记录: |