Mar*_*tin 7 testing spark-streaming
我想为消耗水槽源的火花流应用程序编写一个测试.
http://mkuthan.github.io/blog/2015/03/01/spark-unit-testing/建议使用ManualClock,但目前读取文件并验证输出对我来说已经足够了.
所以我希望使用:
JavaStreamingContext streamingContext = ...
JavaDStream<String> stream = streamingContext.textFileStream(dataDirectory);
stream.print();
streamingContext.awaitTermination();
streamingContext.start();
Run Code Online (Sandbox Code Playgroud)
不幸的是它没有打印任何东西.
我试过了:
什么都行不通.
有什么建议从文本文件中读取?
谢谢,
马丁
启动和等待的顺序确实是颠倒的.
除此之外,将数据传递到Spark Streaming应用程序进行测试的最简单方法是QueueDStream.它是任意数据的RDD的可变队列.这意味着您可以以编程方式创建数据或将其从磁盘加载到RDD并将其传递给Spark Streaming代码.
例如.为了避免fileConsumer面临的时间问题,你可以试试这个:
val rdd = sparkContext.textFile(...)
val rddQueue: Queue[RDD[String]] = Queue()
rddQueue += rdd
val dstream = streamingContext.queueStream(rddQueue)
doMyStuffWithDstream(dstream)
streamingContext.start()
streamingContext.awaitTermination()
Run Code Online (Sandbox Code Playgroud)
我太愚蠢了,我颠倒了对 start() 和 waitTermination() 的调用
如果您想做同样的事情,您应该从 HDFS 读取,并在程序运行时添加文件。
| 归档时间: |
|
| 查看次数: |
7512 次 |
| 最近记录: |