抛出异常和@JMSListener 时,SQS 消息可见性超时设置为 0

Rob*_*t P 10 java amazon-sqs jmstemplate spring-jms

我有一个简单的 Spring Boot 服务,它使用 JMSTemplate 侦听 AWS SQS 队列。当消息得到正确处理时,一切都按预期工作。

我正在使用 CLIENT_ACKNOWLEDGE,因此在处理过程中抛出异常时,会再次收到消息。但是,SQS 队列上的默认可见性超时设置将被忽略,并且会立即再次接收消息。

SQS 队列配置了 30 秒的默认可见性超时,并且在将消息放入 DLQ 之前接收的重新驱动策略为 20。

我已禁用该服务并使用 SQS 控制台来验证是否正确设置了默认可见性超时。我还尝试将 JMS 消息添加到方法签名并执行手动验证。

这是 JMS 配置的代码:

@Configuration
@EnableJms
class JmsConfig
{

    @Bean
    @Conditional(AWSEnvironmentCondition.class)
    public SQSConnectionFactory connectionFactory(@Value("${AWS_REGION}") String awsRegion)
    {
        return new SQSConnectionFactory(
            new ProviderConfiguration(),
            AmazonSQSClientBuilder.standard()
                                  .withRegion(Regions.fromName(awsRegion))
                                  .withCredentials(new DefaultAWSCredentialsProviderChain())
        );
    }

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory)
    {
        DefaultJmsListenerContainerFactory factory =
            new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setDestinationResolver(new DynamicDestinationResolver());
        factory.setConcurrency("3-10");
        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        factory.setErrorHandler(defaultErrorHandler());
        return factory;
    }

    @Bean
    public ErrorHandler defaultErrorHandler()
    {
        return new ErrorHandler()
        {
            @Override
            public void handleError(Throwable throwable)
            {
                LOG.error("JMS message listener error: {}", throwable.getMessage());
            }
        };
    }

    @Bean
    public JmsTemplate defaultJmsTemplate(ConnectionFactory connectionFactory)
    {
        return new JmsTemplate(connectionFactory);
    }
}
Run Code Online (Sandbox Code Playgroud)

这是侦听器的代码:

@Component
public class MessagingListener
{
    @Autowired
    private MessageService _messageService;

    @Autowired
    private Validator _validator;

    @JmsListener(destination = "myqueue")
    public void receiveMessage(String messageJson)
    {
        try
        {
            LOG.info("Received message");

            // The following line throws an IOException is the message is not JSON.
            MyMessage myMessage = MAPPER.readvalue(messageJson, MyMessage.class);

            Set<ConstraintViolation<MyMessage>> _validator.validate(myMessage);
            if (CollectionUtils.isNotEmpty(violations))
            {
                String errorMessage = violations.stream()
                        .map(v -> String.join(" : ", v.getPropertyPath().iterator().next().getName(),
                                v.getMessage()))
                LOG.error("Exception occurred while validating the model, details: {}", errorMessage)
                throw new ValidationException(errorMessage);
            }
        }
        catch (IOException e)
        {
            LOG.error("Error parsing message", e);
            throw new ValidationException("Error parsing message, details: " + e.getMessage());
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

当将带有无效 JSON 或未通过验证的 JSON 的消息放入 SQS 队列时,该消息会很快收到 20 次,然后最终出现在 DLQ 上。需要做什么才能遵守 SQS 中的默认可见性超时设置?

sed*_*ooe 6

如果出现异常,失败消息的可见性超时会通过ChangeMes​​sageVisibility设置为 0 ,因此即使队列具有不同的visibilityTimeout设置,SQS 也会立即发送此消息。

这是怎么发生的?

正如您在这里所看到的,Spring JMSAbstractMessageListenerContainer简单地执行了以下操作:

try {
    invokeListener(session, message); // This is your @JMSListener method
}
catch (JMSException | RuntimeException | Error ex) {
    rollbackOnExceptionIfNecessary(session, ex);
    throw ex;
}
commitIfNecessary(session, message);
Run Code Online (Sandbox Code Playgroud)

rollbackOnExceptionIfNecessary在方法上,将调用session.recover()因为:

  1. session.getTransacted()由于 SQS 不支持事务,因此始终为 false。看这里
  2. isClientAcknowledge(session)将返回 true,因为您正在使用 CLIENT_ACKNOWLEDGE 模式。

最后,SQSSession 的recover() 否定确认消息,这意味着visibilityTimeout将该特定消息设置为 0,导致 SQS 尝试立即发送该消息。

覆盖此行为的最简单方法是实现CustomJmsListenerContainerFactory&CustomMessageListenerContainer而不是使用DefaultJmsListenerContainerFactory& DefaultMessageListenerContainer

public class CustomMessageListenerContainer extends DefaultMessageListenerContainer {

    public CustomMessageListenerContainer() {
        super();
    }

    @Override
    protected void rollbackOnExceptionIfNecessary(Session session, Throwable ex) {
        // do nothing, so that "visibilityTimeout" will stay same
    }

}

public class CustomJmsListenerContainerFactory extends DefaultJmsListenerContainerFactory {
    
    @Override
    protected DefaultMessageListenerContainer createContainerInstance() {
        return new CustomMessageListenerContainer();
    }
}
Run Code Online (Sandbox Code Playgroud)

并使其成为 Spring bean,@Component或者像您在中所做的那样JmsConfig

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory) {
    DefaultJmsListenerContainerFactory factory = new CustomJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    // and set other stuff on factory
    return factory;
}
Run Code Online (Sandbox Code Playgroud)

注意
如果您的应用程序使用 SQS 和 JMS 来使用其他类型的数据源,请确保为它们使用不同的 Container 和 ContainerFactory,以便其rollbackOnExceptionIfNecessary行为符合预期。