小编Acc*_*eml的帖子

如何在kafka 0.9.0中使用多线程消费者?

kafka的文档给出了以下描述的方法:

每个线程一个消费者:一个简单的选择是为每个线程提供自己的消费者>实例.

我的代码:

public class KafkaConsumerRunner implements Runnable {

    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final CloudKafkaConsumer consumer;
    private final String topicName;

    public KafkaConsumerRunner(CloudKafkaConsumer consumer, String topicName) {
        this.consumer = consumer;
        this.topicName = topicName;
    }

    @Override
    public void run() {
        try {
            this.consumer.subscribe(topicName);
            ConsumerRecords<String, String> records;
            while (!closed.get()) {
                synchronized (consumer) {
                    records = consumer.poll(100);
                }
                for (ConsumerRecord<String, String> tmp : records) {
                    System.out.println(tmp.value());
                }
            }
        } catch (WakeupException e) {
            // Ignore exception if closing
            System.out.println(e);
            //if …
Run Code Online (Sandbox Code Playgroud)

java multithreading distributed-computing apache-kafka apache-flink

8
推荐指数
1
解决办法
1万
查看次数