Big*_*igD 2 apache-spark spark-structured-streaming spark-streaming-kafka
我想了解 Spark Structured Streaming 的单元测试方面。我的场景是,我从 Kafka 获取数据,我使用 Spark Structured Streaming 使用它并在数据之上应用一些转换。
我不确定如何使用 Scala 和 Spark 进行测试。有人可以告诉我如何使用 Scala 在结构化流中进行单元测试。我是流媒体的新手。
TL;博士使用MemoryStream
添加事件和内存散热器的输出。
以下代码应该有助于入门:
import org.apache.spark.sql.execution.streaming.MemoryStream
implicit val sqlCtx = spark.sqlContext
import spark.implicits._
val events = MemoryStream[Event]
val sessions = events.toDS
assert(sessions.isStreaming, "sessions must be a streaming Dataset")
// use sessions event stream to apply required transformations
val transformedSessions = ...
val streamingQuery = transformedSessions
.writeStream
.format("memory")
.queryName(queryName)
.option("checkpointLocation", checkpointLocation)
.outputMode(queryOutputMode)
.start
// Add events to MemoryStream as if they came from Kafka
val batch = Seq(
eventGen.generate(userId = 1, offset = 1.second),
eventGen.generate(userId = 2, offset = 2.seconds))
val currentOffset = events.addData(batch)
streamingQuery.processAllAvailable()
events.commit(currentOffset.asInstanceOf[LongOffset])
// check the output
// The output is in queryName table
// The following code simply shows the result
spark
.table(queryName)
.show(truncate = false)
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
2474 次 |
最近记录: |