让 SQS 死信队列与 Spring Boot 和 JMS 一起使用

Bra*_*lle 6 jms amazon-sqs amazon-web-services spring-boot

我一直在开发一个从 Amazon SQS 接收消息的小型 Spring Boot 应用程序。但是我预见处理这些消息可能会失败,所以这就是为什么我认为添加死信队列是个好主意。

但是有一个问题:当处理失败时(我通过为某些消息抛出异常来强制执行)它不会在以后重新尝试并且不会移动到死信队列。我正在努力寻找问题,因为似乎没有太多信息。

但是,如果我查看Amazon 的文档,他们似乎能够做到,但不使用 Spring Boot 注释。有没有什么方法可以让下面的代码在不自己编写太多 JMS 代码的情况下工作?

这是我正在使用的当前配置。

@Configuration
public class AWSConfiguration {

    @Value("${aws.sqs.endpoint}")
    private String endpoint;

    @Value("${aws.iam.key}")
    private String iamKey;

    @Value("${aws.iam.secret}")
    private String iamSecret;

    @Value("${aws.sqs.queue}")
    private String queue;

    @Bean
    public JmsTemplate createJMSTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate(getSQSConnectionFactory());
        jmsTemplate.setDefaultDestinationName(queue);
        jmsTemplate.setDeliveryPersistent(true);
        jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
        return jmsTemplate;
    }

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(getSQSConnectionFactory());
        factory.setConcurrency("1-1");
        return factory;
    }

    @Bean
    public JmsTransactionManager jmsTransactionManager() {
        return new JmsTransactionManager(getSQSConnectionFactory());
    }

    @Bean
    public ConnectionFactory getSQSConnectionFactory() {
        return SQSConnectionFactory.builder()
                .withAWSCredentialsProvider(awsCredentialsProvider)
                .withEndpoint(endpoint)
                .withNumberOfMessagesToPrefetch(10).build();
    }

    private final AWSCredentialsProvider awsCredentialsProvider = new AWSCredentialsProvider() {
        @Override
        public AWSCredentials getCredentials() {
            return new BasicAWSCredentials(iamKey, iamSecret);
        }
        @Override
        public void refresh() {
        }
    };
}
Run Code Online (Sandbox Code Playgroud)

最后是接收端:

@Service
public class QueueReceiver {

    private static final String EXPERIMENTAL_QUEUE = "${aws.sqs.queue}";

    @JmsListener(destination = EXPERIMENTAL_QUEUE)
    public void receiveSegment(String jsonSegment) throws IOException {
        Segment segment = Segment.fromJSON(jsonSegment);
        if(segment.shouldFail()) {
            throw new IOException("This segment is expected to fail");
        }
        System.out.println(segment.getText());
    }

}
Run Code Online (Sandbox Code Playgroud)

Kyl*_*son 6

春季云AWS

您可以通过利用Spring Cloud AWS极大地简化您的配置。

消息处理器

@Service
public class MessageHandler {

    @SqsListener(value = "test-queue", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
    public void queueListener(String msg, Acknowledgment acknowledgment){
        System.out.println("message: " + msg);
        if(/*successful*/){
            acknowledgment.acknowledge();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

上面显示的示例是您接收消息所需的全部内容。这假设您已经创建了一个带有相关死信队列的 sqs 队列。如果您的消息未得到确认,则它们将再次重试,直到达到最大接收数。然后它会被转发到死信队列。

  • 虽然这非常有效,但我最终使用了上面的方法。这允许我对此处使用的 SQS 和 ActiveMQ 使用相同的接收代码。 (2认同)