Kay*_*ayV 9 java apache-camel batch-processing camel-jdbc camel-sql
我正在使用Apache Camel SQL批量插入过程.
我的应用程序是从Active MQ读取票据,其中包含大约2000张票据.
我已将批次更新为100.
我要解雇的查询如下:
sql.subs.insertCdr=
insert into subscription_logs(master_id,request_type,req_desc,msisdn,amount,status,resp_code,resp_desc,channel,transaction_id,se_mode,be_mode,sub_type,sub_timeleft,srv_name,srv_id,start_date,end_date,operator,circle,country,time_offset,retry_count,user_status,previous_state,se_reqrecvtime,se_respsenttime,be_reqsenttime,be_resprecvtime,cp_id,cp_name,sub_srvname,sub_srvid,msg_senderid,msg_text,call_back_url,call_back_resp,client_ip,se_sysIp,language,cp_callbackurlhittime,action,alert,notification_url,notification_resp)
values(:#masterId, :#requestType,:#reqDesc,:#msisdnCdr,:#price,:#status,:#responseCode,:#reason,:#channel,:#transactionId,:#seMode,:#beMode,:#subType,:#subTimeLeft,:#serviceName,:#serviceId,:#subStartDate,:#cdrEndDate,:#operator,:#circle,:#country,:#timeOffset,:#retryCount,:#userStatus,:#previousState,:#seReqRecvTime,:#seRespSentTime,:#beReqSentTime,:#beRespRecvTime,:#cpId,:#cpName,:#subServiceName,:#subServiceId,:#shortCode,:#message,:#callBackUrl,:#callBackResp,:#clientIp,:#seSysIp,:#language,:#cpCallbackUrlHitTime,:#action,:#alert,:#notificationUrl,:#notificationResponse)
SQL批处理路由定义如下:
<pipeline>
<log message="Going to insert in database"></log>
<transform>
<method ref="insertionBean" method="subsBatchInsertion"></method>
</transform>
<choice>
<when>
<simple>${in.header.subsCount} == ${properties:batch.size}</simple>
<to uri="sql:{{sql.subs.insertCdr}}?batch=true"></to>
<log message="Inserted rows ${body}"></log>
</when>
</choice>
</pipeline>
Run Code Online (Sandbox Code Playgroud)下面是我的java代码:
public List<Map<String, Object>> subsBatchInsertion(Exchange exchange) {
if (subsBatchCounter > batchSize) {
subsPayLoad.clear();
subsBatchCounter = 1;
}
subsPayLoad.add(generateInsert(exchange.getIn().getBody(SubscriptionCdr.class)));
exchange.getIn().setHeader("subsCount", subsBatchCounter);
subsBatchCounter++;
return subsPayLoad;
}
public Map<String, Object> generateInsert(Cdr cdr) {
Map<String, Object> insert = new HashMap<String, Object>();
try {
insert = BeanUtils.describe(cdr);
} catch (Exception e) {
Logger.sysLog(LogValues.error, this.getClass().getName()+" | "+Thread.currentThread().getStackTrace()[1].getMethodName(), coreException.GetStack(e));
}
for (String name : insert.keySet()) {
Logger.sysLog(LogValues.APP_DEBUG, this.getClass().getName(), name + ":"+ insert.get(name) + "\t");
}
return insert;
}
Run Code Online (Sandbox Code Playgroud)现在问题是当ActiveMQ中有大约120个票证时,SQL批处理应该已经开始将值插入到数据库中.但它需要花费更多的时间.当ActiveMQ中有大约500张票时,它开始插入过程.anyboody可以帮助优化插入过程吗?还是其他方法?
问题在于ActiceMQ的消费者数量.
当我将消费者数量更改回1时,批次会按时更新.
实际上,当消费者数量为10时,门票被并行消耗.这意味着,对于有10个消费者的activemq消费的100张票,每个消费者大约有10张票,因此增加了更多的时间.当任何一个消费者获得100张票时,该批次就会得到更新.
因此,将使用者计数更改为1会使所有票证由单个使用者处理,从而执行批量更新.
| 归档时间: |
|
| 查看次数: |
876 次 |
| 最近记录: |