Gpw*_*ner 10 apache-kafka kafka-consumer-api
我是Kafka的新手,我不太了解Kafka配置的含义,任何人都可以解释为什么更容易理解!
这是我的代码:
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "master:9092,slave1:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "GROUP_2017",
"auto.offset.reset" -> "latest", //earliest or latest
"enable.auto.commit" -> (true: java.lang.Boolean)
)
Run Code Online (Sandbox Code Playgroud)
这在我的代码中意味着什么?
dbu*_*osp 10
我将向您解释其含义,但我强烈建议您阅读Kafka网站配置
"bootstrap.servers" -> "master:9092,slave1:9092"
Run Code Online (Sandbox Code Playgroud)
基本上是Kafka集群配置:IP和端口.
"key.deserializer" -> classOf[StringDeserializer]
"value.deserializer" -> classOf[StringDeserializer]
Run Code Online (Sandbox Code Playgroud)
这个答案解释了目的是什么.
"group.id" -> "GROUP_2017"
Run Code Online (Sandbox Code Playgroud)
消费者进程将属于groupId.groupId可以有多个使用者,而Kafka只会将一个使用者进程只分配给一个分区(用于数据消费).如果消费者数量大于可用分区,则某些进程将处于空闲状态.
"enable.auto.commit" -> (true: java.lang.Boolean)
Run Code Online (Sandbox Code Playgroud)
如果该标志为真,那么Kafka能够使用Zookeeper提交您从Kafka带来的消息,以保留它读取的最后一个"偏移量".当您需要生产系统的更强大的解决方案时,这种方法不是最好的方法,因为不能确保您带来的记录被正确处理(使用您在代码中编写的逻辑).如果此标志为false,则Kafka将不知道哪个是最后一个偏移读取,因此当您重新启动该过程时,它将开始读取"最早"或"最新"偏移量,具体取决于您的下一个标志的值(auto.offset.重启).最后,这篇Cloudera文章详细解释了如何以适当的方式管理抵消.
"auto.offset.reset" -> "latest"
Run Code Online (Sandbox Code Playgroud)
这个标志告诉Kafka在没有任何"提交"的情况下从哪里开始读取偏移量.换句话说,如果你还没有在Zookeeper中保留任何偏移量(手动或使用enable.auto.commit标志),它将从'最早'或从'最新'开始.
auto.offset.reset仅当没有有效的提交偏移量时才起作用;例如在第一次启动系统时,或者在提交的偏移量到期并因为太旧而被删除之后。
enable.auto.commit是关于在后台自动提交偏移量与在前台显式手动控制的选择。
auto.offset.reset
Run Code Online (Sandbox Code Playgroud)
当 Kafka 中没有初始偏移量或者当前偏移量在服务器上不再存在时(例如因为该数据已被删除)该怎么办:
earliest:自动将偏移量重置为最早的偏移量latest:自动将偏移量重置为最新偏移量none:如果没有找到消费者组的先前偏移量,则向消费者抛出异常Type:
string
Default:
latest
Valid Values:
[latest, earliest, none]
Importance:
medium
Run Code Online (Sandbox Code Playgroud)
enable.auto.commit
Run Code Online (Sandbox Code Playgroud)
如果为 true,消费者的偏移量将在后台定期提交。
Type:
boolean
Default:
true
Valid Values:
Importance:
medium
Run Code Online (Sandbox Code Playgroud)
auto.commit.interval.ms
Run Code Online (Sandbox Code Playgroud)
enable.auto.commit如果设置为,则消费者偏移量自动提交到 Kafka 的频率(以毫秒为单位)true。
Type:
int
Default:
5000 (5 seconds)
Valid Values:
[0,...]
Importance:
low
Run Code Online (Sandbox Code Playgroud)
Apache Kafka 网站上记录了完整的消费者配置参数集:https://kafka.apache.org/documentation.html#newconsumerconfigs
添加标题中提到的配置的更多详细信息:“不清楚Kafka 中auto.offset.reset和的含义”enable.auto.commit
通过auto.offset.reset配置,您可以在您的消费者组从未从特定主题消费和提交或从该消费者组删除最后提交的偏移量(例如通过清理策略)的情况下引导消费者(作为消费者组的一部分)的行为)。
Kafka 主题分区中的每条消息都有一个唯一的标识符,即offset. 每个 Kafka 分区的偏移量都是唯一的。消费者通常会提交回其消费的主题的每个分区上的偏移量。这样,消费者就能够避免重复阅读。
想象一下,您有一个消费者第一次阅读某个主题(或者如果您更改了消费者组名称)。因此,消费者团体从未承诺任何补偿。根据配置文档,您可以通过配置在以下行为之间进行选择auto.offset.reset:
最早:自动将偏移量重置为最早的偏移量
最新:自动将偏移量重置为最新偏移量
none:如果没有找到消费者组的先前偏移量,则向消费者抛出异常
其他任何事情:向消费者抛出异常。
默认设置为latest。
如上所述,在使用来自 Kafka 的消息时,考虑偏移量及其提交至关重要。将配置设置enable.auto.commit为true消费者时,偏移量将在后台自动提交。
在KafkaConsumer 的 JavaDocs中,您将找到一个很好的示例,说明如何使用以下命令在消费者客户端中手动提交偏移量:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.commitSync();
Run Code Online (Sandbox Code Playgroud)
为了再次强调偏移管理在消费者客户端中的重要性,值得阅读整个 Java 文档描述或关于偏移管理的 Kafka 文档。
| 归档时间: |
|
| 查看次数: |
21816 次 |
| 最近记录: |