cav*_*llo 5 java asynchronous listener amazon-sqs spring-jms
我的团队正在尝试为Amazon SQS Java 扩展客户端库实现异步侦听器,因为如本链接和本链接所述,短期内不会提供任何支持。
最初的问题是我们需要通过SQS发送非常大的有效负载。因此,为了解决这个问题,我们使用了前面提到的 Amazon SQS Java 扩展客户端库功能。它会自动将大量有效负载上传到 S3,然后将它们作为消息检索,而消费者无需担心如何从 S3 检索它们。遗憾的是,这只能开箱即用地同步完成。
因此,为了使其异步工作,我们使用JmsListener编写了以下代码。它起作用了,我们获得了 String 格式的序列化对象。
AWS/SQS Bean
import com.amazon.sqs.javamessaging.AmazonSQSExtendedClient;
import com.amazon.sqs.javamessaging.ExtendedClientConfiguration;
import com.amazon.sqs.javamessaging.ProviderConfiguration;
import com.amazon.sqs.javamessaging.SQSConnectionFactory;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
@Configuration
public class AWSConfig {
@Bean
public AWSCredentialsProvider awsCredentialsProvider() {
return new DefaultAWSCredentialsProviderChain();
}
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerFactory(ConnectionFactory connectionFactory) {
final DefaultJmsListenerContainerFactory jmsListenerContainerFactory = new DefaultJmsListenerContainerFactory();
jmsListenerContainerFactory.setConnectionFactory(connectionFactory);
return jmsListenerContainerFactory;
}
@Bean
public ConnectionFactory connectionFactory(final AmazonS3Client amazonS3Client, final AmazonSQSAsync amazonSqs) {
final ExtendedClientConfiguration extendedClientConfig = new ExtendedClientConfiguration()
.withLargePayloadSupportEnabled(amazonS3Client, "my-bucket-name");
final ProviderConfiguration providerConfiguration = new ProviderConfiguration();
providerConfiguration.setNumberOfMessagesToPrefetch(10);
final SQSConnectionFactory sqsConnectionFactory = new SQSConnectionFactory(providerConfiguration,
new AmazonSQSExtendedClient(amazonSqs, extendedClientConfig));
return sqsConnectionFactory
}
}
Run Code Online (Sandbox Code Playgroud)
使用
@JmsListener注解的监听器:
@Component
public class QueueListener {
...
@JmsListener(destination = "my-queue-name", containerFactory = "jmsListenerContainerFactory")
public void consumeMessage(String queueMessage) {
System.out.println(queueMessage);
}
}
Run Code Online (Sandbox Code Playgroud)
这里的问题是,虽然它可以工作,但从 S3 读取的大消息以String. 我们真的非常非常喜欢将消息自动解析为特定对象(就像@SqsListener已经允许我们这样做,而无需我们自己解析字符串),如下所示:
@JmsListener(destination = "my-queue-name", containerFactory = "jmsListenerContainerFactory")
public void consumeMessage(SomeObject queueMessage) ...
Run Code Online (Sandbox Code Playgroud)
已经有一个类似的问题,但答案的链接已失效,并且描述对我们没有任何帮助。而且此 JMS 消息传递指南无法与 SQS 扩展库很好地结合使用。让事情变得更困难的是,我们只能org.springframework.jms.support.converter在将转换器设置为 时使用这些类jmsListenerContainerFactory,因此我们不能使用 AWS 使用的相同 Jackson 映射器。