How to achieve exactly-once semantics when integrating Spark Streaming with Kafka

Gpw*_*ner 5 apache-spark spark-streaming

I have a question about the guide that describes how to achieve exactly-once semantics. This is the code from it: https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#storing-offsets

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream.map(record => (record.key, record.value))
 //=====================================================
 //separate line
 //=====================================================
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

However, I want to use `reduceByKeyAndWindow` at the point marked "separate line", like this:

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)


val lines: DStream[String] = stream.map(record => record.value)
lines.map(row => {
  (row.split(",")(1), 1)
}).reduceByKeyAndWindow((a: Int, b: Int) => (a + b), Seconds(30), Seconds(5))
  .foreachRDD(rdd => {
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // my code start
    rdd.foreach(println)
    // my code end
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  })

When I try this, I get an error:

Exception in thread "main" java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka010.HasOffsetRanges

Any help would be appreciated.
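
For context, the linked integration guide notes that the cast to `HasOffsetRanges` only succeeds in the first method called on the result of `createDirectStream`. After `map` and `reduceByKeyAndWindow`, the RDD passed to `foreachRDD` is no longer the Kafka RDD, which matches the `ClassCastException` above. One possible workaround, shown here only as a minimal sketch, is to capture the offset ranges in a `transform` placed directly on the stream and commit them after the windowed output. The application name, master, 5-second batch interval, broker address, group id, and topic name below are placeholder assumptions, not values from the question.

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object WindowedCountWithOffsets {
  def main(args: Array[String]): Unit = {
    // Assumed settings for illustration only: app name, local master, 5 s batches.
    val conf = new SparkConf().setAppName("windowed-count").setMaster("local[2]")
    val streamingContext = new StreamingContext(conf, Seconds(5))

    // Assumed Kafka settings: broker address, group id, and topic are placeholders.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example-group",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("example-topic")

    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    // Driver-side holder for the offsets of the most recently generated batch.
    var offsetRanges = Array.empty[OffsetRange]

    stream
      .transform { rdd =>
        // First operation on the direct stream, so the cast is still valid here.
        offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd
      }
      .map(record => (record.value.split(",")(1), 1))
      .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(5))
      .foreachRDD { rdd =>
        // Stand-in for the real output step; collects to the driver for printing.
        rdd.collect().foreach(println)
        // Commit only after the output above has completed.
        stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
```

This only removes the failing cast. Each windowed result covers several batches, so committing the latest offsets does not by itself make the windowed output exactly-once after a restart. If batches queue up, the shared variable may also already hold the ranges of a later batch by the time an earlier output runs; in that case the offsets would need to be tracked per batch time instead.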