bad*_*era 5 ejb jms message-driven-bean amazon-sqs jakarta-ee
我们需要在 Java EE 应用程序中使用队列,因为它是一个基于云的应用程序(部署在 OpenShift Online 上),我们喜欢使用 amazon sqs。
如果我正确理解了 JMS/Java EE 接收部分的理论,一个@MessageDrivenbean 由 Java EE 容器管理,以便并行创建许多 bean 实例(根据最大池大小),如果传入消息的数量高。这对于处理高负载当然是一个很大的好处。
但是,我不知道如何在 Java EE 应用程序中以这种方式集成 aws sqs。我知道来自http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-java-message-service-jms-client.html的异步接收器示例:
class MyListener implements MessageListener {
@Override
public void onMessage(Message message) {
try {
// Cast the received message as TextMessage and print the text to screen.
if (message != null) {
System.out.println("Received: " + ((TextMessage) message).getText());
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
Run Code Online (Sandbox Code Playgroud)
进而:
// Create a consumer for the 'TestQueue'.
MessageConsumer consumer = session.createConsumer(queue);
// Instantiate and set the message listener for the consumer.
consumer.setMessageListener(new MyListener());
// Start receiving incoming messages.
connection.start();
Run Code Online (Sandbox Code Playgroud)
这是官方的异步接收器示例 - 它不是@MessageDrivenbean。很明显,我们需要某个地方的凭据进行身份验证(通过创建 SQSConnectionFactory,然后是连接,然后是会话 - 这也在示例中得到了很好的描述)。
但是我强烈认为这个例子不会并行处理消息——即只有一个 bean 实例正在处理队列,这对于可扩展的高负载应用程序来说不是一个好的解决方案。
a) 我们如何使用 Amazon SQS 走真正的 Java EE 之路?我只是找到了一棵春天的例子。但它必须是 Java EE 7。
b) 我们使用 Wildfly(目前是 8.2.1)。是否也可以让 Wildfly 在内部管理与 AWS 和应用程序的连接,我们可以像使用应用程序服务器管理的队列一样使用队列(与用于数据库访问的数据源相同的方法)?
从stdunbar得到答案后的结论:
这似乎不可能以“正确的方式”进行,这是我喜欢做的。所以我该怎么做?实现一个ManagedExecutorServiceas stdunbar描述来“包装”队列?- 然而,这意味着也有一个本地队列,这对于应用程序来说不是一个好情况,它应该是可扩展的?!什么是替代品?我们正在 OpenShift Online 上运行该应用程序。使用例如 ApacheMQ Cartridge 实例化自己的设备可能会更好......当然有很多缺点,比如成本,我们负责“基础设施”。
老实说,在这种情况下,我对 AWS 真的很失望......
我不认为我的解决方案是正确的 JAVA EE,但就我而言它是有效的。
配置:
@Singleton
public class SqsMessageManager
{
private Integer numberOfReceivers = 3;
public static SQSConnection connection = null;
public static Queue queue = null;
@Inject
SqsMessageReceiver sqsMessageReceiver;
public void init()
{
try
{
SQSConnectionFactory connectionFactory =
SQSConnectionFactory.builder()
.withRegion(Region.getRegion(Regions.EU_WEST_1))
.withAWSCredentialsProvider(new EnvironmentVariableCredentialsProvider())
.build();
connection = connectionFactory.createConnection();
queue = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createQueue("myQueue");
for (int i = 0; i < numberOfReceivers; i++)
connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(queue).setMessageListener(sqsMessageReceiver);
connection.start();
}
catch (JMSException e)
{
e.getStackTrace();
}
}
}
Run Code Online (Sandbox Code Playgroud)
然后发件人:
@Dependent
public class SqsMessageSender
{
MessageProducer producer = null;
Session senderSession = null;
@PostConstruct
public void createProducer(){
try
{
// open new session and message producer
senderSession = SqsMessageManager.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = senderSession.createProducer(SqsMessageManager.queue);
}catch(JMSException | NullPointerException e){
;
}
}
@PreDestroy
public void destroy(){
try
{
// close session
producer.close();
senderSession.close();
}catch(JMSException e){
}
}
// sends a message to aws sqs queue
public void sendMessage(String txt)
{
try
{
TextMessage textMessage = senderSession.createTextMessage(txt);
producer.send(textMessage);
}
catch (JMSException e)
{
e.getStackTrace();
}
}
}
Run Code Online (Sandbox Code Playgroud)
和接收者:
@Dependent
public class SqsMessageReceiver implements MessageListener
{
public void onMessage(Message inMessage) {
...
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1659 次 |
| 最近记录: |