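I am using the batch insert feature of the Apache Camel SQL component.
My application reads tickets from ActiveMQ; the queue holds about 2000 tickets.
I have set the batch size to 100.
The query I want to execute is as follows: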
sql.subs.insertCdr=
insert into subscription_logs(master_id,request_type,req_desc,msisdn,amount,status,resp_code,resp_desc,channel,transaction_id,se_mode,be_mode,sub_type,sub_timeleft,srv_name,srv_id,start_date,end_date,operator,circle,country,time_offset,retry_count,user_status,previous_state,se_reqrecvtime,se_respsenttime,be_reqsenttime,be_resprecvtime,cp_id,cp_name,sub_srvname,sub_srvid,msg_senderid,msg_text,call_back_url,call_back_resp,client_ip,se_sysIp,language,cp_callbackurlhittime,action,alert,notification_url,notification_resp)
values(:#masterId, :#requestType,:#reqDesc,:#msisdnCdr,:#price,:#status,:#responseCode,:#reason,:#channel,:#transactionId,:#seMode,:#beMode,:#subType,:#subTimeLeft,:#serviceName,:#serviceId,:#subStartDate,:#cdrEndDate,:#operator,:#circle,:#country,:#timeOffset,:#retryCount,:#userStatus,:#previousState,:#seReqRecvTime,:#seRespSentTime,:#beReqSentTime,:#beRespRecvTime,:#cpId,:#cpName,:#subServiceName,:#subServiceId,:#shortCode,:#message,:#callBackUrl,:#callBackResp,:#clientIp,:#seSysIp,:#language,:#cpCallbackUrlHitTime,:#action,:#alert,:#notificationUrl,:#notificationResponse)
The SQL batch route is defined as follows:
<pipeline>
    <log message="Going to insert in database"></log>
    <transform>
        <method ref="insertionBean" method="subsBatchInsertion"></method>
    </transform>
    <choice>
        <when>
            <simple>${in.header.subsCount} == ${properties:batch.size}</simple>
            <to uri="sql:{{sql.subs.insertCdr}}?batch=true"></to>
            <log message="Inserted rows ${body}"></log>
        </when>
    </choice>
</pipeline>
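The choice block above forwards to the SQL endpoint only when the subsCount header equals batch.size. As a rough, Camel-free illustration of that accumulate-then-flush pattern, here is a minimal sketch. The class BatchAccumulator and its methods are hypothetical names, not part of the original post or of Camel, and the drain() step for a partial final batch is an assumption about what a caller might want rather than something the post describes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper (not from the original post): buffers items and hands
// back a full batch once batchSize items have been collected.
final class BatchAccumulator<T> {
    private final int batchSize;
    private final List<T> pending = new ArrayList<>();

    BatchAccumulator(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    // Adds one item. Returns a copy of the batch when it is full,
    // otherwise an empty list, so the caller never sees a shared buffer.
    synchronized List<T> add(T item) {
        pending.add(item);
        if (pending.size() < batchSize) {
            return Collections.emptyList();
        }
        return drain();
    }

    // Returns whatever is buffered (possibly fewer than batchSize items)
    // and resets the buffer. Assumed use: flushing a partial final batch.
    synchronized List<T> drain() {
        List<T> batch = new ArrayList<>(pending);
        pending.clear();
        return batch;
    }

    public static void main(String[] args) {
        BatchAccumulator<Integer> acc = new BatchAccumulator<>(3);
        for (int i = 1; i <= 7; i++) {
            List<Integer> batch = acc.add(i);
            if (!batch.isEmpty()) {
                System.out.println("flush " + batch);
            }
        }
        System.out.println("tail " + acc.drain());
    }
}
```

Returning a copy instead of the live buffer keeps a later clear() from emptying a batch that is still being written.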
Below is my Java code:
public List<Map<String, Object>> subsBatchInsertion(Exchange exchange) {
    if (subsBatchCounter > batchSize) {
        subsPayLoad.clear();
        subsBatchCounter = 1;
    }
    subsPayLoad.add(generateInsert(exchange.getIn().getBody(SubscriptionCdr.class)));
    exchange.getIn().setHeader("subsCount", subsBatchCounter);
    subsBatchCounter++;
    return subsPayLoad;
}

public Map<String, Object> generateInsert(Cdr cdr) {
    Map<String, Object> insert = new HashMap<String, Object>();
    try {
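        insert …

I am trying to print the number of rows selected by the SQL component using header.CamelSqlRowCount. However, the value comes out as null. I added the tracing mentioned in another post to check the header values, and only breadcrumbId is printed. Can anyone tell me whether I am doing something wrong?
Camel route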
<endpoint id="sqlComponent" uri="sql:${sql.pollerQuery}?dataSource=DataSource&amp;consumer.delay=60000&amp;consumer.useIterator=false"/>
<route id="root">
    <from ref="sqlComponent"/>
    <log message="Received ${header.CamelSqlRowCount} records from the poller query"/>
    <log message="Message Body= ${body}"/>
</route>
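With consumer.useIterator=false the SQL consumer delivers the whole result set as a single java.util.List in the message body, so one possible workaround is to take the row count from the body itself instead of from a header. The sketch below shows that idea in plain Java without Camel. PolledRowCount and rowCount are hypothetical names, and the sample rows are made up for illustration. Inside a route the same idea is often written as the Simple expression ${body.size()}, which should be checked against the Camel version in use.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of Camel): derives a row count from a
// polled message body instead of relying on a header being present.
final class PolledRowCount {

    // Returns the number of rows when the body is a List, or -1 when the
    // body has some other type (for example a single row map).
    static int rowCount(Object body) {
        return (body instanceof List) ? ((List<?>) body).size() : -1;
    }

    public static void main(String[] args) {
        // Illustrative rows shaped like a SELECT ID result: one map per row.
        List<Map<String, Object>> rows = new ArrayList<>();
        for (int id = 1; id <= 5; id++) {
            Map<String, Object> row = new LinkedHashMap<>();
            row.put("ID", id);
            rows.add(row);
        }
        System.out.println("Received " + rowCount(rows) + " records from the poller query");
    }
}
```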
The logged trace is as follows:
DEBUG 2017-11-16 18:01:00 [Camel (camel-1) thread #0 - sql://SELECT%20ID%20FROM%20TABLE%20WHERE%20PUBLISHED%20=%20'N'] Executing query: SELECT ID FROM TABLE WHERE PUBLISHED = 'N'
INFO 2017-11-16 18:01:01 [Camel (camel-1) thread #0 - sql://SELECT%20ID%20FROM%20TABLE%20WHERE%20PUBLISHED%20=%20'N'] ID-MyMachine >>> (root) from(sql://SELECT%20ID%20FROM%20TABLE%20WHERE%20PUBLISHED%20=%20'N'?consumer.delay=60000&consumer.useIterator=false&dataSource=odsDataSource) --> log[Received ${header.CamelSqlRowCount} records from the poller query] <<< Pattern:InOnly, Headers:{breadcrumbId=ID-MyMachine}, BodyType:java.util.ArrayList, Body:[{ID=1}, {ID=2}, {ID=3}, {ID=4}, {ID=5}]
INFO 2017-11-16 18:01:01 [Camel …

I am using Apache Camel 3.0 and get this error when parsing a file:
from("file:" + filePath)
    .routeId("create-ticket")
    .unmarshal(ticketCsv)
    .to("jpa:Ticket")
    .log("Created new ticket with id ${body.id}")
    .convertBodyTo(String.class)
    .setProperty("fileName", simple("${header.CamelFileAbsolutePath}"))
    .setProperty("fileContents", body())
    .to("sql:INSERT INTO DOCUMENT(document_name, document_contents) VALUES(:#${property.fileName}, :#${property.fileContents})");
Exception:
org.apache.camel.language.simple.types.SimpleIllegalSyntaxException: Unknown function: property.fileName at location 0 ${property.fileName}
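The exception is consistent with a Camel 3 change: the Simple language's property function was removed in favour of exchangeProperty. On that basis the placeholders in the SQL URI would become :#${exchangeProperty.fileName} and :#${exchangeProperty.fileContents}. Below is a small, Camel-free sketch of that rename applied to the endpoint string. SimplePropertyMigration and migrate are hypothetical names used only for illustration.

```java
// Hypothetical helper (not part of Camel): rewrites the Simple function
// name that was removed in Camel 3 to its replacement.
final class SimplePropertyMigration {

    // Replaces every "${property." prefix with "${exchangeProperty.".
    // String.replace works on literal text, so no regex escaping is needed.
    static String migrate(String uri) {
        return uri.replace("${property.", "${exchangeProperty.");
    }

    public static void main(String[] args) {
        String old = "sql:INSERT INTO DOCUMENT(document_name, document_contents) "
                + "VALUES(:#${property.fileName}, :#${property.fileContents})";
        // Prints the endpoint URI with both placeholders renamed.
        System.out.println(migrate(old));
    }
}
```

In the route itself the rename is a direct edit of the string passed to the final .to(...) call; nothing else in the route needs to change for this particular error.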