Rak*_*n S 5 java apache-kafka kafka-producer-api spring-kafka
我使用带有幂等生产者配置的spring-kafka:
这些是我的配置道具:
Run Code Online (Sandbox Code Playgroud)Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Joiner.on(",").join(appProps.getBrokers()));
//configure the following three settings for SSL Encryption
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, appProps.getJksLocation());
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, appProps.getJksPassword());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
props.put(ProducerConfig.RETRIES_CONFIG, 5);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Run Code Online (Sandbox Code Playgroud)
我的kafka生产者抛出OutOfOrderSequenceException:
2019-03-06 21:25:47发件人[错误] [生产者clientId = producer-1]经纪人返回org.apache.kafka.common.errors.OutOfOrderSequenceException:经纪人收到了主题分区的乱序序号在偏移量-1处为topic-1。这表明代理上的数据丢失,应进行调查。2019-03-06 21:25:47 TransactionManager [INFO] [Producer clientId = producer-1] ProducerId设置为-1并带有时代-1 2019-03-06 21:25:47 ProducerKafka [ERROR]我们在发送时遇到错误要kafka,请重试该工作
我不确定为什么会引发此异常。我找不到对此的具体答案。异常的官方javadoc声明以下内容:
此异常表示代理从生产者接收到意外的序列号,这意味着数据可能已丢失。如果生产者仅配置为幂等(即,如果设置了enable.idempotence且未配置transactional.id),则可以继续使用同一生产者实例进行发送,但是这样做可能会重新排序已发送记录的顺序。对于事务性生产者,这是一个致命错误,您应该关闭生产者。
这是否意味着我需要使用事务性生产者来避免此问题?
KafkaProducer文档指出了使上述说法含糊的内容:https ://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
要启用幂等,必须将enable.idempotence配置设置为true。如果设置,则重试配置将默认为Integer.MAX_VALUE,max.in.flight.requests.per.connection配置将默认为1,而acks配置将默认为all。幂等生产者没有API更改,因此无需修改现有应用程序即可利用此功能。
为了利用幂等生成器,必须避免重新发送应用程序级别,因为这些应用程序无法重复删除。因此,如果应用程序启用幂等,建议不要设置重试配置,因为它将默认设置为Integer.MAX_VALUE。此外,如果send(ProducerRecord)即使无限次重试也返回错误(例如,如果消息在发送之前在缓冲区中过期),则建议关闭生产者并检查最后产生的消息的内容,以确保不能重复。最后,生产者只能保证在单个会话中发送的消息具有幂等性。
上面的声明清楚地表明,我需要一个幂等的生产者,而仅仅是使用enable.idempotence财产。但是,异常指出我必须使用该transactional.id属性。
什么是无需应对致命创建异步生产幂等的正确途径OutOfOrderSequenceException。
我觉得这很清楚;从你的第二句话来看...
为了利用幂等生产者,必须避免应用程序级别的重新发送,因为这些无法进行重复数据删除。因此,如果应用程序启用幂等性,建议不要设置重试配置,因为它将默认为 Integer.MAX_VALUE。
你有
props.put(ProducerConfig.RETRIES_CONFIG, 5);
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
389 次 |
| 最近记录: |