JMS/ActiveMQ exception when used with the Play Framework

you*_*ans 1 java activemq-classic jms playframework

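I am developing a messaging system that uses ActiveMQ and Play Framework v2.4.2 (Java) to send emails to end users. I am new to JMS/ActiveMQ, and I used the Hello World example on the ActiveMQ site as my starting point.

I wrote the following test class to try running ActiveMQ with Play Framework, and everything works: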

public class ActiveMQMailApp {

    public static void main(String[] args) throws Exception {
        setup();
        MailConsumer.initService();
        for (int i =0;i<11;i++) MailProducer.sendMail(fakeMail());
    }
    public static void setup(){
        FakeApplication fakeApplication = Helpers.fakeApplication();
        Helpers.start(fakeApplication);
    }

    private static Mail fakeMail() throws InterruptedException {
        Thread.sleep(1000);
        // "yyyy" is the calendar year ("YYYY" would be the week-based year); "HH" is the 24-hour clock
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd  HH:mm:ss");
        return new Mail( "noreply@abc.com", "receiver@gmail.com", "A Test Email", "<html><body><p>Date: <b> "+sdf.format(new Date())+" </b></p></body></html>");
    }

}

But when I use this exact code in my main application, this exception is thrown:

javax.jms.JMSException: Could not create Transport. Reason: java.lang.RuntimeException: Fatally failed to create SystemUsageorg/apache/activemq/protobuf/BufferInputStream
        at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:36)
        at org.apache.activemq.ActiveMQConnectionFactory.createTransport(ActiveMQConnectionFactory.java:332)
        at org.apache.activemq.ActiveMQConnectionFactory.createActiveMQConnection(ActiveMQConnectionFactory.java:345)
        at org.apache.activemq.ActiveMQConnectionFactory.createActiveMQConnection(ActiveMQConnectionFactory.java:303)
        at org.apache.activemq.ActiveMQConnectionFactory.createConnection(ActiveMQConnectionFactory.java:243)
        at ir.iais.salary.services.MailProducer.run(MailProducer.java:35)
Caused by: java.lang.RuntimeException: Fatally failed to create SystemUsageorg/apache/activemq/protobuf/BufferInputStream
        at org.apache.activemq.broker.BrokerService.getSystemUsage(BrokerService.java:1159)
        ... 5 more
Caused by: java.io.IOException: org/apache/activemq/protobuf/BufferInputStream
        at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:39)
        ... 11 more
Caused by: java.lang.NoClassDefFoundError: org/apache/activemq/protobuf/BufferInputStream
        at org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter.<init>(KahaDBPersistenceAdapter.java:65)
        ... 13 more
Caused by: java.lang.ClassNotFoundException: org.apache.activemq.protobuf.BufferInputStream
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)

My MailProducer and MailConsumer classes look like this:

public class MailProducer implements Runnable{
    public static final String AMQ_MAIL_QUEUE = "MAIL";
    public static final String BROKER_URL = "vm://localhost?broker.useJmx=false&persistent=false";
    private Mail mail;

    public MailProducer(Mail mail) {
        this.mail = mail;
    }

    public static void sendMail(Mail mail){
        Thread brokerThread = new Thread(new MailProducer(mail));
        brokerThread.setDaemon(false);
        brokerThread.start();
    }

    @Override
    public void run() {
        try {
            // Create a ConnectionFactory
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);

            // Create a Connection
            Connection connection = connectionFactory.createConnection();
            connection.start();

            // Create a Session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // Create the destination (Topic or Queue)
            Destination destination = session.createQueue(AMQ_MAIL_QUEUE);

            // Create a MessageProducer from the Session to the Topic or Queue
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            // Create a messages
            TextMessage textMessage = session.createTextMessage(new Gson().toJson(mail));
            // Tell the producer to send the message
            System.out.println("Sent message: "+ new Gson().toJson(mail) + " : " + Thread.currentThread().getName());
            producer.send(textMessage);

            // Clean up
            session.close();
            connection.close();
        }
        catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }
}




public class MailConsumer implements Runnable, ExceptionListener {
    private static final Logger logger = getLogger(MailConsumer.class);
    private static Thread mailConsumerService;

    public static synchronized void initService() {
        MailConsumer mailConsumer = Play.application().injector().instanceOf(MailConsumer.class);
        if (mailConsumerService != null) {
            logger.info("STOPPING MailConsumer thread.");
            mailConsumerService.interrupt();
        }
        logger.info("Starting MailConsumer thread.");
        mailConsumerService = new Thread(mailConsumer);
        mailConsumerService.setDaemon(true);
        mailConsumerService.setName("MailConsumer Service");
        mailConsumerService.start();
        logger.info("MailConsumer thread started.");
    }

    @Inject
    private MailerClient mailerClient;

    @Override
    public void run() {
        try {
            // Create a ConnectionFactory
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(MailProducer.BROKER_URL);

            // Create a Connection
            Connection connection = connectionFactory.createConnection();
            connection.start();

            connection.setExceptionListener(this);

            // Create a Session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // Create the destination (Topic or Queue)
            Destination destination = session.createQueue(MailProducer.AMQ_MAIL_QUEUE);

            // Create a MessageConsumer from the Session to the Topic or Queue
            MessageConsumer consumer = session.createConsumer(destination);

            while (!Thread.currentThread().isInterrupted()) {
                // Wait for a message
                Message message = consumer.receive();

                if (message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    String text = textMessage.getText();
                    System.out.println("Received: " + text);
                    Mail mail = new Gson().fromJson(text, Mail.class);
                    Email email = new Email();
                    email.setFrom(mail.getFrom());
                    email.setTo(mail.getTo());
                    email.setSubject(mail.getSubject());
                    email.setBodyHtml(mail.getBodyHtml());
                    System.out.println("sending email...");
                    mailerClient.send(email);
                    System.out.println("email sent!");
                } else {
                    System.out.println("Received: " + message);
                    logger.info("message type: "+message.getClass().getSimpleName());
                }

            }
            logger.info("MailConsumer interrupted.");
            consumer.close();
            session.close();
            connection.close();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                logger.info("MailConsumer thread interrupted.");
            } else {
                logger.error(e.getLocalizedMessage(), e);
            }
        }
    }

    public synchronized void onException(JMSException ex) {
        System.out.println("JMS Exception occurred.  Shutting down client.");
        logger.error("ErrorCode=" + ex.getErrorCode() + " , " + ex.getMessage(), ex);
    }
}

I call MailProducer in the main application like this:

public Result sendTestMail(){
    if(!DevStatus.gI().isInDebugMode()) return badRequest("You're not in Development Env.");
    // "yyyy" is the calendar year ("YYYY" would be the week-based year); "HH" is the 24-hour clock
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd  HH:mm:ss");
    Mail mail = new Mail("noreply@abc.com", "receiver@gmail.com", "A Test Email", "<html><body><p>Date: <b> " + sdf.format(new Date()) + " </b></p></body></html>");
    MailProducer.sendMail(mail);
    return ok("email sent! "+ sdf.format(new Date()));
}

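The problem seems to be that org.apache.activemq.protobuf.BufferInputStream is not on the classpath. I added "org.apache.activemq.protobuf" % "activemq-protobuf" % "1.1" to build.sbt, but nothing changed. I also tried to disable ActiveMQ persistence by adding persistent=false to the broker URI, but that did not help either.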
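One thing that may be worth checking here: as far as the ActiveMQ VM transport documentation describes it, settings for the embedded broker in a vm:// URI are only picked up when they carry the broker. prefix (for example broker.persistent=false). The URI in the question prefixes useJmx but not persistent, so the broker may still be trying to create its default KahaDB store, which would match the stack trace. The sketch below only builds the URI string; the class name VmBrokerUrl and its build helper are hypothetical names for illustration, not part of ActiveMQ.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class VmBrokerUrl {

    // Builds a vm:// URI in which every broker setting carries the "broker." prefix.
    // Hypothetical helper for illustration; ActiveMQ itself only sees the resulting string.
    static String build(String brokerName, Map<String, String> brokerOptions) {
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> option : brokerOptions.entrySet()) {
            query.add("broker." + option.getKey() + "=" + option.getValue());
        }
        return "vm://" + brokerName + "?" + query;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in insertion order
        Map<String, String> options = new LinkedHashMap<>();
        options.put("useJmx", "false");
        options.put("persistent", "false");
        System.out.println(build("localhost", options));
    }
}
```

The resulting string could then be used as BROKER_URL in MailProducer. Whether this alone removes the KahaDB dependency in this particular project is an assumption to verify, not something the question confirms.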

What can I do? Does it make sense to use ActiveMQ as the JMS provider with Play Framework, or is there a better JMS provider to use with Play? What about Akka?

Edit: my ActiveMQ-related dependencies are:

  "org.apache.activemq" % "activemq-broker" % "5.13.4",
  "org.apache.activemq" % "activemq-client" % "5.13.4",
  "org.apache.activemq" % "activemq-kahadb-store" % "5.13.4",
  "org.apache.activemq.protobuf" % "activemq-protobuf" % "1.1",

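Edit 2: I replaced the dependencies above with "org.apache.activemq" % "activemq-all" % "5.14.0" and the main application started working. At first I thought the problem was solved and was related to the ActiveMQ packages, but then I realized that the ActiveMQMailApp test class now throws the same exception as above. I ran this test class in a new, plain Maven project (not Play Framework) and everything worked. I am afraid this error will come back later. What exactly is going on?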
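Since the behaviour differs between the main application, the test class, and a plain Maven project, it may help to print where the classes named in the stack trace are actually loaded from in each setup. The following is a minimal diagnostic sketch that uses only the JDK; ClasspathProbe and locate are hypothetical names. It assumes the classes are visible to the class loader that loaded the probe itself, which may not hold in every Play run mode.

```java
import java.security.CodeSource;

public class ClasspathProbe {

    // Returns the location a class was loaded from, or a short note if it cannot be loaded.
    static String locate(String className) {
        try {
            // initialize=false: only resolve the class, do not run its static initializers
            Class<?> type = Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            CodeSource source = type.getProtectionDomain().getCodeSource();
            return source == null ? "bootstrap/JDK class" : String.valueOf(source.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "NOT FOUND (" + e + ")";
        }
    }

    public static void main(String[] args) {
        // The two classes that appear in the stack trace of the question
        System.out.println(locate("org.apache.activemq.protobuf.BufferInputStream"));
        System.out.println(locate("org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter"));
    }
}
```

Calling locate from inside the Play application and from the test class, then comparing the reported jar locations, would show whether the two environments resolve different sets of ActiveMQ jars.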

jum*_*ett 5

Add this dependency to your pom.xml file.

    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-kahadb-store</artifactId>
        <scope>runtime</scope>
    </dependency>

See the issue report.