调整 kafka 流以提高速度

red*_*guy 5 apache-kafka apache-kafka-streams

我有两个流:

[topicA] -> processingA -> [topicB] -> processingB -> [topicC]

通过登录我的应用程序,我注意到在将输出从 processingA 发送到 topicB 和从 topicB 为 processingB 挑选消息之间,每次需要超过 100 毫秒(而不是 150 毫秒)。它可能不多,但它会累积,最后相当简单的级联处理几乎需要秒。

我可以调整 kafka 以使这些延迟尽可能接近零吗?哪些配置参数对这些延迟有影响?

我主要是默认配置。是 commit.interval.ms 导致延迟吗?我已经将它从更高的默认值更改了...

StreamsConfig values: 
    application.id = app
    application.server = 
    bootstrap.servers = [localhost:9092]
    buffered.records.per.partition = 1000
    cache.max.bytes.buffering = 10485760
    client.id = 
    commit.interval.ms = 100
    connections.max.idle.ms = 540000
    default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
    default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
    default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
    default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
    default.value.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    num.standby.replicas = 0
    num.stream.threads = 1
    partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
    poll.ms = 100
    processing.guarantee = exactly_once
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    replication.factor = 1
    request.timeout.ms = 40000
    retries = 0
    retry.backoff.ms = 100
    rocksdb.config.setter = null
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    state.cleanup.delay.ms = 600000
    state.dir = /tmp/kafka-streams
    topology.optimization = none
    upgrade.from = null
    windowstore.changelog.additional.retention.ms = 86400000
Run Code Online (Sandbox Code Playgroud)

pgr*_*ras 0

在您的情况下,100 到 150 毫秒似乎很正常,因为您设置了 commit.interval.ms = 100 (这也是processing.guarantee = just_once 的默认值)。恰好一次,processingB 将仅读取已在 [topicB] 中提交的消息,而processingA 将仅在(至少)100 毫秒后提交。

在不重新架构系统的情况下,您可以调整 commit.interval.ms 以获得更好的延迟,但如果降低该值,吞吐量也会降低。

请参阅 这篇文章(Streams Performance Implications),其中描述了这种权衡。