Sel*_*gam 4 json apache-camel jbossfuse
我有一个骆驼应用程序,它从一个大小为 13000 的 jms 队列接收一个 json 数组请求,json 数组请求的结构如下。我想以 5 个为一组流式传输和拆分 json 数组。例如,如果我收到一个大小为 100 的 json 数组,我想将其分组为 5 个并将其拆分为 20 个请求。是否有内置的骆驼功能来分组和拆分 json 数组,还是我需要编写自定义拆分器?
我正在使用骆驼 2.17 版本。
示例 json 数组:
[{
"name": "Ram",
"email": "ram@gmail.com",
"age": 23
}, {
"name": "Shyam",
"email": "shyam23@gmail.com",
"age": 28
}, {
"name": "John",
"email": "john@gmail.com",
"age": 33
}, {
"name": "Bob",
"email": "bob32@gmail.com",
"age": 41
}, {
"name": "test1",
"email": "test1@gmail.com",
"age": 41
}, {
"name": "test2",
"email": "test2@gmail.com",
"age": 41
}, {
"name": "test3",
"email": "test3@gmail.com",
"age": 41
}, {
"name": "test4",
"email": "test4@gmail.com",
"age": 41
}]
Run Code Online (Sandbox Code Playgroud)
你可以尝试这样的事情:
@Override
protected RoutesBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:start")
.split().jsonpath("$")
.streaming()
.aggregate(AggregationStrategies.groupedExchange())
.constant("true")
.completionSize(5)
.completionTimeout(1000)
.log("${body}")
.to("mock:result");
}
};
}
Run Code Online (Sandbox Code Playgroud)
如果消息的大小不是 5 的倍数,则路由应在聚合前等待 1 秒并继续。使用您的输入,结果将是分别包含 5 个和 3 个项目的两条消息:
INFO 5419 --- [ main] route1 : List<Exchange>(5 elements)
INFO 5419 --- [eTimeoutChecker] route1 : List<Exchange>(3 elements)
Run Code Online (Sandbox Code Playgroud)
可以在此处查看完整示例。
编辑:
根据要求,一个 Spring DSL 示例:
<camel:route>
<camel:from uri="direct:start" />
<camel:split streaming="true">
<camel:jsonpath>$</camel:jsonpath>
<camel:aggregate completionSize="5"
completionTimeout="1000" groupExchanges="true">
<camel:correlationExpression>
<camel:constant>true</camel:constant>
</camel:correlationExpression>
<camel:log message="${body}"></camel:log>
<camel:to uri="mock:result"></camel:to>
</camel:aggregate>
</camel:split>
</camel:route>
Run Code Online (Sandbox Code Playgroud)
这会起作用
@Autowired
@EndpointInject(uri = "direct://splitted-queue")
ProducerTemplate producerTemplate;
@Component
class Router extends RouteBuilder {
@Override
public void configure() throws Exception {
from("direct://direct-queue").split(ExpressionBuilder.languageExpression("jsonpath","$")).convertBodyTo(String.class).process(new Processor() {
List<String> jsons = new ArrayList<>();
@Override
public void process(Exchange exchange) throws Exception {
jsons.add(exchange.getIn().getBody().toString());
if(jsons.size() == 5) {
producerTemplate.sendBody(jsons);
jsons.clear();
}
}
});
}
Run Code Online (Sandbox Code Playgroud)
为此,您需要camel-jsonpath依赖项
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-jsonpath</artifactId>
<version>2.19.0</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
8609 次 |
| 最近记录: |