Kafka 消费者重新平衡时间太长

Nan*_*dia 6 apache-kafka apache-kafka-streams

我有一个 Kafka Streams 应用程序,它从几个主题获取数据并连接数据并将其放入另一个主题中。

卡夫卡配置:

5 kafka brokers
Kafka Topics - 15 partitions and 3 replication factor. 
Run Code Online (Sandbox Code Playgroud)

注意:我在运行 Kafka Brokers 的同一台机器上运行 Kafka Streams 应用程序。

每小时消耗/产生几百万条记录。每当我关闭任何卡夫卡经纪人时,它就会进入重新平衡状态,并且需要大约。重新平衡需要 30 分钟,有时甚至更长。

有人知道如何解决 kafka 消费者的重新平衡问题吗?此外,很多时候它在重新平衡时会抛出异常。

这导致我们无法使用此设置在生产环境中运行。任何帮助,将不胜感激。

Caused by: org.apache.kafka.clients.consumer.CommitFailedException: ?
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:725)

at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:604)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1173)
at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:307)
at org.apache.kafka.streams.processor.internals.StreamTask.access$000(StreamTask.java:49)
at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:268)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
at org.apache.kafka.streams.processor.internals.StreamTask.commitImpl(StreamTask.java:259)
at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:362)
at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:346)
at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:1118)
at org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks(StreamThread.java:1448)
at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:1110)
Run Code Online (Sandbox Code Playgroud)

卡夫卡流配置:

bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092,kafka-4:9092,kafka-5:9092
max.poll.records = 100
request.timeout.ms=40000
Run Code Online (Sandbox Code Playgroud)

它内部创建的 ConsumerConfig 是:

    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [kafka-1:9092, kafka-2:9092, kafka-3:9092, kafka-4:9092, kafka-5:9092]
    check.crcs = true
    client.id = conversion-live-StreamThread-1-restore-consumer
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = 
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 2147483647
    max.poll.records = 100
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 40000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
Run Code Online (Sandbox Code Playgroud)

Mat*_*Sax 5

我建议StandbyTasks通过参数进行配置num.standby.replicas=1(默认为0)。这应该有助于显着减少重新平衡时间。

此外,我建议将您的应用程序升级到 Kafka 0.11。请注意,Streams API 0.11 向后兼容 0.10.1 和 0.10.2 代理,因此,您无需为此升级代理。再平衡行为在 0.11 中得到了很大改进,并将在即将发布的 1.0 版本中得到进一步改进(参见https://cwiki.apache.org/confluence/display/KAFKA/KIP-167%3A+Add+interface+for+the+state +store+restoration+process),因此,将应用程序升级到最新版本始终是重新平衡的改进。