Posts by Gpw*_*ner

Unclear about the meaning of auto.offset.reset and enable.auto.commit in Kafka

I am new to Kafka and I don't really understand what these configuration options mean. Could anyone explain them in a way that is easier to understand?

Here is my code:

import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "master:9092,slave1:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "GROUP_2017",
  "auto.offset.reset" -> "latest", // earliest or latest
  "enable.auto.commit" -> (true: java.lang.Boolean)
)

What do these settings mean in my code?
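For context (not from the original post), here is a minimal stand-alone sketch of how these two settings behave with a plain KafkaConsumer, assuming a hypothetical topic name some_topic. auto.offset.reset only applies when the group has no committed offset yet ("earliest" starts from the beginning of the log, "latest" from new records only); enable.auto.commit=true makes the consumer commit offsets in the background, while false means you commit explicitly yourself.

import java.util.{Collections, Properties}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

val props = new Properties()
props.put("bootstrap.servers", "master:9092,slave1:9092")
props.put("group.id", "GROUP_2017")
props.put("key.deserializer", classOf[StringDeserializer].getName)
props.put("value.deserializer", classOf[StringDeserializer].getName)
// Only used when this group has NO committed offset for a partition:
//   "earliest" -> start reading from the beginning of the log
//   "latest"   -> start reading from new records only
props.put("auto.offset.reset", "earliest")
// false -> nothing is committed in the background; we decide below
// when a record counts as "processed" by committing explicitly.
props.put("enable.auto.commit", "false")

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(Collections.singletonList("some_topic")) // hypothetical topic

val records = consumer.poll(1000L)
records.asScala.foreach(r => println(s"${r.key} -> ${r.value}"))

// Commit only after the records above have actually been processed.
consumer.commitSync()
consumer.close()

With the original settings ("latest" plus auto-commit enabled), a brand-new consumer group would start at the end of each partition and offsets would be committed automatically in the background at the auto.commit.interval.ms interval.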

apache-kafka kafka-consumer-api

10 votes · 3 answers · 20k views

How to achieve exactly-once semantics with the Spark Streaming + Kafka integration

I have a question. There is a guide that describes how to store offsets to achieve exactly-once, and here is its code: https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#storing-offsets

import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream.map(record => (record.key, record.value))

//=====================================================
// separate line
//=====================================================

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

But what if I want to use reduceByKeyAndWindow in the "separate line" part, like this:

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

val lines: DStream[String] = stream.map(record => record.value)
lines.map(row => {
  (row.split(",")(1), 1)
}).reduceByKeyAndWindow((a: Int, b: Int) => (a + b), Seconds(30), Seconds(5))
  .foreachRDD(rdd => {
    val offsetRanges …
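Not part of the original post, but one commonly suggested pattern for this situation: the shuffled RDDs produced by reduceByKeyAndWindow no longer implement HasOffsetRanges, so the offsets have to be captured from the original Kafka stream before any shuffle (for example in a transform step) and then committed on the original stream after the windowed output completes. A rough sketch, reusing stream, topics and kafkaParams from above:

import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, OffsetRange}

// Holds the offset ranges of the current batch; transform runs its body
// on the driver once per batch, so a driver-side var is sufficient here.
var offsetRanges = Array.empty[OffsetRange]

stream
  .transform { rdd =>
    // Capture offsets here, while the RDD is still the original KafkaRDD.
    offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd
  }
  .map(record => (record.value.split(",")(1), 1))
  .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(5))
  .foreachRDD { rdd =>
    // ... write the windowed counts to the sink here ...
    // Commit on the ORIGINAL stream; the windowed DStream cannot be
    // cast to CanCommitOffsets.
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }

Note that committing back to Kafka this way only gives at-least-once guarantees for the windowed results; as the linked guide points out, true exactly-once requires storing the offsets atomically with the output in the same transaction.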

apache-spark spark-streaming

5 votes · 0 answers · 580 views