来自kafka消费者的InstanceAlreadyExistsException

use*_*349 2 java multithreading apache-kafka kafka-consumer-api

我正在与Kafka合作并尝试通过撰写本文来设置消费者群体.唯一的区别是我创建了自己的抽象类,处理程序使设计更简单.

下面是我的抽象类:

public abstract class Consumer implements Runnable {
  private final Properties consumerProps;
  private final String consumerName;

  public Consumer(String consumerName, Properties consumerProps) {
    this.consumerName = consumerName;
    this.consumerProps = consumerProps;
  }

  protected abstract void shutdown();

  protected abstract void run(String consumerName, Properties consumerProps);

  @Override
  public final void run() {
    run(consumerName, consumerProps);
  }
}
Run Code Online (Sandbox Code Playgroud)

以下是我KafkaConsumerA在抽象类之上的内容:

public class KafkaConsumerA extends Consumer {
  private KafkaConsumer<byte[], DataHolder> consumer;

  public KafkaConsumerA(String consumerName, Properties consumerProps) {
    super(consumerName, consumerProps);
  }

  @Override
  public void shutdown() {
    consumer.wakeup();
  }

  @Override
  protected void run(String consumerName, Properties consumerProps) {
    // exception comes from below line from two of the threads and the remaining one thread works fine.
    consumer = new KafkaConsumer<>(consumerProps);
    List<String> topics = getTopicsBasisOnConsumerName(consumerName);
    try {
      consumer.subscribe(topics);
      // Setup the schema config
      Map<String, Object> config = new HashMap<>();
      config.put("urls", "https://abc.qa.host.com");

      GenericRecordDomainDataDecoder decoder = new GenericRecordDomainDataDecoder(config);
      while (true) {
        ConsumerRecords<byte[], DataHolder> records = consumer.poll(200);
        for (ConsumerRecord<byte[], DataHolder> record : records) {
          Map<String, Object> data = new HashMap<>();
          data.put("partition", record.partition());
          data.put("offset", record.offset());
          data.put("value", record.value());
          System.out
              .println((Thread.currentThread().getId() % 3) + 1 + ": " + decoder.decode(record.value()));
        }
      }
    } catch (WakeupException ex) {
      ex.printStackTrace();
    } catch (Exception ex) {
      ex.printStackTrace();
    } finally {
      consumer.close();
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

以下是我的Handler类:

// looks like something is wrong in this class
public final class ConsumerHandler {
  private final ExecutorService executorServiceProcess;
  private final Consumer consumer;
  private final List<Consumer> consumers = new ArrayList<>();

  public ConsumerHandler(Consumer consumer, int poolSize) {
    this.executorServiceProcess = Executors.newFixedThreadPool(poolSize);
    this.consumer = consumer;
    for (int i = 0; i < poolSize; i++) {
      consumers.add(consumer);
      executorServiceProcess.submit(consumer);
    }
  }

  public void shutdown() {
    Runtime.getRuntime().addShutdownHook(new Thread() {
      @Override
      public void run() {
        for (Consumer consumer : consumers) {
          consumer.shutdown();
        }
        executorServiceProcess.shutdown();
        try {
          executorServiceProcess.awaitTermination(1000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
          Thread.currentThread().interrupt();
        }
      }
    });
  }
}
Run Code Online (Sandbox Code Playgroud)

这是我从主类开始消费者群体中的所有消费者:

  public static void main(String[] args) {
    ConsumerHandler handlerA =
        new ConsumerHandler(new KafkaConsumerA("KafkaConsumerA", getConsumerProps()), 3);
    // run KafkaConsumerB here

     handlerA.shutdown();
     // shutdown KafkaConsumerB here
  }
Run Code Online (Sandbox Code Playgroud)

所以有了这个 - 我的计划是建立一个有三个消费者的消费者群体,三个消费者KafkaConsumerA订阅相同的主题.

错误:-

每当我运行它时,看起来只有消费者组中的一个消费者工作,而其他两个消费者不起作用.我在控制台上看到了这两个异常:

javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=a97716e0-0e05-4938-8fa1-6b872cf24e34
    at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) ~[na:1.7.0_79]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) ~[na:1.7.0_79]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) ~[na:1.7.0_79]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) ~[na:1.7.0_79]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) ~[na:1.7.0_79]
    at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[na:1.7.0_79]
    at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58) ~[kafka-clients-0.10.0.0-SASL.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:694) [kafka-clients-0.10.0.0-SASL.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:587) [kafka-clients-0.10.0.0-SASL.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:569) [kafka-clients-0.10.0.0-SASL.jar:na]
Run Code Online (Sandbox Code Playgroud)

我在这做什么错了?getConsumerProps()方法返回属性对象具有client.idgroup.id它与所有三个消费者,消费群相同的值.

以下是我的设计细节:

  • KafkaConsumerA将在消费者群体中拥有三个消费者,每个消费者都会参与其中topicA.
  • KafkaConsumerB(类似于KafkaConsumerA)将在不同的消费者群体中拥有两个消费者,并且每个消费者都将继续工作topicB.

而这两个消费者KafkaConsumerA,并KafkaConsumerB会与相互独立的不同的消费群同箱中运行.

Sor*_*tta 12

我知道这是一个老问题,但考虑到现在我们大量使用注释。因此添加了另一种风格的问题和答案。我们面临着同样的问题,但我们一直在同一个应用程序中的 2 个消费者中使用 @KafkaListener 注释,并且大多数属性都是直接注入的

@KafkaListener(
topics = "${app.source}",
groupId = "${app.kafka.consumer.group-id}",
clientIdPrefix = "subscriber",
containerFactory = ListenerContainerFactory.AVRO_SPECIFIC,
errorHandler = "customConsumerAwareListenerErrorHandler"
)
Run Code Online (Sandbox Code Playgroud)

我们的消费者具有类似的实现,但连接到不同的主题,因此我们只需修改“clientIdPrefix”以在实例化期间为它们提供唯一的值。所以最终的代码是

<-- 第一个消费者组件 ->

@KafkaListener(
topics = "${app.source}",
groupId = "${app.kafka.consumer.group-id}",
clientIdPrefix = "firstSubscriber",
containerFactory = ListenerContainerFactory.AVRO_SPECIFIC,
errorHandler = "customConsumerAwareListenerErrorHandler"
)
Run Code Online (Sandbox Code Playgroud)

<-- 第二个消费者组件 ->

@KafkaListener(
topics = "${app.source}",
groupId = "${app.kafka.consumer.group-id}",
clientIdPrefix = "secondSubscriber",
containerFactory = ListenerContainerFactory.AVRO_SPECIFIC,
errorHandler = "customConsumerAwareListenerErrorHandler"
)
Run Code Online (Sandbox Code Playgroud)

  • 为了更清楚地说明这一点,“clientIdPrefix”完成了这项工作。 (5认同)

Nic*_*ven 9

Kafka正在尝试注册MBean以进行应用程序监控,并且正在使用它client.id来执行此操作.正如您所说,您在抽象类中注入了属性,并为每个使用者注入相同client.idgroup.id组内的属性A.但是,你有不同的客户,所以你应该给他们自己的client.id,但保持不变group.id.这将在同一个使用者组中注册不同的客户端/使用者并使它们一起工作,但不会在MBean注册上发生冲突.

  • 好的..这个错误是否会影响该消费者组中的我的消费者,就像它不会从 kafka 接收任何数据一样?或者它只是一条警告消息,告诉 MBean 注册失败? (2认同)