Kafka消费者配置/性能问题

Sha*_*mik 5 java performance apache-kafka kafka-consumer-api

我正在尝试将kafka作为AWS SQS的替代品.动机主要是提高性能,其中kafka将消除限制,一次性提取10条消息,上限为256kb.这是我的用例的高级场景.我有一堆爬虫正在发送索引文件.有效载荷的大小平均约为1 MB.爬虫调用SOAP端点,后者又运行生产者代码以将消息提交给kafka队列.消费者应用程序接收消息并处理它们.对于我的测试框,我已经为主题配置了30个分区和2个复制.两个kafka实例正在运行1个zookeeper实例.卡夫卡版本是0.10.0.

对于我的测试,我在队列中发布了700万条消息.我创建了一个包含30个消费者线程的消费者组,每个分区一个.我最初的印象是,与我通过SQS获得的相比,这将大大加快处理能力.不幸的是,事实并非如此.就我而言,数据处理很复杂,平均需要1-2分钟才能完成.这导致了一系列的分区重新平衡,因为线程无法按时心跳.我可以在日志引用中看到一堆消息

组full_group的自动偏移提交失败:由于组已经重新平衡并将分区分配给另一个成员,因此无法完成提交.这意味着后续调用poll()的时间长于配置的session.timeout.ms,这通常意味着轮询循环花费了太多时间进行消息处理.您可以通过增加会话超时或通过max.poll.records减少poll()中返回的批量的最大大小来解决此问题.

这导致多次处理相同的消息.我尝试使用会话超时,max.poll.records和轮询时间来避免这种情况,但这会减慢整个处理时间.这是一些配置参数.

metadata.max.age.ms = 300000
max.partition.fetch.bytes = 1048576
bootstrap.servers = [kafkahost1:9092, kafkahost2:9092]
enable.auto.commit = true
max.poll.records = 10000
request.timeout.ms = 310000
heartbeat.interval.ms = 100000
auto.commit.interval.ms = 1000
receive.buffer.bytes = 65536
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class com.autodesk.preprocessor.consumer.serializer.KryoObjectSerializer
group.id = full_group
retry.backoff.ms = 100
fetch.max.wait.ms = 500
connections.max.idle.ms = 540000
session.timeout.ms = 300000
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
metrics.sample.window.ms = 30000
auto.offset.reset = latest

while (true) {
    try{
        ConsumerRecords records = consumer.poll(100);
        for (ConsumerRecord record : records) {
            if(record.value()!=null){
                TextAnalysisRequest textAnalysisObj = record.value();
                if(textAnalysisObj!=null){
                    // Process record
                    PreProcessorUtil.submitPostProcessRequest(textAnalysisObj);
                }
            }
        }
    }catch(Exception ex){
        LOGGER.error("Error in Full Consumer group worker", ex);
    }
我理解记录处理部分是我的一个瓶颈.但我相信这里的一些人有类似处理大处理时间的用例.我想通过旋转其专用线程中的每个处理器或使用大容量的线程池来进行异步处理,但不确定它是否会在系统中产生很大的负载.与此同时,我看到过一些人们使用暂停和恢复API来执行处理以避免重新平衡问题的情况.

在这种情况下,我真的在寻找一些建议/最佳实践.特别是,推荐的配置设置围绕听力,请求超时,最大轮询记录,自动提交间隔,轮询间隔等,如果kafka不是我的用例的正确工具,请让我知道.

dce*_*chi 3

您可以首先在与从 Kafka 读取的线程不同的线程中异步处理消息。这样自动提交将会非常快,并且 Kafka 不会中断你的会话。像这样的东西:

    private final BlockingQueue<TextAnalysisRequest> requests = 
new LinkedBlockingQueue();
Run Code Online (Sandbox Code Playgroud)

在阅读主题中:

while (true) {
    try{
        ConsumerRecords records = consumer.poll(100);
        for (ConsumerRecord record : records) {
            if(record.value()!=null){
                TextAnalysisRequest textAnalysisObj = record.value();
                if(textAnalysisObj!=null){
                    // Process record
                    requests.offer(textAnalysisObj);
                }
            }
     }    
}
catch(Exception ex){
    LOGGER.error("Error in Full Consumer group worker", ex);
}
Run Code Online (Sandbox Code Playgroud)

在处理线程中:

            while (!Thread.currentThread().isInterrupted()) {
                try {
                    TextAnalysisRequest textAnalysisObj = requests.take();
                    PreProcessorUtil.submitPostProcessRequest(textAnalysisObj);
                } catch (InterruptedException e) {
                    LOGGER.info("Process thread interrupted", e);
                    Thread.currentThread().interrupt();
                } catch (Throwable t) {
                    LOGGER.warn("Unexpected throwable while processing.", t);
                }
            }
Run Code Online (Sandbox Code Playgroud)

另请查看此文档,了解通过 Kafka 发送大消息的策略: http: //blog.cloudera.com/blog/2015/07/deploying-apache-kafka-a-practical-faq/

简而言之,它表示 Kafka 在 10K 左右的小消息上表现最佳,如果您需要发送更大的消息,最好将它们放在网络存储上,并通过 Kafka 发送它们的位置,或者将它们拆分。