How do I use multiple consumers in Kafka?

Jef*_*ong 30 java bigdata apache-kafka

I'm new to Kafka and I've run into some basic trouble understanding multiple consumers; the articles, documentation, and so on haven't been much help so far.

One thing I've tried is writing my own high-level Kafka producer and consumer and running them simultaneously, publishing 100 simple messages to a topic and having my consumer retrieve them. That works, but when I introduce a second consumer to consume from the same topic the messages were just published to, it doesn't receive any messages at all.

My understanding is that for each topic you can have consumers from separate consumer groups, and each consumer group gets a full copy of the messages produced to that topic. Is that correct? If not, what is the proper way to set up multiple consumers? This is the consumer class I have written so far:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Properties;

public class AlternateConsumer extends Thread {
    private final KafkaConsumer<Integer, String> consumer;
    private final String topic;
    private final Boolean isAsync = false;

    public AlternateConsumer(String topic, String consumerGroup) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", consumerGroup);
        properties.put("partition.assignment.strategy", "roundrobin");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<Integer, String>(properties);
        consumer.subscribe(topic);
        this.topic = topic;
    }

    public void run() {
        // Poll forever and print every record received from the topic.
        while (true) {
            ConsumerRecords<Integer, String> records = consumer.poll(0);
            for (ConsumerRecord<Integer, String> record : records) {
                System.out.println("We received message: " + record.value() + " from topic: " + record.topic());
            }
        }
    }
}

Also, I noticed that initially I was testing the consumer above against a topic 'test' that had only one partition. When I added another consumer to an existing consumer group, say 'testGroup', it triggered a Kafka rebalance that slowed my consumption latency down considerably, on the order of seconds. I thought the rebalance was an issue because I only had one partition, but when I created a new topic with multiple partitions, say 6 of them, similar problems appeared: adding more consumers to the same consumer group caused latency issues. I've looked around and people tell me I should be using a multi-threaded consumer - can anybody shed some light on this?

Chr*_*ken 21

I think your problem lies with the auto.offset.reset property. When a new consumer reads from a partition and there is no previously committed offset, the auto.offset.reset property is used to decide what the starting offset should be. If you set it to "largest" (the default), you start reading at the latest (last) message. If you set it to "smallest", you get the first available message.

So add:

properties.put("auto.offset.reset", "smallest");

and try again.
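
Note that "smallest"/"largest" are the values for the old Zookeeper-based high-level consumer; the newer KafkaConsumer API that the question's AlternateConsumer uses expects "earliest"/"latest" (with "latest" as the default), so there the equivalent line would be:

properties.put("auto.offset.reset", "earliest");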

  • This is a late response, but thanks Chris! Your solution was correct; after a closer look at the documentation I should have noticed that a newly started consumer is set to consume only messages sent from that point onward - pre-existing messages are not consumed unless the property above is set. (3)

小智 7

If you want multiple consumers to consume the same messages (broadcast, for example), you can spawn them with different consumer groups and also set auto.offset.reset to smallest in the consumer config. If you want multiple consumers to consume in parallel (dividing the work among them), you should create at least as many partitions as consumers. One partition can be consumed by at most one consumer process, but one consumer can consume multiple partitions.
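
For the broadcast case, here is a minimal sketch (assuming a broker on localhost:9092, the newer KafkaConsumer API, and the question's 'test' topic; the group names groupA and groupB are arbitrary placeholders): each consumer gets its own group.id, so each receives the complete stream, whereas putting both consumers in the same group would split the partitions between them instead.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class BroadcastDemo {

    // Build a consumer subscribed to the topic under the given consumer group.
    private static KafkaConsumer<Integer, String> buildConsumer(String topic, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", groupId);              // a different group id => a full copy of the topic
        props.put("auto.offset.reset", "earliest");  // start from the oldest available message
        props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topic));
        return consumer;
    }

    public static void main(String[] args) {
        // Two consumers in two different groups: each one receives every message on "test".
        // Two consumers in the SAME group would instead divide the partitions between them.
        for (String groupId : new String[]{"groupA", "groupB"}) {
            final KafkaConsumer<Integer, String> consumer = buildConsumer("test", groupId);
            new Thread(() -> {
                while (true) {
                    ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<Integer, String> record : records) {
                        System.out.println(groupId + " got: " + record.value());
                    }
                }
            }).start();
        }
    }
}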


Alp*_*ure 5

As the documentation here says: "if you provide more threads than there are partitions on the topic, some threads will never see a message". Can you add partitions to your topic? I have my consumer-group thread count equal to the number of partitions in the topic, and every thread is receiving messages.

Here is my topic configuration:

buffalo-macbook10:kafka_2.10-0.8.2.1 aakture$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic recent-wins
Topic:recent-wins   PartitionCount:3    ReplicationFactor:1 Configs:
Topic: recent-wins  Partition: 0    Leader: 0   Replicas: 0 Isr: 0
Topic: recent-wins  Partition: 1    Leader: 0   Replicas: 0 Isr: 0
Topic: recent-wins  Partition: 2    Leader: 0   Replicas: 0 Isr: 0

And my consumer:

package com.cie.dispatcher.services;

import com.cie.dispatcher.model.WinNotification;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import io.dropwizard.lifecycle.Managed;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * This will create three threads, assign them to a "group" and listen for  notifications on a topic.
 * Current setup is to have three partitions in Kafka, so we need a thread per partition (as recommended by
 * the kafka folks). This implements the dropwizard Managed interface, so it can be started and stopped by the
 * lifecycle manager in dropwizard.
 * <p/>
 * Created by aakture on 6/15/15.
 */
public class KafkaTopicListener implements Managed {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicListener.class);
    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;
    private int threadCount;
    private WinNotificationWorkflow winNotificationWorkflow;
    private ObjectMapper objectMapper;

    @Inject
    public KafkaTopicListener(String a_zookeeper,
                              String a_groupId, String a_topic,
                              int threadCount,
                              WinNotificationWorkflow winNotificationWorkflow,
                              ObjectMapper objectMapper) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
        this.threadCount = threadCount;
        this.winNotificationWorkflow = winNotificationWorkflow;
        this.objectMapper = objectMapper;
    }

    /**
     * Creates the config for a connection
     *
     * @param zookeeper the host:port for zookeeper, "localhost:2181" for example.
     * @param groupId   the group id to use for the consumer group. Can be anything, it's used by kafka to organize the consumer threads.
     * @return the config props
     */
    private static ConsumerConfig createConsumerConfig(String zookeeper, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");

        return new ConsumerConfig(props);
    }

    public void stop() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) {
            executor.shutdown();
            try {
                if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                    LOG.info("Timed out waiting for consumer threads to shut down, exiting uncleanly");
                }
            } catch (InterruptedException e) {
                LOG.info("Interrupted during shutdown, exiting uncleanly");
            }
        }
        LOG.info("{} shutdown successfully", this.getClass().getName());
    }

    /**
     * Starts the listener
     */
    public void start() {
        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put(topic, new Integer(threadCount));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        executor = Executors.newFixedThreadPool(threadCount);
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ListenerThread(stream, threadNumber));
            threadNumber++;
        }
    }

    private class ListenerThread implements Runnable {
        private KafkaStream m_stream;
        private int m_threadNumber;

        public ListenerThread(KafkaStream a_stream, int a_threadNumber) {
            m_threadNumber = a_threadNumber;
            m_stream = a_stream;
        }

        public void run() {
            try {
                String message = null;
                LOG.info("started listener thread: {}", m_threadNumber);
                ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
                while (it.hasNext()) {
                    try {
                        message = new String(it.next().message());
                        LOG.info("receive message by " + m_threadNumber + " : " + message);
                        WinNotification winNotification = objectMapper.readValue(message, WinNotification.class);
                        winNotificationWorkflow.process(winNotification);
                    } catch (Exception ex) {
                        LOG.error("error processing queue for message: " + message, ex);
                    }
                }
                LOG.info("Shutting down listener thread: " + m_threadNumber);
            } catch (Exception ex) {
                LOG.error("error:", ex);
            }
        }
    }
}
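
To tie it together, a hypothetical wiring sketch (the group id "recent-wins-group" and the workflow/mapper variables below are placeholders, not part of the original answer): start the listener with one thread per partition of the recent-wins topic described above, and stop it on shutdown.

// winNotificationWorkflow and objectMapper are whatever instances your application already provides.
KafkaTopicListener listener = new KafkaTopicListener(
        "localhost:2181",     // zookeeper host:port
        "recent-wins-group",  // consumer group id; any name works
        "recent-wins",        // topic
        3,                    // thread count == partition count
        winNotificationWorkflow,
        objectMapper);
listener.start();
// ... and when the application shuts down:
listener.stop();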