小编Gar*_*ell的帖子

在RabbitMQ中手动确认消息

以前我正在读取队列中存在的所有消息,但现在我必须根据用户选择(计数)返回特定数量的消息.

我尝试相应地更改for循环,但由于自动确认,它读取所有消息.所以我尝试在配置文件中将其更改为手动.

在我的程序中如何在阅读msg后手动确认消息(目前我正在使用AmqpTemplate接收并且我没有频道参考)?

    Properties properties = admin.getQueueProperties("queue_name");
    if(null != properties)
    {
        Integer messageCount = Integer.parseInt(properties.get("QUEUE_MESSAGE_COUNT").toString());          
        while(messageCount > 0)
        {
            Message msg = amqpTemplate.receive(queue_name);
            String value = new String(msg.getBody());

            valueList.add(value);
            messageCount--;
        }
}
Run Code Online (Sandbox Code Playgroud)

任何帮助都非常值得感谢,提前谢谢.

rabbitmq spring-amqp spring-rabbitmq

3
推荐指数
1
解决办法
5807
查看次数

Spring amqp:如何在MessageListenerAdapter中读取MessageProperties

如果我在其签名中添加MessageProperties,handleMessage方法不会从队列中获取消息.如果没有MessageProperties,它可以正常工作.

如何在MessageListenerAdapter的handleMessage中获取MessageProperties?

public class EventMessageAdapter {

  public void handleMessage(MessageProperties messageProperties, Event event)    {
  ...
  String id = messageProperties.getHeaders().get("key");
}
Run Code Online (Sandbox Code Playgroud)

rabbitmq spring-amqp spring-rabbitmq

3
推荐指数
1
解决办法
2794
查看次数

spring ampq注解驱动一个队列两个监听器区分路由key

事实上我不明白这个运行。也许我误解了一些东西,无论如何这是不可能的。我正在尝试在同一个队列、相同的交换器上配置 2 个侦听器,但只有路由键应该不同。我的问题是事情不知何故变得混乱。结果是侦听器 A 收到了侦听器 B 的消息。但只是有时,有时一切正常。有什么建议么?

我的配置

@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(getHostname());
    connectionFactory.setUsername(getUsername());
    connectionFactory.setPassword(getPassword());
    return connectionFactory;
}

@Bean
public RabbitAdmin rabbitAdmin() {
    return new RabbitAdmin(connectionFactory());
}

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setMessageConverter(new CustomMessageConverter());
    factory.setConnectionFactory(connectionFactory());
    factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
    factory.setConcurrentConsumers(10);
    factory.setMaxConcurrentConsumers(10);
    return factory;
}

@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
    registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
}

@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
    DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
    factory.setMessageConverter(new MappingJackson2MessageConverter());
    return factory;
}
Run Code Online (Sandbox Code Playgroud)

我的听众A

