增加 Kafka Streams 消费者吞吐量

Gui*_*ara 5 spark-streaming kafka-consumer-api apache-kafka-streams spark-streaming-kafka

我有一个并排运行的 Spark Streaming 应用程序和一个 Kafka Streams 应用程序,用于基准测试。两者都使用相同的输入主题并写入不同的目标数据库。输入主题有 15 个分区,spark 流和 kafka 流都有 15 个消费者(1:1 的比例)。此外,事件有效负载约为 2kb。不确定它是否相关,但 Spark Streaming 的 90% 百分位执行时间约为 9 毫秒。卡夫卡流,12 毫秒。每次处理消息时,都会在我的处理器中调用 commit() 方法。

问题依赖于高爆发。Spark Streaming 可以跟上每秒 700 次,而 Kafka Streams 只能跟上每秒 60/70 次。我不能超越那个。见下图:(绿线 - Spark Streaming / 蓝线 - Kafka Streams)

绿线 - Spark Streaming / 蓝线 - Kafka Streams

根据下面的配置,只要每个消费者不超过 1000 个事件,考虑到背压,火花流可以跟上,无论每个分区的字节数如何。至于 Kafka Streams,如果我正确理解了它的配置(请保持诚实),基于下面的相同,我能够每 100 毫秒(poll.ms)获取最多 1000 条记录(max.poll.records),只要每个分区不超过 1MB (max.partition.fetch.bytes) 和每次提取不超过 50MB (fetch.max.bytes)。

我看到相同的结果(每秒停留在 70 个事件上),无论我使用的是 5、10 还是 15 个消费者,这让我认为它与配置有关。我试图通过增加每次获取的记录数和每个分区的最大字节数来调整这些,但我没有得到显着的结果。

我知道这些是不同的技术并用于不同的目的,但我想知道我应该在 Kafka Streams 中使用哪些值以获得更好的吞吐量。

Spark Streaming 配置:

spark.batch.duration=10
spark.streaming.backpressure.enabled=true
spark.streaming.backpressure.initialRate=1000
spark.streaming.kafka.maxRatePerPartition=100
Run Code Online (Sandbox Code Playgroud)

Kafka Streams 配置(所有字节和时间相关)

# Consumer Config
fetch.max.bytes = 52428800 
fetch.max.wait.ms = 500 
fetch.min.bytes = 1 
heartbeat.interval.ms = 3000 
max.partition.fetch.bytes = 1048576 
max.poll.interval.ms = 300000 
max.poll.records = 1000 
request.timeout.ms = 30000
enable.auto.commit = false

# StreamsConfig
poll.ms=100 
Run Code Online (Sandbox Code Playgroud)

处理器代码

public class KStreamsMessageProcessor extends AbstractProcessor<String, String> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String payload) {

        ResponseEntity responseEntity = null;
        try {

          // Do Some processing

        } catch (final MyException e) {

          // Do Some Exception Handling

        } finally {

            context.forward(UUID.randomUUID().toString(), responseEntity);
            context.commit();
        }
    }

Run Code Online (Sandbox Code Playgroud)

提前致谢!

Gui*_*ara 5

更新

Kafka Streams 写入的数据库是这里的大瓶颈。在我们将其切换到更好的集群(更好的硬件、内存、内核等)后,我使用下面的配置进行了调整,每秒能够消耗大约 2k 个事件。提交间隔配置也已更改(根据 Augusto 建议),并且还使用了 G1GC 垃圾收集器。

fetch.max.bytes = 52428800
max.partition.fetch.bytes = 1048576 

fetch.max.wait.ms = 1000 
max.poll.records = 10000 
fetch.min.bytes = 100000
enable.auto.commit = false
Run Code Online (Sandbox Code Playgroud)

1