小编Gar*_*ell的帖子

Spring Boot Kafka:由于NoSuchBeanDefinitionException,无法启动使用者

在我的spring启动服务中尝试启动kafka使用者时看到NoSuchBeanDefinitionException并且无法启动服务本身.

下面是我的bean类,它包含为Kafka配置创建的所有必需bean

Spring Boot版本:1.5.2.RELEASE

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import com.ns.kafka.gateway.dtos.GatewayCallBackMessage;

@Configuration
@EnableKafka
public class GatewayCallbackToPNConsumerConfig {
 @Bean
 public Map < String, Object > consumerProps() {
  Map < String, Object > props = new HashMap < > ();
  props.put(null, null);
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "gatewaycallbacktopngroup");
  props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
  props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
  props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
  return props;
 }

 @Bean
 public Deserializer < String > …
Run Code Online (Sandbox Code Playgroud)

spring apache-kafka kafka-consumer-api spring-kafka

9
推荐指数
1
解决办法
2万
查看次数

Rabbit mq 预取理解

我理解如下

预取只是控制代理一次允许消费者处理多少条消息。当设置为 1 时,这意味着代理将发送 1 条消息,等待 ack,然后发送下一条。

但有关以下场景的问题:

  1. 假设预取为 200,我们有 2 个消费者空闲。经纪人收到了 150 条消息,我认为经纪人会随机选择一条消息并发送所有 150 条消息?我认为是的,它不会在消费者之间进行共享。

  2. 假设一个消费者有 100 条消息处于 unack 状态,其中一条处于空闲状态,再次预取为 200 条消息。现在我们又收到了 50 条消息,我认为代理会随机将这 50 条消息分配给任一消息?或者它不会向已经有 100 条消息尚未确认的消费者提供

  3. 如果预取是200,一个消费者得到200,监听器会阻塞该线程(springrabbitmq listner方法)发送ack直到所有200被处理吗?我认为它不会一一发送ack,而是会等到所有预取的消息都处理完毕。换句话说,如果预取为 200 并且代理发送 200 条消息,那么代理何时开始收到确认?

amqp rabbitmq spring-rabbit spring-amqp

9
推荐指数
1
解决办法
2万
查看次数

如何在Spring Java Config的单个MessageListenerContainer中添加多个JMS MessageListner

我的spring-config.xml中有以下xml代码

<jms:listener-container acknowledge="auto"
        connection-factory="cachedConnectionFactory" container-type="default"
        error-handler="consumerErrorHandler" concurrency="20-25">
        <jms:listener destination="#{TaskFinished.destination}"
            method="onMessage" ref="taskFinished" />
</jms:listener-container>
Run Code Online (Sandbox Code Playgroud)

现在,我将我的spring xml配置文件转换为Java配置.

我把它翻译成了

@Bean(name = "consumerJmsListenerContainer")
public DefaultMessageListenerContainer consumerJmsListenerContainer() {
    DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer();
    messageListenerContainer
            .setConnectionFactory(cachingConnectionFactory());
    messageListenerContainer.setConcurrency("20-25");
    messageListenerContainer.setErrorHandler(new ConsumerErrorHandler());
    messageListenerContainer
            .setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
    messageListenerContainer.setMessageListener(new TaskFinished());
    return messageListenerContainer;
}
Run Code Online (Sandbox Code Playgroud)

我需要知道的是,如果Message Container中有多个MessageListner,就像

<jms:listener-container acknowledge="auto"
        connection-factory="cachedConnectionFactory" container-type="default"
        error-handler="consumerErrorHandler" concurrency="20-25">
        <jms:listener destination="#{questionGeneration.destination}"
            method="onMessage" ref="questionGeneration" />
        <jms:listener destination="#{friendShipLogic.destination}"
            method="onMessage" ref="friendShipLogic" />
        <jms:listener destination="#{postAvailabilityChecker.destination}"
            method="onMessage" ref="postAvailabilityChecker" />
        <jms:listener destination="#{playOn.destination}" method="onMessage"
            ref="playOn" />
</jms:listener-container>
Run Code Online (Sandbox Code Playgroud)

