esh*_*lev 0 testing apache-spark dstream
我正在围绕Apache Spark Streaming编写一些自包含的集成测试.我想测试我的代码可以在我的模拟测试数据中摄取各种边缘情况.当我使用常规RDD(不是流式传输)时这样做.我可以使用我的内联数据并在其上调用"parallelize"将其转换为spark RDD.但是,我找不到这种创建目的地的方法.理想情况下,我想偶尔调用一些"推"功能,让魔法神奇地出现在我的dstream中.ATM我正在使用Apache Kafka这样做:我创建了一个临时队列,然后我写信给它.但这似乎有点矫枉过正.我宁愿直接从我的测试数据创建测试dstream,而不必使用Kafka作为调解器.
出于测试目的,您可以从RDD队列创建输入流.在队列中推送更多RDD将模拟在批处理间隔中处理了更多事件.
val sc = SparkContextHolder.sc
val ssc = new StreamingContext(sc, Seconds(1))
val inputData: mutable.Queue[RDD[Int]] = mutable.Queue()
val inputStream: InputDStream[Int] = ssc.queueStream(inputData)
inputData += sc.makeRDD(List(1, 2)) // Emulate the RDD created during the first batch interval
inputData += sc.makeRDD(List(3, 4)) // 2nd batch interval
// etc
val result = inputStream.map(x => x*x)
result.foreachRDD(rdd => assertSomething(rdd))
ssc.start() // Don't forget to start the streaming context
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2127 次 |
| 最近记录: |