Why does the consumer hang when consuming Kafka messages on DC/OS with the Client API for Java?

ddd*_*ddd · score 6 · tags: java, apache-kafka, dcos

I installed Kafka on a DC/OS (Mesos) cluster on AWS, enabled three brokers, and created a topic called "topic1".

dcos kafka topic create topic1 --partitions 3 --replication 3

Then I wrote a Producer class to send messages and a Consumer class to receive them.

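import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class Producer {
    public static void sendMessage(String msg) throws InterruptedException, ExecutionException {
        Map<String, Object> producerConfig = new HashMap<>();
        System.out.println("Setting producer config.");
        producerConfig.put("bootstrap.servers", 
                "172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636");

        ByteArraySerializer serializer = new ByteArraySerializer();
        System.out.println("Creating KafkaProducer");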
        KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<>(producerConfig, serializer, serializer);
        for (int i = 0; i < 100; i++) {
            String msgstr = msg + i;
            byte[] message = msgstr.getBytes();
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic1", message);
            System.out.println("Sent:" + msgstr);
            kafkaProducer.send(record);
        }
        kafkaProducer.close();
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        sendMessage("Kafka test message 2/27 3:32");
    }

}

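import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class Consumer {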
    public static String getMessage() {
        Map<String, Object> consumerConfig = new HashMap<>();
        consumerConfig.put("bootstrap.servers", 
                "172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636");
        consumerConfig.put("group.id", "dj-group");
        consumerConfig.put("enable.auto.commit", "true");
        consumerConfig.put("auto.offset.reset", "earliest");
        ByteArrayDeserializer deserializer = new ByteArrayDeserializer();
        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(consumerConfig, deserializer, deserializer);

        kafkaConsumer.subscribe(Arrays.asList("topic1"));
        while (true) {
            ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(100);
            System.out.println(records.count() + " of records received.");
            for (ConsumerRecord<byte[], byte[]> record : records) {
                System.out.println(Arrays.toString(record.value()));
            }
        }
    }

    public static void main(String[] args) {
        getMessage();
    }
}

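First, I ran the Producer on the cluster to send messages to topic1. However, when I ran the Consumer, it received nothing and just hung.

The Producer itself works correctly, because I was able to get all the messages by running the shell script that ships with the Kafka installation: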

./bin/kafka-console-consumer.sh --zookeeper master.mesos:2181/dcos-service-kafka --topic topic1 --from-beginning

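So why can't I receive them with the Consumer? This post suggests that a group.id with old offsets could be a possible cause. I only set group.id in the consumer, not in the producer. How do I configure the offsets for this group?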
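One common way to check the "old offsets" theory: Kafka applies `auto.offset.reset` only when the group has no valid committed offset for a partition, so a group.id that has never committed anything will start from `earliest`. A minimal sketch, assuming the question's broker list and settings; the class `FreshGroupConfig`, the helper `freshGroupConfig`, and the `dj-group-` prefix are hypothetical names for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class FreshGroupConfig {
    // Hypothetical helper: the question's consumer settings, but with a
    // group.id that has never committed offsets.
    public static Map<String, Object> freshGroupConfig(String bootstrapServers) {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", bootstrapServers);
        // A brand-new group has no committed offsets, so auto.offset.reset
        // decides where it starts; "earliest" means the start of each partition.
        config.put("group.id", "dj-group-" + UUID.randomUUID());
        config.put("enable.auto.commit", "true");
        config.put("auto.offset.reset", "earliest");
        return config;
    }

    public static void main(String[] args) {
        Map<String, Object> config = freshGroupConfig(
                "172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636");
        // Each run prints a different group.id.
        System.out.println(config.get("group.id"));
    }
}
```

The returned map can be passed to `new KafkaConsumer<>(config, deserializer, deserializer)` exactly as in the question. The trade-off is that every run creates a new group and ignores earlier commits; to re-read with a fixed group instead, the consumer's `seekToBeginning` method can rewind partitions it already holds.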

Answer by ddd*_*ddd · score 5

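It turns out that `kafkaConsumer.subscribe(Arrays.asList("topic1"));` was what made `poll()` hang. According to "Kafka Consumer does not receive messages", there are two ways to attach a consumer to a topic: `assign` and `subscribe`. `subscribe` relies on Kafka's consumer-group management, so the consumer must first find the group coordinator and join the group before any partitions are handed to it, whereas `assign` takes the partitions directly and does not need to join a group. After I replaced `subscribe` with the lines below, it started working.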

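    // topic1 has 3 partitions; assigning only partition 0 would skip
    // the messages the producer wrote to partitions 1 and 2.
    List<TopicPartition> tps = Arrays.asList(
            new TopicPartition("topic1", 0),
            new TopicPartition("topic1", 1),
            new TopicPartition("topic1", 2));
    kafkaConsumer.assign(tps);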

However, the output shows arrays of numbers, which is not what I expected (the producer sent strings). But I guess that's a separate issue.
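The arrays of numbers are the raw bytes of each message: `Arrays.toString(byte[])` prints every byte as a decimal number. A minimal sketch, assuming the producer's `getBytes()` produced UTF-8 (the JVM default on most systems, but not guaranteed before Java 18); the class `DecodeDemo` and its methods are hypothetical names. It contrasts what the question's consumer prints with decoding the bytes back into a string.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class DecodeDemo {
    // What the question's consumer loop prints: one decimal number per byte.
    public static String asNumbers(byte[] value) {
        return Arrays.toString(value);
    }

    // Decodes the bytes back into the string the producer sent.
    public static String asText(byte[] value) {
        return new String(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] value = "Kafka0".getBytes(StandardCharsets.UTF_8);
        System.out.println(asNumbers(value)); // [75, 97, 102, 107, 97, 48]
        System.out.println(asText(value));    // Kafka0
    }
}
```

In the consumer loop this amounts to printing `new String(record.value(), StandardCharsets.UTF_8)` instead of `Arrays.toString(record.value())`.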

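  • The so-called "separate issue" is that you are receiving bytes (because Kafka handles everything as bytes under the hood). You should use a deserializer, e.g. `key.deserializer = org.apache.kafka.common.serialization.StringDeserializer` for the key, and the corresponding `value.deserializer` for the value. See http://kafka.apache.org/documentation/ (but I couldn't find the exact page for SerDe). (3 upvotes)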
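Following the comment's suggestion, the deserializers can be configured by class name rather than passed to the constructor. A minimal sketch, assuming the question's brokers and group; `StringConsumerConfig` and `stringConsumerProps` are hypothetical names. The class names are plain strings, so this part compiles without kafka-clients on the classpath.

```java
import java.util.Properties;

public class StringConsumerConfig {
    // Hypothetical helper: consumer settings that tell Kafka to decode keys
    // and values as strings, so record.value() is a String, not byte[].
    public static Properties stringConsumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("auto.offset.reset", "earliest");
        // StringDeserializer decodes as UTF-8 unless configured otherwise.
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = stringConsumerProps(
                "172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636", "dj-group");
        // With kafka-clients available, these would be used as:
        // KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        props.list(System.out);
    }
}
```

Configuring by class name keeps the key/value types in one place; passing deserializer instances to the constructor, as the question does, takes precedence over these two settings.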