Has anyone else run into this problem? Setting "auto.offset.reset" -> "latest" does not seem to take effect for that property in spark-streaming-kafka 0-10.
Here is my code:
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

val config = StreamingConfigHelper.getStreamingConfig()
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> config.brokers,
  "key.deserializer" -> classOf[ByteArrayDeserializer],
  "value.deserializer" -> classOf[ByteArrayDeserializer],
  "group.id" -> "prodgroup",
  "auto.offset.reset" -> "latest",
  "receive.buffer.bytes" -> (65536: java.lang.Integer),
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
val inputDStream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte]](
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[Array[Byte], Array[Byte]](config.productTopic.toArray, kafkaParams)
)
But when I deploy it, this is what I get in the log:
16/11/11 11:03:00 INFO ConsumerConfig: ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [xxxxxxxxx:6667, xxxxxxxx:6667, xxxxxxxxxx:6667]
ssl.keystore.type = JKS …