Gpw*_*ner 5 apache-spark spark-streaming
I have a question. There is a guide that explains how to write exactly-once style code by storing the Kafka offsets yourself, and this is the code: https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#storing-offsets
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream.map(record => (record.key, record.value))

//=====================================================
// separate section
//=====================================================

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
But what if I want to use 'reduceByKeyAndWindow' in that separate section, like this:
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

val lines: DStream[String] = stream.map(record => record.value)

lines.map(row => {
  (row.split(",")(1), 1)
}).reduceByKeyAndWindow((a: Int, b: Int) => (a + b), Seconds(30), Seconds(5))
  .foreachRDD(rdd => {
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // my code start
    rdd.foreach(println)
    // my code end
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  })
I am trying this, but I get an error:
Exception in thread "main" java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka010.HasOffsetRanges
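My guess is that the cast fails because, after 'reduceByKeyAndWindow', the RDD inside foreachRDD is a shuffled MapPartitionsRDD rather than the RDD created directly by createDirectStream, so it no longer implements HasOffsetRanges. The only workaround I have been considering is to capture the offsets on the original stream with transform, before any windowing, roughly like the untested sketch below (it reuses 'stream' from above; the offsetRanges var and the semantics caveat are just my own assumptions):

import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, OffsetRange}

// Driver-side variable holding the offsets of the most recent batch.
var offsetRanges = Array.empty[OffsetRange]

val lines: DStream[String] = stream.transform { rdd =>
  // Here rdd is still the RDD produced by createDirectStream,
  // so the HasOffsetRanges cast should succeed.
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.map(record => record.value)

lines.map(row => (row.split(",")(1), 1))
  .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(5))
  .foreachRDD { rdd =>
    rdd.foreach(println)
    // Commit against the original stream, not the windowed RDD.
    // With a 30s window the committed offsets may still be needed on
    // restart, so I doubt this is really exactly-once.
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }

But I am not sure this is the right way to do it.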
Any help?