如何对 Spark Structured Streaming 执行单元测试?

Big*_*igD 2 apache-spark spark-structured-streaming spark-streaming-kafka

我想了解 Spark Structured Streaming 的单元测试方面。我的场景是,我从 Kafka 获取数据,我使用 Spark Structured Streaming 使用它并在数据之上应用一些转换。

我不确定如何使用 Scala 和 Spark 进行测试。有人可以告诉我如何使用 Scala 在结构化流中进行单元测试。我是流媒体的新手。

Jac*_*ski 9

TL;博士使用MemoryStream添加事件和内存散热器的输出。

以下代码应该有助于入门:

import org.apache.spark.sql.execution.streaming.MemoryStream
implicit val sqlCtx = spark.sqlContext
import spark.implicits._
val events = MemoryStream[Event]
val sessions = events.toDS
assert(sessions.isStreaming, "sessions must be a streaming Dataset")

// use sessions event stream to apply required transformations
val transformedSessions = ...

val streamingQuery = transformedSessions
  .writeStream
  .format("memory")
  .queryName(queryName)
  .option("checkpointLocation", checkpointLocation)
  .outputMode(queryOutputMode)
  .start

// Add events to MemoryStream as if they came from Kafka
val batch = Seq(
  eventGen.generate(userId = 1, offset = 1.second),
  eventGen.generate(userId = 2, offset = 2.seconds))
val currentOffset = events.addData(batch)
streamingQuery.processAllAvailable()
events.commit(currentOffset.asInstanceOf[LongOffset])

// check the output
// The output is in queryName table
// The following code simply shows the result
spark
  .table(queryName)
  .show(truncate = false)
Run Code Online (Sandbox Code Playgroud)