Hum*_*gus 5 java jms oracle-aq
我有一个Oracle AQ,队列类型为SYS.AQ $ _JMS_TEXT_MESSAGE.我想要做的是从java应用程序中将文本插入到提到的队列中.
等效的SQL查询是
declare
r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload SYS.AQ$_JMS_TEXT_MESSAGE;
begin
o_payload := sys.aq$_jms_text_message.construct;
o_payload.set_text(xmltype('<user>text</user>').getClobVal());
sys.dbms_aq.enqueue (
queue_name => 'QUEUE_NAME',
enqueue_options => r_enqueue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle
);
commit;
end;
/
Run Code Online (Sandbox Code Playgroud)
我使用本指南得到了大部分权利,但我坚持了下来
o_payload := sys.aq$_jms_text_message.construct;
o_payload.set_text(xmltype('<user>text</user>').getClobVal());
Run Code Online (Sandbox Code Playgroud)
该指南显示了如何排队RAW消息,但我需要它是JMS,否则数据类型与队列类型不匹配.
任何帮助将不胜感激,因为即使有全能的谷歌我也无法找到解决这个问题的方法.有没有办法使用oracle.jdbc.aq
类来实现它,或者我只需要搞砸它并使用SQL查询?
Cha*_*ghe 10
只需复制粘贴此代码并尝试.(如果它适合你)然后仔细检查代码,并了解.
在执行时,
之后,
然后分别评论/取消注释每一行并试一试.
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
import javax.jms.TextMessage;
import oracle.AQ.AQQueueTable;
import oracle.AQ.AQQueueTableProperty;
import oracle.jms.AQjmsDestination;
import oracle.jms.AQjmsDestinationProperty;
import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;
public class OracleAQClient {
public static QueueConnection getConnection() {
String hostname = "localhost";
String oracle_sid = "xe";
int portno = 1521;
String userName = "jmsuser";
String password = "jmsuser";
String driver = "thin";
QueueConnectionFactory QFac = null;
QueueConnection QCon = null;
try {
// get connection factory , not going through JNDI here
QFac = AQjmsFactory.getQueueConnectionFactory(hostname, oracle_sid, portno, driver);
// create connection
QCon = QFac.createQueueConnection(userName, password);
} catch (Exception e) {
e.printStackTrace();
}
return QCon;
}
public static void createQueue(String user, String qTable, String queueName) {
try {
/* Create Queue Tables */
System.out.println("Creating Queue Table...");
QueueConnection QCon = getConnection();
Session session = QCon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
AQQueueTableProperty qt_prop;
AQQueueTable q_table = null;
AQjmsDestinationProperty dest_prop;
Queue queue = null;
qt_prop = new AQQueueTableProperty("SYS.AQ$_JMS_TEXT_MESSAGE");
q_table = ((AQjmsSession) session).createQueueTable(user, qTable, qt_prop);
System.out.println("Qtable created");
dest_prop = new AQjmsDestinationProperty();
/* create a queue */
queue = ((AQjmsSession) session).createQueue(q_table, queueName, dest_prop);
System.out.println("Queue created");
/* start the queue */
((AQjmsDestination) queue).start(session, true, true);
} catch (Exception e) {
e.printStackTrace();
return;
}
}
public static void sendMessage(String user, String queueName,String message) {
try {
QueueConnection QCon = getConnection();
Session session = QCon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
QCon.start();
Queue queue = ((AQjmsSession) session).getQueue(user, queueName);
MessageProducer producer = session.createProducer(queue);
TextMessage tMsg = session.createTextMessage(message);
//set properties to msg since axis2 needs this parameters to find the operation
tMsg.setStringProperty("SOAPAction", "getQuote");
producer.send(tMsg);
System.out.println("Sent message = " + tMsg.getText());
session.close();
producer.close();
QCon.close();
} catch (JMSException e) {
e.printStackTrace();
return;
}
}
public static void browseMessage(String user, String queueName) {
Queue queue;
try {
QueueConnection QCon = getConnection();
Session session = QCon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
QCon.start();
queue = ((AQjmsSession) session).getQueue(user, queueName);
QueueBrowser browser = session.createBrowser(queue);
Enumeration enu = browser.getEnumeration();
List list = new ArrayList();
while (enu.hasMoreElements()) {
TextMessage message = (TextMessage) enu.nextElement();
list.add(message.getText());
}
for (int i = 0; i < list.size(); i++) {
System.out.println("Browsed msg " + list.get(i));
}
browser.close();
session.close();
QCon.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
public static void consumeMessage(String user, String queueName) {
Queue queue;
try {
QueueConnection QCon = getConnection();
Session session = QCon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
QCon.start();
queue = ((AQjmsSession) session).getQueue(user, queueName);
MessageConsumer consumer = session.createConsumer(queue);
TextMessage msg = (TextMessage) consumer.receive();
System.out.println("MESSAGE RECEIVED " + msg.getText());
consumer.close();
session.close();
QCon.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
public static void main(String args[]) {
String userName = "jmsuser";
String queue = "sample_aq";
String qTable = "sample_aqtbl";
//createQueue(userName, qTable, queue);
//sendMessage(userName, queue,"<user>text</user>");
//browseMessage(userName, queue);
//consumeMessage(userName, queue);
}
Run Code Online (Sandbox Code Playgroud)
}
您需要从oracle DB安装目录将这些jar/libs复制到java项目中
这篇文章应归功于Ratha [1].很少有东西要修改,我只修改了那些并提供了代码.
[1] http://wso2.com/library/tutorials/2011/11/configuring-wso2-esb-with-oracle-as-messaging-media/
谢谢