Kri*_*hna 7 java apache-kafka kafka-producer-api
Trying to load about 50K messages into a Kafka topic. On a few runs I get the exception below at the start, but not always:
org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784) ~[kafka-clients-2.0.0.jar:?]
at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:229) ~[kafka-clients-2.0.0.jar:?]
at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:679) ~[kafka-clients-2.0.0.jar:?]
at myPackage.persistUpdatesPostAction(MyCode.java:??) ~[aKafka.jar:?]
...
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer
attempted an operation with an old epoch. Either there is a newer producer with
the same transactionalId, or the producer's transaction has been expired by the
broker.
The code block is as follows:
public void persistUpdatesPostAction(List<Message> messageList) {
    if ((messageList == null) || (messageList.isEmpty())) {
        return;
    }
    logger.createDebug("Messages in batch(postAction) : " + messageList.size());

    Producer<String, String> producer = KafkaUtils.getProducer(Thread.currentThread().getName());

    try {
        producer.beginTransaction();
        createKafkaBulkInsert1(producer, messageList, "Topic1");
        createKafkaBulkInsert2(producer, messageList, "Topic2");
        createKafkaBulkInsert3(producer, messageList, "Topic3");
        producer.commitTransaction();
    } catch (Exception e) {
        producer.abortTransaction();
        producer.close();
        KafkaUtils.removeProducer(Thread.currentThread().getName());
    }
}
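One detail worth noting about the catch block above: when the producer has been fenced, abortTransaction() itself throws (which matches the stack trace, where the ProducerFencedException surfaces from beginAbort). A minimal sketch of a handling pattern that distinguishes the two cases, using stand-in types (FencedException, TxProducer) so it can be shown without kafka-clients or a running broker; in real code these would be org.apache.kafka.common.errors.ProducerFencedException and KafkaProducer:

```java
// Stand-in for org.apache.kafka.common.errors.ProducerFencedException.
class FencedException extends RuntimeException {}

// Stand-in for the transactional subset of KafkaProducer's API.
interface TxProducer {
    void beginTransaction();
    void commitTransaction();
    void abortTransaction();
    void close();
}

class SafeTxDemo {
    /** Returns true if the batch committed, false if it failed. */
    static boolean sendInTransaction(TxProducer producer, Runnable sends) {
        try {
            producer.beginTransaction();
            sends.run();
            producer.commitTransaction();
            return true;
        } catch (FencedException fenced) {
            // A fenced producer cannot abort: abortTransaction() would throw
            // again. Just close it; the caller must build a new producer.
            producer.close();
            return false;
        } catch (RuntimeException e) {
            // Other failures can still be aborted before closing.
            producer.abortTransaction();
            producer.close();
            return false;
        }
    }
}
```

The point of the split is that only non-fenced failures go through abortTransaction(); a fenced producer is simply closed and replaced.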
-----------
static Properties setPropertiesProducer() {
    Properties temp = new Properties();
    temp.put("bootstrap.servers", "localhost:9092");
    temp.put("acks", "all");
    temp.put("retries", 1);
    temp.put("batch.size", 16384);
    temp.put("linger.ms", 5);
    temp.put("buffer.memory", 33554432);
    temp.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    temp.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    return temp;
}

public static Producer<String, String> getProducer(String aThreadId) {
    if ((producerMap.size() == 0) || (producerMap.get(aThreadId) == null)) {
        Properties temp = producerProps;
        temp.put("transactional.id", aThreadId);
        Producer<String, String> producer = new KafkaProducer<String, String>(temp);
        producerMap.put(aThreadId, producer);
        producer.initTransactions();
        return producer;
    }
    return producerMap.get(aThreadId);
}

public static void removeProducer(String aThreadId) {
    logger.createDebug("Removing Thread ID :" + aThreadId);
    if (producerMap.get(aThreadId) == null)
        return;
    producerMap.remove(aThreadId);
}
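As an aside, the check-then-put in getProducer above is racy if two threads ever ask for the same id: both can miss the lookup and create two producers. A minimal sketch of a race-free cache using ConcurrentHashMap.computeIfAbsent, with a generic factory standing in for "new KafkaProducer<>(props); producer.initTransactions();" so it runs without kafka-clients:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Minimal sketch of a race-free producer cache (names are illustrative).
class ProducerCache<P> {
    private final ConcurrentHashMap<String, P> map = new ConcurrentHashMap<>();
    private final Function<String, P> factory;

    ProducerCache(Function<String, P> factory) {
        this.factory = factory;
    }

    // computeIfAbsent runs the factory at most once per key, even when
    // several threads ask for the same transactional.id concurrently.
    P get(String transactionalId) {
        return map.computeIfAbsent(transactionalId, factory);
    }

    void remove(String transactionalId) {
        map.remove(transactionalId);
    }
}
```

With one producer per thread name this race may never bite, but computeIfAbsent also removes the stray `producerMap.size() == 0` check and guarantees initTransactions() finishes inside the factory before any other thread can see the producer.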
Gra*_*ray 15
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
This exception message is not very helpful. I believe it is trying to say that the broker no longer has any record of the transactional.id that the client is sending. This can be because:
another producer with the same transactional.id started and fenced off the older one, or the broker expired the transaction because it timed out. In our case we generate unique transactional ids with UUID.randomUUID(), so fencing by a second producer was unlikely; instead we were regularly hitting transaction timeouts, which produce this exception. There are 2 properties that govern how long the broker remembers a transaction before aborting and forgetting it.
transaction.max.timeout.ms -- a broker property that specifies the maximum number of milliseconds before a transaction is aborted and forgotten. The default in many Kafka versions seems to be 900000 (15 minutes). Kafka's documentation says:

The maximum allowed timeout for transactions. If a client's requested transaction time exceeds this, then the broker will return an error in InitProducerIdRequest. This prevents a client from too large of a timeout, which can stall consumers reading from topics included in the transaction.
transaction.timeout.ms -- a producer client property that sets the transaction timeout in milliseconds when a transaction is created. The default in many Kafka versions seems to be 60000 (1 minute). Kafka's documentation says:

The maximum amount of time in milliseconds that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction.
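If long-running batches are what is expiring the transaction, the client-side timeout can be raised, as long as it stays at or below the broker's transaction.max.timeout.ms. A sketch of the producer properties (the id and the 5-minute value are illustrative, not from the question):

```java
import java.util.Properties;

class TxTimeoutConfig {
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transactional.id", "my-app-tx-1"); // illustrative id
        // Raise the client transaction timeout from the 60000 ms default to
        // 5 minutes. It must stay <= the broker's transaction.max.timeout.ms
        // (900000 ms by default), or InitProducerId fails as shown below.
        props.put("transaction.timeout.ms", "300000");
        return props;
    }
}
```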
If the transaction.timeout.ms set in the client exceeds the transaction.max.timeout.ms on the broker, the producer immediately throws something like the following exception:
org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse
The transaction timeout is larger than the maximum value allowed by the broker
(as configured by transaction.max.timeout.ms).
Views: 8558