在 @MessageDriven bean 中使用 amazon sqs - 池化/并行处理

bad*_*era 5 ejb jms message-driven-bean amazon-sqs jakarta-ee

我们需要在 Java EE 应用程序中使用队列,因为它是一个基于云的应用程序(部署在 OpenShift Online 上),我们喜欢使用 amazon sqs。

如果我正确理解了 JMS/Java EE 接收部分的理论,一个@MessageDrivenbean 由 Java EE 容器管理,以便并行创建许多 bean 实例(根据最大池大小),如果传入消息的数量高。这对于处理高负载当然是一个很大的好处。

但是,我不知道如何在 Java EE 应用程序中以这种方式集成 aws sqs。我知道来自http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-java-message-service-jms-client.html的异步接收器示例:

class MyListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        try {
            // Cast the received message as TextMessage and print the text to screen.
            if (message != null) {
                System.out.println("Received: " + ((TextMessage) message).getText());
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

进而:

// Create a consumer for the 'TestQueue'.
MessageConsumer consumer = session.createConsumer(queue);

// Instantiate and set the message listener for the consumer.
consumer.setMessageListener(new MyListener());

// Start receiving incoming messages.
connection.start();
Run Code Online (Sandbox Code Playgroud)

这是官方的异步接收器示例 - 它不是@MessageDrivenbean。很明显,我们需要某个地方的凭据进行身份验证(通过创建 SQSConnectionFactory,然后是连接,然后是会话 - 这也在示例中得到了很好的描述)。
但是我强烈认为这个例子不会并行处理消息——即只有一个 bean 实例正在处理队列,这对于可扩展的高负载应用程序来说不是一个好的解决方案。

a) 我们如何使用 Amazon SQS 走真正的 Java EE 之路?我只是找到了一棵春天的例子。但它必须是 Java EE 7。

b) 我们使用 Wildfly(目前是 8.2.1)。是否也可以让 Wildfly 在内部管理与 AWS 和应用程序的连接,我们可以像使用应用程序服务器管理的队列一样使用队列(与用于数据库访问的数据源相同的方法)?

stdunbar得到答案后的结论
这似乎不可能以“正确的方式”进行,这是我喜欢做的。所以我该怎么做?实现一个ManagedExecutorServiceas stdunbar描述来“包装”队列?- 然而,这意味着也有一个本地队列,这对于应用程序来说不是一个好情况,它应该是可扩展的?!什么是替代品?我们正在 OpenShift Online 上运行该应用程序。使用例如 ApacheMQ Cartridge 实例化自己的设备可能会更好......当然有很多缺点,比如成本,我们负责“基础设施”。

老实说,在这种情况下,我对 AWS 真的很失望......

Sim*_*ach 4

我不认为我的解决方案是正确的 JAVA EE,但就我而言它是有效的。

配置:

@Singleton
public class SqsMessageManager
{
    private Integer numberOfReceivers = 3;

    public static SQSConnection connection = null;
    public static Queue queue = null;

    @Inject
    SqsMessageReceiver sqsMessageReceiver;

    public void init()
    {
        try
        {
            SQSConnectionFactory connectionFactory =
                    SQSConnectionFactory.builder()
                            .withRegion(Region.getRegion(Regions.EU_WEST_1))
                            .withAWSCredentialsProvider(new EnvironmentVariableCredentialsProvider())
                            .build();

            connection = connectionFactory.createConnection();

            queue = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createQueue("myQueue");

            for (int i = 0; i < numberOfReceivers; i++)
                connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(queue).setMessageListener(sqsMessageReceiver);

            connection.start();
        }
        catch (JMSException e)
        {
            e.getStackTrace();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

然后发件人:

@Dependent
public class SqsMessageSender
{
    MessageProducer producer = null;
    Session senderSession = null;

    @PostConstruct
    public void createProducer(){
        try
        {
            // open new session and message producer
            senderSession = SqsMessageManager.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            producer = senderSession.createProducer(SqsMessageManager.queue);
        }catch(JMSException | NullPointerException e){
            ;
        }
    }

    @PreDestroy
    public void destroy(){
        try
        {
            // close session
            producer.close();
            senderSession.close();
        }catch(JMSException e){

        }
    }

    // sends a message to aws sqs queue
    public void sendMessage(String txt)
    {
        try
        {
            TextMessage textMessage = senderSession.createTextMessage(txt);
            producer.send(textMessage);
        }
        catch (JMSException e)
        {
            e.getStackTrace();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

和接收者:

@Dependent
public class SqsMessageReceiver implements MessageListener
{
    public void onMessage(Message inMessage) {
        ...
    }
}
Run Code Online (Sandbox Code Playgroud)