Spring + Kafka: slow processing when using transactions

LG8*_*G87 2 performance apache-kafka kafka-consumer-api spring-kafka

I have just started using Spring Kafka (2.1.4.RELEASE) with Kafka (1.0.0), but when I add transactions, processing slows down considerably.

Code:

spring.kafka.consumer.max-poll-records=10
spring.kafka.consumer.specific.avro.reader=true
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=${application.name}
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.consumer.key-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer

In Java I added:

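A minimal sketch of what that wiring typically looks like (the producer factory, bean names, and generic types here are assumptions; only the setTransactionManager(transactionManager) call is confirmed by the question below):

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.transaction.KafkaTransactionManager;

@Configuration
public class KafkaTxConfig {

    @Bean
    public KafkaTransactionManager<String, Object> transactionManager(
            ProducerFactory<String, Object> producerFactory) {
        // Requires a producer factory configured with a transactionIdPrefix.
        return new KafkaTransactionManager<>(producerFactory);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
            ConsumerFactory<String, Object> consumerFactory,
            KafkaTransactionManager<String, Object> transactionManager) {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // The call whose removal made processing fast again.
        factory.getContainerProperties().setTransactionManager(transactionManager);
        return factory;
    }
}
```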

When I remove the setTransactionManager(transactionManager) call, throughput improves dramatically. Am I doing something wrong?

Gar*_*ell 5

Kafka transactions are quite expensive - especially if you commit each send.

See Transactions in Apache Kafka.

Scroll down to "How transactions perform, and how to tune them".

As we can see the overhead is independent of the number of messages written as part of a transaction. So the key to having higher throughput is to include a larger number of messages per transaction.

With Spring for Apache Kafka you can do multiple sends in the same transaction using the executeInTransaction method, or by using Spring transaction management with a KafkaTransactionManager and performing multiple sends within a @Transactional method.
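A rough sketch of both approaches (the topic name, String payloads, and bean wiring are assumptions; the template's producer factory must be configured with a transactionIdPrefix for either to work):

```java
import java.util.List;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class BatchingSender {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public BatchingSender(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Option 1: local transaction; all sends in the callback are committed together.
    public void sendAllInOneTransaction(List<String> payloads) {
        kafkaTemplate.executeInTransaction(ops -> {
            payloads.forEach(p -> ops.send("output-topic", p));
            return null;
        });
    }

    // Option 2: Spring-managed transaction via a KafkaTransactionManager;
    // every send inside the method joins the same transaction, which is
    // committed when the method returns.
    @Transactional
    public void sendAllTransactional(List<String> payloads) {
        payloads.forEach(p -> kafkaTemplate.send("output-topic", p));
    }
}
```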

EDIT

I hadn't noticed the listener container; I assume you are consuming a message, performing some transformation, and sending to another topic. In that case you cannot "send multiple messages in one transaction", because the container manages the transaction and, by default, commits after each delivery.

Increasing the concurrency does not change the transaction semantics; in your case (concurrency 10) the partitions are distributed across 10 threads, and each thread runs its own transaction.

You can speed this up further by setting batchListener to true on the container factory.

In that case your @KafkaListener receives a List<ConsumerRecord> (or a List<Foo> if you use a conversion); you can iterate over the list, process each record, and send it with the template (do not use executeInTransaction, because a transaction has already been started by the container thread). When the batch is complete, the container commits the transaction.

You can control the batch size with the Kafka consumer property max.poll.records.
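A minimal sketch of that setup, assuming a Foo value type and "input-topic"/"output-topic" names (all identifiers here are placeholders):

```java
import java.util.List;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.stereotype.Component;

@Configuration
public class BatchListenerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Foo> batchFactory(
            ConsumerFactory<String, Foo> consumerFactory,
            KafkaTransactionManager<String, Foo> transactionManager) {
        ConcurrentKafkaListenerContainerFactory<String, Foo> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true);  // deliver a whole poll as one batch
        factory.setConcurrency(10);      // one transaction per consumer thread
        factory.getContainerProperties().setTransactionManager(transactionManager);
        return factory;
    }
}

@Component
class BatchForwarder {

    private final KafkaTemplate<String, Foo> kafkaTemplate;

    BatchForwarder(KafkaTemplate<String, Foo> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @KafkaListener(topics = "input-topic", containerFactory = "batchFactory")
    public void listen(List<Foo> batch) {
        // The container has already started a transaction on this thread,
        // so just send with the template; do not call executeInTransaction.
        batch.forEach(foo -> kafkaTemplate.send("output-topic", foo));
        // When the method returns, the container commits the offsets and all
        // sends as a single transaction; max.poll.records bounds the batch size.
    }
}
```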