我怎么能把这个xml代码转换成Java配置?

java xml spring jms spring-jms

8
推荐指数
1
解决办法
3949
查看次数

如何使用spring-rabbitmq将消息标记为持久性?

这就是我创建交换并将队列绑定到它的方式

<rabbit:topic-exchange id="dataExchange" name="MQ-EXCHANGE" durable="true">
        <rabbit:bindings>
            <rabbit:binding queue="COMM_QUEUE" pattern="queue.*" />
        </rabbit:bindings>
</rabbit:topic-exchange>
Run Code Online (Sandbox Code Playgroud)

我已经在互联网上阅读了很多帖子,其中写道如果要保护兔子或队列崩溃,还需要将消息标记为持久性.但我无法弄清楚如何标记我的消息持久性.

这就是我将消息发布到队列的方式

    @Autowired
    private RabbitTemplate template;

    @Override
    public void produceMessage(Object message, String routingKey) {
        template.convertAndSend(routingKey, message);  
    }
Run Code Online (Sandbox Code Playgroud)

我找了不同的API方法来了解这一点,并试图寻找我可以在XML中配置但无法找到方法的任何特定属性.任何指导?

rabbitmq spring-rabbit spring-amqp

8
推荐指数
1
解决办法
3124
查看次数

Thymeleaf classappend适用于多个班级

我想使用condition添加多个类.

<div th:classappend="x.isTrue ?'class1' "  ></div>
Run Code Online (Sandbox Code Playgroud)

我想要类似的东西

<div th:classappend="x.isTrue ?'class1' and "y.isTrue ?'class2'"  ></div>
Run Code Online (Sandbox Code Playgroud)

spring-mvc thymeleaf spring-boot

8
推荐指数
1
解决办法
9806
查看次数

Spring Expression Language - Java 8 forEach或stream on list

是否可以在SpEL中列出stream或forEach?例如

List<String> x = new LinkedList<>(Arrays.asList("A","AAB"));
ExpressionParser parser = new SpelExpressionParser();
StandardEvaluationContext context = new StandardEvaluationContext(x);
parser.parseExpression("x.stream().map(x -> x.replaceAll(\"A\", \"B\")).collect(Collectors.toList())").getValue(context))
Run Code Online (Sandbox Code Playgroud)

java spring spring-el java-8

8
推荐指数
1
解决办法
3010
查看次数

Spring Kafka MessageListenerContainer

我看到 spring Kafka 代码,我有一些疑问:

  1. 如果我们使用 1 个 @kafkaListener 和 2 个主题,那么 spring Kafka 将创建一个 MessageListenerContainer。如果我为每个主题使用单独的 @kafkaListener ,那么将创建 2 个 MessageListenerContainer 。

  2. MessageListenerContainer 是消费者的意思吗?

  3. 如果我在 ConcurrentKafkaListenerContainerFactory 中将并发数设置为 4,那么这意味着对于每个 kafkaListener,我使用代理打开 4 个线程?这意味着协调员将他们视为 4 个不同的消费者。

  4. 轮询如何与 kafkaListener 一起使用?它每次只从broker那里获取1个ConsumerRecord吗?

请帮忙。

apache-kafka kafka-consumer-api spring-kafka

8
推荐指数
1
解决办法
5363
查看次数

发送时在rabbitmq中设置邮件头

我想在向兔子发送消息时设置消息标题.我使用下面的代码,但混淆了如何设置邮件头.

public static <T> void sendMessage(String routingKey,final Object message,Class<T> type){
    DefaultClassMapper typeMapper = new DefaultClassMapper();
    typeMapper.setDefaultType(type);

    Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
    converter.setClassMapper(typeMapper);

    RabbitTemplate template = new RabbitTemplate(getConnectionFactory));
    template.setMessageConverter(converter);

    template.convertAndSend(routingKey, message);
}
Run Code Online (Sandbox Code Playgroud)

在上面的方法中,我只是争论java POJO对象及其要发送的类型.我想知道我应该在哪里设置邮件标题.

amqp rabbitmq jackson spring-rabbit spring-amqp

