Bra*_*lle 6 jms amazon-sqs amazon-web-services spring-boot
我一直在开发一个从 Amazon SQS 接收消息的小型 Spring Boot 应用程序。但是我预见处理这些消息可能会失败,所以这就是为什么我认为添加死信队列是个好主意。
但是有一个问题:当处理失败时(我通过为某些消息抛出异常来强制执行)它不会在以后重新尝试并且不会移动到死信队列。我正在努力寻找问题,因为似乎没有太多信息。
但是,如果我查看Amazon 的文档,他们似乎能够做到,但不使用 Spring Boot 注释。有没有什么方法可以让下面的代码在不自己编写太多 JMS 代码的情况下工作?
这是我正在使用的当前配置。
@Configuration
public class AWSConfiguration {
@Value("${aws.sqs.endpoint}")
private String endpoint;
@Value("${aws.iam.key}")
private String iamKey;
@Value("${aws.iam.secret}")
private String iamSecret;
@Value("${aws.sqs.queue}")
private String queue;
@Bean
public JmsTemplate createJMSTemplate() {
JmsTemplate jmsTemplate = new JmsTemplate(getSQSConnectionFactory());
jmsTemplate.setDefaultDestinationName(queue);
jmsTemplate.setDeliveryPersistent(true);
jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
return jmsTemplate;
}
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(getSQSConnectionFactory());
factory.setConcurrency("1-1");
return factory;
}
@Bean
public JmsTransactionManager jmsTransactionManager() {
return new JmsTransactionManager(getSQSConnectionFactory());
}
@Bean
public ConnectionFactory getSQSConnectionFactory() {
return SQSConnectionFactory.builder()
.withAWSCredentialsProvider(awsCredentialsProvider)
.withEndpoint(endpoint)
.withNumberOfMessagesToPrefetch(10).build();
}
private final AWSCredentialsProvider awsCredentialsProvider = new AWSCredentialsProvider() {
@Override
public AWSCredentials getCredentials() {
return new BasicAWSCredentials(iamKey, iamSecret);
}
@Override
public void refresh() {
}
};
}
Run Code Online (Sandbox Code Playgroud)
最后是接收端:
@Service
public class QueueReceiver {
private static final String EXPERIMENTAL_QUEUE = "${aws.sqs.queue}";
@JmsListener(destination = EXPERIMENTAL_QUEUE)
public void receiveSegment(String jsonSegment) throws IOException {
Segment segment = Segment.fromJSON(jsonSegment);
if(segment.shouldFail()) {
throw new IOException("This segment is expected to fail");
}
System.out.println(segment.getText());
}
}
Run Code Online (Sandbox Code Playgroud)
您可以通过利用Spring Cloud AWS极大地简化您的配置。
消息处理器
@Service
public class MessageHandler {
@SqsListener(value = "test-queue", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void queueListener(String msg, Acknowledgment acknowledgment){
System.out.println("message: " + msg);
if(/*successful*/){
acknowledgment.acknowledge();
}
}
}
Run Code Online (Sandbox Code Playgroud)
上面显示的示例是您接收消息所需的全部内容。这假设您已经创建了一个带有相关死信队列的 sqs 队列。如果您的消息未得到确认,则它们将再次重试,直到达到最大接收数。然后它会被转发到死信队列。
| 归档时间: |
|
| 查看次数: |
2353 次 |
| 最近记录: |