使用 Spring Boot 监听消息队列 SQS 不适用于标准配置

Mar*_*ryo 4 java amazon-sqs spring-boot spring-messaging spring-cloud-aws

我无法使用 Spring Boot 和 SQS 创建工程队列侦听器(消息已发送并显示在 SQS ui 中)

或不起作用@MessageMapping@SqsListener

Java:11
Spring Boot:2.1.7
依赖项:spring-cloud-aws-messaging

这是我的配置

@Configuration
@EnableSqs
public class SqsConfig {

    @Value("#{'${env.name:DEV}'}")
    private String envName;

    @Value("${cloud.aws.region.static}")
    private String region;

    @Value("${cloud.aws.credentials.access-key}")
    private String awsAccessKey;

    @Value("${cloud.aws.credentials.secret-key}")
    private String awsSecretKey;

    @Bean
    public Headers headers() {
        return new Headers();
    }

    @Bean
    public MessageQueue queueMessagingSqs(Headers headers,
                                          QueueMessagingTemplate queueMessagingTemplate) {
        Sqs queue = new Sqs();
        queue.setQueueMessagingTemplate(queueMessagingTemplate);
        queue.setHeaders(headers);
        return queue;
    }

    private ResourceIdResolver getResourceIdResolver() {
        return queueName -> envName + "-" + queueName;
    }

    @Bean
    public DestinationResolver destinationResolver(AmazonSQSAsync amazonSQSAsync) {
        DynamicQueueUrlDestinationResolver destinationResolver = new DynamicQueueUrlDestinationResolver(
                amazonSQSAsync,
                getResourceIdResolver());
        destinationResolver.setAutoCreate(true);
        return destinationResolver;
    }

    @Bean
    public QueueMessagingTemplate queueMessagingTemplate(AmazonSQSAsync amazonSQSAsync,
                                                         DestinationResolver destinationResolver) {
        return new QueueMessagingTemplate(amazonSQSAsync, destinationResolver, null);
    }

    @Bean
    public QueueMessageHandlerFactory queueMessageHandlerFactory() {
        QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
        MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
        messageConverter.setStrictContentTypeMatch(false);
        factory.setArgumentResolvers(Collections.singletonList(new PayloadArgumentResolver(messageConverter)));
        return factory;
    }

    @Bean
    public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSqs) {
        SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
        factory.setAmazonSqs(amazonSqs);
        factory.setMaxNumberOfMessages(10);
        factory.setWaitTimeOut(2);
        return factory;
    }

}
Run Code Online (Sandbox Code Playgroud)

我也注意到了这一点org.springframework.cloud.aws.messaging.config.SimpleMessageListenerContainerFactoryorg.springframework.cloud.aws.messaging.config.annotation.SqsConfiguration在启动时运行

还有我的测试

@RunWith(SpringJUnit4ClassRunner.class)
public class ListenTest {

    @Autowired
    private MessageQueue queue;

    private final String queueName = "test-queue-receive";

    private String result = null;

    @Test
    public void test_listen() {
        // given
        String data = "abc";

        // when
        queue.send(queueName, data).join();

        // then
        Awaitility.await()
                .atMost(10, TimeUnit.SECONDS)
                .until(() -> Objects.nonNull(result));

        Assertions.assertThat(result).equals(data);
    }

    @MessageMapping(value = queueName)
    public void receive(String data) {
        this.result = data;
    }
}

Run Code Online (Sandbox Code Playgroud)

你觉得有什么不对吗?

我创建一个存储库,例如:( https://github.com/mmaryo/java-sqs-test )
在测试文件夹中,更改“application.yml”中的 aws 凭证
,然后运行测试

Nie*_*tor 5

我在使用 spring-cloud-aws-messaging 包时遇到了同样的问题,但后来我在 @SqsListener 注释中使用了队列 URL 而不是队列名称,并且它起作用了。

@SqsListener(value = { "https://full-queue-URL" }, deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void receive(String message) {
     // do something
}
Run Code Online (Sandbox Code Playgroud)

看来您可以在使用 spring-cloud-starter-aws-messaging 包时使用队列名称。我相信如果您不想使用入门包,有一些配置允许使用队列名称而不是 URL。

编辑:我注意到该区域被默认为 us-west-2,尽管我在属性文件中列出了 us-east-1 。然后我创建了一个 RegionProvider bean 并将区域设置为 us-east-1 ,现在当我在 @SqsMessaging 中使用队列名称时,它会被找到并正确解析为框架代码中的 URL。

  • 您可以发布 RegionProvider @Bean 方法的片段吗? (2认同)