以前我正在读取队列中存在的所有消息,但现在我必须根据用户选择(计数)返回特定数量的消息.
我尝试相应地更改for循环,但由于自动确认,它读取所有消息.所以我尝试在配置文件中将其更改为手动.
在我的程序中如何在阅读msg后手动确认消息(目前我正在使用AmqpTemplate接收并且我没有频道参考)?
Properties properties = admin.getQueueProperties("queue_name");
if(null != properties)
{
Integer messageCount = Integer.parseInt(properties.get("QUEUE_MESSAGE_COUNT").toString());
while(messageCount > 0)
{
Message msg = amqpTemplate.receive(queue_name);
String value = new String(msg.getBody());
valueList.add(value);
messageCount--;
}
}
Run Code Online (Sandbox Code Playgroud)
任何帮助都非常值得感谢,提前谢谢.
如果我在其签名中添加MessageProperties,handleMessage方法不会从队列中获取消息.如果没有MessageProperties,它可以正常工作.
如何在MessageListenerAdapter的handleMessage中获取MessageProperties?
public class EventMessageAdapter {
public void handleMessage(MessageProperties messageProperties, Event event) {
...
String id = messageProperties.getHeaders().get("key");
}
Run Code Online (Sandbox Code Playgroud) 事实上我不明白这个运行。也许我误解了一些东西,无论如何这是不可能的。我正在尝试在同一个队列、相同的交换器上配置 2 个侦听器,但只有路由键应该不同。我的问题是事情不知何故变得混乱。结果是侦听器 A 收到了侦听器 B 的消息。但只是有时,有时一切正常。有什么建议么?
我的配置
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(getHostname());
connectionFactory.setUsername(getUsername());
connectionFactory.setPassword(getPassword());
return connectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setMessageConverter(new CustomMessageConverter());
factory.setConnectionFactory(connectionFactory());
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
factory.setConcurrentConsumers(10);
factory.setMaxConcurrentConsumers(10);
return factory;
}
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
}
@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(new MappingJackson2MessageConverter());
return factory;
}
Run Code Online (Sandbox Code Playgroud)
我的听众A
@RabbitListener(bindings = @QueueBinding(value = @Queue(value …Run Code Online (Sandbox Code Playgroud) 我正在使用带有Rabbit模板的SpringAMQP.如何将邮件直接发送到省略Exchange的队列?我该怎么做?
我正在使用一个使用spring-kafka的Kafka监听器的应用程序.我面临的问题是当Kafka监听器没有打开时弹簧上下文初始化失败(各种原因,例如Kafka服务器没有打开或关闭).如何确保我的应用程序是独立的.
谁能请帮忙.
我在我的服务中使用TransactionSynchronizationManager.registerSynchronization(...),但无法为此方法编写测试,因为我收到异常:java.lang.IllegalStateException: Transaction synchronization is not active。
我的服务的实施:
@Transactional
public void save(Order order) {
log.info("Starting transaction...");
orderDAO.save(order);
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
log.info("#afterCommit");
eventPublisher.publishEvent(new OrderPlacedEvent(order));
}
}
);
log.info("Commit");
}
Run Code Online (Sandbox Code Playgroud)
并测试哪个不起作用:
@RunWith(SpringRunner.class)
public class OrderServiceTest {
private OrderDAO orderDAO;
private ApplicationEventPublisher eventPublisher;
private OrderService orderService;
@Before
public void setUp() throws Exception {
orderDAO = mock(OrderDAO.class);
eventPublisher = mock(ApplicationEventPublisher.class);
orderService = new OrderService(
orderDAO,
eventPublisher
);
}
@Test
public void save() throws Exception { …Run Code Online (Sandbox Code Playgroud) 我正在使用spring boot 2.0.7 Release和spring-boot-starter-mail-2.0.7.Release.
我javaMailsender在班级内部自动装配在 Windows 上工作正常,同时尝试部署解决Unixbelwo 问题
APPLICATION FAILED TO START
***************************
Description:
Field javaMailSender in com.fti.di.capstock.tran.pub.email.SendEmail required a bean of type 'org.springframework.mail.javamail.JavaMailSender' that could not be found.
The injection point has the following annotations:
- @org.springframework.beans.factory.annotation.Autowired(required=true)
Action:
Consider defining a bean of type 'org.springframework.mail.javamail.JavaMailSender' in your configuration.
Run Code Online (Sandbox Code Playgroud)
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.mail.javamail.MimeMessageHelper;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;
import com.fti.di.capstock.tran.pub.constants.ApplicationFlowConstants;
import com.fti.di.integration.constants.IntegrationConstants;
import com.fti.di.integration.util.StringUtil;
@Component("sendEmail")
public class SendEmail {
@Autowired
private JavaMailSender …Run Code Online (Sandbox Code Playgroud) 我有一个简单的 Spring 集成流程,它绑定到处理器以进行输入和输出。
我已经配置了 Kafka 绑定器来映射输入和输出主题。效果很好。
假设我想将 3 个 Kafka 主题绑定到输入,从而导致 3 个已配置的消费者从三个单独的 Kafka 主题中提取数据,然后由我的 SI 流进行处理。
是否可以将多个 Kafka 主题映射到我的处理器的输入?如果是这样,该配置会是什么样子?
spring-boot spring-cloud-stream spring-cloud-stream-binder-kafka
我已将 Producer 配置为 request.timeout.ms = 70,0000ms 和 retries=5。我怀疑这实际上是如何运作的,
在“request.timeout.ms=70,000”过期后,它会重试 5 次,或者在给定的“request.timeout.ms=70,000”内,它会以 retry.backoff.ms 值重试 5 次。
@SpringBootTest升级到 Spring Boot 3.0.3 时,当 aStreamBridge即将发送 contentType application/xml 的消息时,开始出现以下异常:
java.lang.NullPointerException: Cannot invoke "Object.getClass()" because "result" is null
at org.springframework.cloud.function.cloudevent.CloudEventsFunctionInvocationHelper.doPostProcessResult(CloudEventsFunctionInvocationHelper.java:138)
at org.springframework.cloud.function.cloudevent.CloudEventsFunctionInvocationHelper.postProcessResult(CloudEventsFunctionInvocationHelper.java:114)
at org.springframework.cloud.function.cloudevent.CloudEventsFunctionInvocationHelper.postProcessResult(CloudEventsFunctionInvocationHelper.java:48)
at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:181)
at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:144)
at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:140)
at com.example.demo.DemoApplicationTests.shouldSendMessage(DemoApplicationTests.java:33)
...
Run Code Online (Sandbox Code Playgroud)
起初我认为这可能与整个应用程序中依赖版本的使用不一致有关,但是当我使用以下配置在 start.spring.io 创建一个新的 spring 应用程序时,问题仍然存在:
build.gradle(简化):
plugins {
id 'java'
id 'org.springframework.boot' version '3.0.3'
id 'io.spring.dependency-management' version '1.1.0'
}
ext {
set('springCloudVersion', "2022.0.1")
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-amqp'
implementation 'org.springframework.cloud:spring-cloud-stream'
implementation 'org.springframework.cloud:spring-cloud-stream-binder-rabbit'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.amqp:spring-rabbit-test'
testImplementation 'org.springframework.cloud:spring-cloud-stream-test-binder'
}
Run Code Online (Sandbox Code Playgroud)
应用程序.yml:
spring:
cloud:
stream:
bindings:
demo-out-0: …Run Code Online (Sandbox Code Playgroud) rabbitmq ×4
spring ×4
spring-amqp ×4
spring-boot ×4
apache-kafka ×2
java ×2
send ×1
spring-cloud-stream-binder-kafka ×1
spring-kafka ×1