7
推荐指数
1
解决办法
7192
查看次数

Spring Kafka-使用Producer Listener配置KafkaTemplate与使用Listenable Future注册回调之间的区别

因此,我正在阅读Spring kafka文档,并遇到了Producer Listener。这就是Spring Kafka文档所说的-

“可选地,您可以配置带有ProducerListener的KafkaTemplate来获取发送(成功或失败)结果的异步回调,而不必等待Future完成。”

他们还指定了接口-

     public interface ProducerListener<K, V> {

      void onSuccess(String topic, Integer partition, K key, V value, 
      RecordMetadata recordMetadata);

      void onError(String topic, Integer partition, K key, V value, 
      Exception exception);

      boolean isInterestedInSuccess();

  }
Run Code Online (Sandbox Code Playgroud)

因此,我的理解是,如果您想对消息的成功和失败做一些事情,请实现ProducerListener接口,并将其注册到KafkaTemplate。它是异步的,因此您不必等待将来完成,就可以知道发送操作的结果。

在此下方大约3句话,它提到您还可以使用KakfaTemplate的send方法返回的ListenableFuture添加回调。这也是异步的。

 future.addCallback(new ListenableFutureCallback<SendResult<Integer, 
   String>>() {

 @Override
 public void onSuccess(SendResult<Integer, String> result) {
    ...
  }

  @Override
  public void onFailure(Throwable ex) {
    ...
  }

  });
Run Code Online (Sandbox Code Playgroud)

所以我想知道两者之间的确切区别是什么,因为它们都是异步的。是onSuccess和onFailure / onError方法中接收到的数据之间的区别。或者是在将回调函数添加到ListenableFuture之前(因为将来在不阻塞it-get()方法的情况下不知道异步计算的结果)之前,已经开发了在KafkaTemplate中添加ProducerListener的功能(反之亦然) 。因此,仅为了确保向后兼容,两者都可以继续使用。使用一种方法相对于其他方法是否具有任何性能优势。

先感谢您。

java spring spring-kafka

7
推荐指数
1
解决办法
2071
查看次数

Kafka 消费者指标从 Spring Boot 2.2.2 升级到 2.3.0

问题:

我们将 Spring Boot 版本从 2.2.2 升级到 2.3.0,2.2.2kafka_consumer_*中 Prometheus 端点中看到的所有指标在 2.3.0 中都看不到。

例如,缺少以下所有内容:

  • kafka_consumer_records_consumed_total_records_total
  • kafka_consumer_records_lag_records
  • kafka_consumer_fetch_latency_max_seconds
  • kafka_consumer_bytes_consumed_total_bytes_total

不确定我们是否缺少某种配置或文档中隐藏的东西......

已经尝试过的:

  • 梳理了 Spring Boot 2.3.0 发行说明、更新的千分尺文档和更新的 spring-kafka 文档,了解为什么会发生这种情况
  • 谷歌搜索到感觉就像地球的尽头
  • 尝试升级到 Spring Boot 2.2.7 并且 kafka 指标仍然存在,只有升级到 2.3.0 似乎会导致问题
  • 删除了我们项目代码中的任何不需要的依赖项/自定义,并且裸机只是连接到本地主机上的 kafka 容器,并且指标仍然没有出现

相关代码/详情:

  • 我们将 Red Hat AMQ Streams 用于我们的 kafka 代理(kafka 版本 2.1.1)
  • 我们在环境中唯一改变的是 Spring Boot 版本(以及自动拉入/更新的依赖项)以重新创建此问题

下面是我们build.gradle.kts改动前的:

import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
    id("org.springframework.boot") version "2.2.2.RELEASE"
    id("io.spring.dependency-management") version "1.0.9.RELEASE"
    kotlin("jvm") version "1.3.72"
    kotlin("plugin.spring") version "1.3.72"
}

group = "ourGroup"
version …
Run Code Online (Sandbox Code Playgroud)

spring-boot spring-boot-actuator prometheus spring-cloud-stream spring-kafka

7
推荐指数
1
解决办法
2360
查看次数