Asked by use*_*537 (score: 5) · Tags: apache-spark, amazon-kinesis, spark-streaming
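I'm trying to process Kinesis stream data with Spark. I started my stream with 2 shards, and everything worked fine until I manually split one of them. Since then, my program crashes whenever new data arrives.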
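Splitting a shard means choosing a new starting hash key inside the parent shard's hash key range. As a minimal sketch, assuming the two shards were created with evenly divided ranges over the 128-bit hash key space, the Scala below computes those ranges and the midpoint key that would split shardId-000000000001 in half. `ShardSplit`, `evenRanges` and `midpoint` are hypothetical helpers, not part of any AWS or Spark API.

```scala
// Hypothetical helpers for picking a split key; not an AWS or Spark API.
object ShardSplit {
  // Kinesis partitions the 128-bit hash key space: 0 to 2^128 - 1.
  val MaxHashKey: BigInt = BigInt(2).pow(128) - 1

  // Assumed ranges for a stream created with `n` evenly divided shards.
  def evenRanges(n: Int): Seq[(BigInt, BigInt)] = {
    val total = MaxHashKey + 1
    (0 until n).map { i =>
      (total * i / n, total * (i + 1) / n - 1)
    }
  }

  // Split key that divides the inclusive range [start, end] into two halves.
  def midpoint(start: BigInt, end: BigInt): BigInt =
    start + (end - start + 1) / 2
}
```

For the second of two shards this gives 2^127 + 2^126; its decimal string is the kind of value passed to `aws kinesis split-shard --new-starting-hash-key`. After the split, the parent shard is closed, which is presumably what triggers the error below.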
Here is the error message:
ERROR ShutdownTask: Application exception.
java.lang.IllegalArgumentException: Application didn't checkpoint at end of shard shardId-000000000001
How should my program handle checkpointing for shards that have been closed?
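The message comes from the Kinesis Client Library (KCL), which Spark's Kinesis receiver uses internally. When a shard is closed (for example, the parent of a split), the record processor's shutdown hook is called with reason TERMINATE, and KCL then requires that the last checkpoint be SHARD_END. The toy model below mirrors that rule in plain Scala as a sketch; `ShardEndModel` and all of its members are hypothetical and are not the real KCL classes. In KCL 1.x, the corresponding step is calling `checkpointer.checkpoint()` from `IRecordProcessor.shutdown` when the reason is `ShutdownReason.TERMINATE`.

```scala
// Toy model (NOT the real KCL API) of the end-of-shard checkpoint rule.
object ShardEndModel {
  sealed trait ShutdownReason
  case object Terminate extends ShutdownReason // shard closed, e.g. parent after a split
  case object Zombie extends ShutdownReason    // lease lost to another worker

  sealed trait Checkpoint
  final case class AtSequence(seq: String) extends Checkpoint
  case object ShardEnd extends Checkpoint

  // Holds the last checkpoint the processor recorded for its shard.
  final class Checkpointer(initial: Checkpoint) {
    var last: Checkpoint = initial
    def checkpointShardEnd(): Unit = { last = ShardEnd }
  }

  type Processor = (Checkpointer, ShutdownReason) => Unit

  // Runs the processor's shutdown hook, then enforces the end-of-shard rule.
  def shutdown(shardId: String, reason: ShutdownReason,
               last: Checkpoint, onShutdown: Processor): Unit = {
    val cp = new Checkpointer(last)
    onShutdown(cp, reason)
    if (reason == Terminate && cp.last != ShardEnd)
      throw new IllegalArgumentException(
        s"Application didn't checkpoint at end of shard $shardId")
  }

  // Checkpoints at shard end only when the shard was terminated.
  val compliant: Processor = (cp, reason) =>
    if (reason == Terminate) cp.checkpointShardEnd()

  // Ignores the shutdown reason, so a closed shard is never finalized.
  val ignoring: Processor = (_, _) => ()
}
```

In the setup in this question, that processor lives inside Spark's receiver rather than in the `foreachRDD` code, so the sample code below does not expose a direct hook for this checkpoint.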
Edit: added sample code to this question (the issue is highly relevant to me as well).
Sample code:
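import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

// Assumes sc (e.g. from spark-shell), checkpointDirectory, appName, streamName,
// endpointUrl and awsRegion are defined elsewhere.

/* Create a streaming context */
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(sc, Seconds(2))  // 2-second batch interval
  ssc.checkpoint(checkpointDirectory)             // Spark Streaming metadata checkpoints
  val kinesisStreams = KinesisUtils.createStream(
    ssc, appName, streamName, endpointUrl, awsRegion,
    InitialPositionInStream.LATEST,
    Seconds(1),                                   // KCL checkpoint interval
    StorageLevel.MEMORY_ONLY)
  /* Do the processing */
  kinesisStreams.foreachRDD { rdd => /* ... */ }
  ssc
}

/* First, recover the context; otherwise, create a new context */
val ssc = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
ssc.start()
ssc.awaitTermination()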