@RabbitListener(bindings = @QueueBinding(value = @Queue(value …
Run Code Online (Sandbox Code Playgroud)

java rabbitmq spring-annotations spring-amqp

3
推荐指数
1
解决办法
1760
查看次数

SpringAMQP RabbitMQ如何在没有Exchange的情况下直接发送到Queue

我正在使用带有Rabbit模板的SpringAMQP.如何将邮件直接发送到省略Exchange的队列?我该怎么做?

spring send rabbitmq spring-amqp spring-rabbitmq

3
推荐指数
1
解决办法
2811
查看次数

即使Kafka监听器(spring-kafka)没有初始化,如何启动spring应用程序

我正在使用一个使用spring-kafka的Kafka监听器的应用程序.我面临的问题是当Kafka监听器没有打开时弹簧上下文初始化失败(各种原因,例如Kafka服务器没有打开或关闭).如何确保我的应用程序是独立的.

谁能请帮忙.

spring apache-kafka spring-kafka

3
推荐指数
1
解决办法
1659
查看次数

Spring - 使用 TransactionSynchronizationManager 的测试方法

我在我的服务中使用TransactionSynchronizationManager.registerSynchronization(...),但无法为此方法编写测试,因为我收到异常:java.lang.IllegalStateException: Transaction synchronization is not active

我的服务的实施:

@Transactional
public void save(Order order) {
    log.info("Starting transaction...");
    orderDAO.save(order);
    TransactionSynchronizationManager.registerSynchronization(
            new TransactionSynchronizationAdapter() {
                @Override
                public void afterCommit() {
                    log.info("#afterCommit");
                    eventPublisher.publishEvent(new OrderPlacedEvent(order));
                }
            }
    );
    log.info("Commit");
}
Run Code Online (Sandbox Code Playgroud)

并测试哪个不起作用:

@RunWith(SpringRunner.class)
public class OrderServiceTest {

    private OrderDAO orderDAO;
    private ApplicationEventPublisher eventPublisher;
    private OrderService orderService;

    @Before
    public void setUp() throws Exception {
        orderDAO = mock(OrderDAO.class);
        eventPublisher = mock(ApplicationEventPublisher.class);
        orderService = new OrderService(
                orderDAO,
                eventPublisher
        );
    }

    @Test
    public void save() throws Exception { …
Run Code Online (Sandbox Code Playgroud)

java spring spring-transactions

3
推荐指数
1
解决办法
7285
查看次数

找不到类型为“org.springframework.mail.javamail.JavaMailSender”的bean

我正在使用spring boot 2.0.7 Releasespring-boot-starter-mail-2.0.7.Release.

javaMailsender在班级内部自动装配在 Windows 上工作正常,同时尝试部署解决Unixbelwo 问题


APPLICATION FAILED TO START
***************************

Description:

Field javaMailSender in com.fti.di.capstock.tran.pub.email.SendEmail required a bean of type 'org.springframework.mail.javamail.JavaMailSender' that could not be found.

The injection point has the following annotations:
        - @org.springframework.beans.factory.annotation.Autowired(required=true)


Action:

Consider defining a bean of type 'org.springframework.mail.javamail.JavaMailSender' in your configuration.
Run Code Online (Sandbox Code Playgroud)
    import org.springframework.mail.javamail.JavaMailSender;
    import org.springframework.mail.javamail.MimeMessageHelper;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessagingException;
    import org.springframework.stereotype.Component;

    import com.fti.di.capstock.tran.pub.constants.ApplicationFlowConstants;
    import com.fti.di.integration.constants.IntegrationConstants;
    import com.fti.di.integration.util.StringUtil;

    @Component("sendEmail")
    public class SendEmail {

        @Autowired
        private JavaMailSender …
Run Code Online (Sandbox Code Playgroud)

spring-boot

3
推荐指数
3
解决办法
6861
查看次数

是否可以在 Spring Cloud Stream 中配置多个绑定到同一个处理器?

我有一个简单的 Spring 集成流程,它绑定到处理器以进行输入和输出。

我已经配置了 Kafka 绑定器来映射输入和输出主题。效果很好。

假设我想将 3 个 Kafka 主题绑定到输入,从而导致 3 个已配置的消费者从三个单独的 Kafka 主题中提取数据,然后由我的 SI 流进行处理。

是否可以将多个 Kafka 主题映射到我的处理器的输入?如果是这样,该配置会是什么样子?

spring-boot spring-cloud-stream spring-cloud-stream-binder-kafka

3
推荐指数
1
解决办法
2189
查看次数

Kafka 重试如何与 request.timeout 配合使用?

我已将 Producer 配置为 request.timeout.ms = 70,0000ms 和 retries=5。我怀疑这实际上是如何运作的,

在“request.timeout.ms=70,000”过期后,它会重试 5 次,或者在给定的“request.timeout.ms=70,000”内,它会以 retry.backoff.ms 值重试 5 次。

apache-kafka spring-boot kafka-producer-api

3
推荐指数
1
解决办法
4139
查看次数

StreamBridge:升级到 Spring Boot 3 时处理 XML 消息时无法调用“Object.getClass()”,因为“结果”为 null

@SpringBootTest升级到 Spring Boot 3.0.3 时,当 aStreamBridge即将发送 contentType application/xml 的消息时,开始出现以下异常:

java.lang.NullPointerException: Cannot invoke "Object.getClass()" because "result" is null
at org.springframework.cloud.function.cloudevent.CloudEventsFunctionInvocationHelper.doPostProcessResult(CloudEventsFunctionInvocationHelper.java:138)
at org.springframework.cloud.function.cloudevent.CloudEventsFunctionInvocationHelper.postProcessResult(CloudEventsFunctionInvocationHelper.java:114)
at org.springframework.cloud.function.cloudevent.CloudEventsFunctionInvocationHelper.postProcessResult(CloudEventsFunctionInvocationHelper.java:48)
at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:181)
at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:144)
at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:140)
at com.example.demo.DemoApplicationTests.shouldSendMessage(DemoApplicationTests.java:33)
...
Run Code Online (Sandbox Code Playgroud)

起初我认为这可能与整个应用程序中依赖版本的使用不一致有关,但是当我使用以下配置在 start.spring.io 创建一个新的 spring 应用程序时,问题仍然存在:

build.gradle(简化):

plugins {
  id 'java'
  id 'org.springframework.boot' version '3.0.3'
  id 'io.spring.dependency-management' version '1.1.0'
}

ext {
  set('springCloudVersion', "2022.0.1")
}


dependencies {
  implementation 'org.springframework.boot:spring-boot-starter-amqp'
  implementation 'org.springframework.cloud:spring-cloud-stream'
  implementation 'org.springframework.cloud:spring-cloud-stream-binder-rabbit'
  testImplementation 'org.springframework.boot:spring-boot-starter-test'
  testImplementation 'org.springframework.amqp:spring-rabbit-test'
  testImplementation 'org.springframework.cloud:spring-cloud-stream-test-binder'
}
Run Code Online (Sandbox Code Playgroud)

应用程序.yml:

spring:
  cloud:
    stream:
      bindings:
        demo-out-0: …
Run Code Online (Sandbox Code Playgroud)

spring spring-rabbit spring-boot spring-cloud-stream

2
推荐指数
1
解决办法
2110
查看次数