在我的spring启动服务中尝试启动kafka使用者时看到NoSuchBeanDefinitionException并且无法启动服务本身.
下面是我的bean类,它包含为Kafka配置创建的所有必需bean
Spring Boot版本:1.5.2.RELEASE
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import com.ns.kafka.gateway.dtos.GatewayCallBackMessage;
@Configuration
@EnableKafka
public class GatewayCallbackToPNConsumerConfig {
@Bean
public Map < String, Object > consumerProps() {
Map < String, Object > props = new HashMap < > ();
props.put(null, null);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "gatewaycallbacktopngroup");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
return props;
}
@Bean
public Deserializer < String > …Run Code Online (Sandbox Code Playgroud) 我理解如下
预取只是控制代理一次允许消费者处理多少条消息。当设置为 1 时,这意味着代理将发送 1 条消息,等待 ack,然后发送下一条。
但有关以下场景的问题:
假设预取为 200,我们有 2 个消费者空闲。经纪人收到了 150 条消息,我认为经纪人会随机选择一条消息并发送所有 150 条消息?我认为是的,它不会在消费者之间进行共享。
假设一个消费者有 100 条消息处于 unack 状态,其中一条处于空闲状态,再次预取为 200 条消息。现在我们又收到了 50 条消息,我认为代理会随机将这 50 条消息分配给任一消息?或者它不会向已经有 100 条消息尚未确认的消费者提供
如果预取是200,一个消费者得到200,监听器会阻塞该线程(springrabbitmq listner方法)发送ack直到所有200被处理吗?我认为它不会一一发送ack,而是会等到所有预取的消息都处理完毕。换句话说,如果预取为 200 并且代理发送 200 条消息,那么代理何时开始收到确认?
我的spring-config.xml中有以下xml代码
<jms:listener-container acknowledge="auto"
connection-factory="cachedConnectionFactory" container-type="default"
error-handler="consumerErrorHandler" concurrency="20-25">
<jms:listener destination="#{TaskFinished.destination}"
method="onMessage" ref="taskFinished" />
</jms:listener-container>
Run Code Online (Sandbox Code Playgroud)
现在,我将我的spring xml配置文件转换为Java配置.
我把它翻译成了
@Bean(name = "consumerJmsListenerContainer")
public DefaultMessageListenerContainer consumerJmsListenerContainer() {
DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer();
messageListenerContainer
.setConnectionFactory(cachingConnectionFactory());
messageListenerContainer.setConcurrency("20-25");
messageListenerContainer.setErrorHandler(new ConsumerErrorHandler());
messageListenerContainer
.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
messageListenerContainer.setMessageListener(new TaskFinished());
return messageListenerContainer;
}
Run Code Online (Sandbox Code Playgroud)
我需要知道的是,如果Message Container中有多个MessageListner,就像
<jms:listener-container acknowledge="auto"
connection-factory="cachedConnectionFactory" container-type="default"
error-handler="consumerErrorHandler" concurrency="20-25">
<jms:listener destination="#{questionGeneration.destination}"
method="onMessage" ref="questionGeneration" />
<jms:listener destination="#{friendShipLogic.destination}"
method="onMessage" ref="friendShipLogic" />
<jms:listener destination="#{postAvailabilityChecker.destination}"
method="onMessage" ref="postAvailabilityChecker" />
<jms:listener destination="#{playOn.destination}" method="onMessage"
ref="playOn" />
</jms:listener-container>
Run Code Online (Sandbox Code Playgroud)
我怎么能把这个xml代码转换成Java配置?
这就是我创建交换并将队列绑定到它的方式
<rabbit:topic-exchange id="dataExchange" name="MQ-EXCHANGE" durable="true">
<rabbit:bindings>
<rabbit:binding queue="COMM_QUEUE" pattern="queue.*" />
</rabbit:bindings>
</rabbit:topic-exchange>
Run Code Online (Sandbox Code Playgroud)
我已经在互联网上阅读了很多帖子,其中写道如果要保护兔子或队列崩溃,还需要将消息标记为持久性.但我无法弄清楚如何标记我的消息持久性.
这就是我将消息发布到队列的方式
@Autowired
private RabbitTemplate template;
@Override
public void produceMessage(Object message, String routingKey) {
template.convertAndSend(routingKey, message);
}
Run Code Online (Sandbox Code Playgroud)
我找了不同的API方法来了解这一点,并试图寻找我可以在XML中配置但无法找到方法的任何特定属性.任何指导?
我想使用condition添加多个类.
<div th:classappend="x.isTrue ?'class1' " ></div>
Run Code Online (Sandbox Code Playgroud)
我想要类似的东西
<div th:classappend="x.isTrue ?'class1' and "y.isTrue ?'class2'" ></div>
Run Code Online (Sandbox Code Playgroud) 是否可以在SpEL中列出stream或forEach?例如
List<String> x = new LinkedList<>(Arrays.asList("A","AAB"));
ExpressionParser parser = new SpelExpressionParser();
StandardEvaluationContext context = new StandardEvaluationContext(x);
parser.parseExpression("x.stream().map(x -> x.replaceAll(\"A\", \"B\")).collect(Collectors.toList())").getValue(context))
Run Code Online (Sandbox Code Playgroud) 我看到 spring Kafka 代码,我有一些疑问:
如果我们使用 1 个 @kafkaListener 和 2 个主题,那么 spring Kafka 将创建一个 MessageListenerContainer。如果我为每个主题使用单独的 @kafkaListener ,那么将创建 2 个 MessageListenerContainer 。
MessageListenerContainer 是消费者的意思吗?
如果我在 ConcurrentKafkaListenerContainerFactory 中将并发数设置为 4,那么这意味着对于每个 kafkaListener,我使用代理打开 4 个线程?这意味着协调员将他们视为 4 个不同的消费者。
轮询如何与 kafkaListener 一起使用?它每次只从broker那里获取1个ConsumerRecord吗?
请帮忙。
我想在向兔子发送消息时设置消息标题.我使用下面的代码,但混淆了如何设置邮件头.
public static <T> void sendMessage(String routingKey,final Object message,Class<T> type){
DefaultClassMapper typeMapper = new DefaultClassMapper();
typeMapper.setDefaultType(type);
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
converter.setClassMapper(typeMapper);
RabbitTemplate template = new RabbitTemplate(getConnectionFactory));
template.setMessageConverter(converter);
template.convertAndSend(routingKey, message);
}
Run Code Online (Sandbox Code Playgroud)
在上面的方法中,我只是争论java POJO对象及其要发送的类型.我想知道我应该在哪里设置邮件标题.
因此,我正在阅读Spring kafka文档,并遇到了Producer Listener。这就是Spring Kafka文档所说的-
“可选地,您可以配置带有ProducerListener的KafkaTemplate来获取发送(成功或失败)结果的异步回调,而不必等待Future完成。”
他们还指定了接口-
public interface ProducerListener<K, V> {
void onSuccess(String topic, Integer partition, K key, V value,
RecordMetadata recordMetadata);
void onError(String topic, Integer partition, K key, V value,
Exception exception);
boolean isInterestedInSuccess();
}
Run Code Online (Sandbox Code Playgroud)
因此,我的理解是,如果您想对消息的成功和失败做一些事情,请实现ProducerListener接口,并将其注册到KafkaTemplate。它是异步的,因此您不必等待将来完成,就可以知道发送操作的结果。
在此下方大约3句话,它提到您还可以使用KakfaTemplate的send方法返回的ListenableFuture添加回调。这也是异步的。
future.addCallback(new ListenableFutureCallback<SendResult<Integer,
String>>() {
@Override
public void onSuccess(SendResult<Integer, String> result) {
...
}
@Override
public void onFailure(Throwable ex) {
...
}
});
Run Code Online (Sandbox Code Playgroud)
所以我想知道两者之间的确切区别是什么,因为它们都是异步的。是onSuccess和onFailure / onError方法中接收到的数据之间的区别。或者是在将回调函数添加到ListenableFuture之前(因为将来在不阻塞it-get()方法的情况下不知道异步计算的结果)之前,已经开发了在KafkaTemplate中添加ProducerListener的功能(反之亦然) 。因此,仅为了确保向后兼容,两者都可以继续使用。使用一种方法相对于其他方法是否具有任何性能优势。
先感谢您。
我们将 Spring Boot 版本从 2.2.2 升级到 2.3.0,2.2.2kafka_consumer_*中 Prometheus 端点中看到的所有指标在 2.3.0 中都看不到。
例如,缺少以下所有内容:
kafka_consumer_records_consumed_total_records_totalkafka_consumer_records_lag_recordskafka_consumer_fetch_latency_max_secondskafka_consumer_bytes_consumed_total_bytes_total不确定我们是否缺少某种配置或文档中隐藏的东西......
下面是我们build.gradle.kts改动前的:
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
plugins {
id("org.springframework.boot") version "2.2.2.RELEASE"
id("io.spring.dependency-management") version "1.0.9.RELEASE"
kotlin("jvm") version "1.3.72"
kotlin("plugin.spring") version "1.3.72"
}
group = "ourGroup"
version …Run Code Online (Sandbox Code Playgroud) spring-boot spring-boot-actuator prometheus spring-cloud-stream spring-kafka
spring ×4
spring-kafka ×4
java ×3
rabbitmq ×3
spring-amqp ×3
amqp ×2
apache-kafka ×2
spring-boot ×2
jackson ×1
java-8 ×1
jms ×1
prometheus ×1
spring-el ×1
spring-jms ×1
spring-mvc ×1
thymeleaf ×1
xml ×1