Camel Split 并行处理聚合消息

mad*_*nce 0 parallel-processing rest concurrency split apache-camel

可能看起来像一个微不足道的问题,但不幸的是无法让它工作

这是代码

from("direct:START")
.process( (ex) -> {
   List<Integer> pages = IntStream.range(1,5).boxed().collect(Collectors.toList());
   ex.getOut().setBody( pages );
 })
.split(body())
.parallelProcessing()
.to("http://someurl?page=${body}");
 --> Get the collective body here
Run Code Online (Sandbox Code Playgroud)

如何获得这份工作!

Bed*_*dla 6

您可以使用AggregatorcompletionSizeExpression 来聚合拆分的消息。

.split(body()).parallelProcessing().to("log:splitted_body_is_here")
.aggregate(constant(true), AggregationStrategies.groupedBody())
    .completionSize(exchangeProperty(Exchange.SPLIT_SIZE))
.to("log:aggregated_body_is_here")
Run Code Online (Sandbox Code Playgroud)

如果您使用旧版本的骆驼 (2.20.x) AggregationStrategies.groupedBody()将不可用。您可以使用其他方法。我使用了一个简单的自定义方法来执行我的聚合。

代码更改为

.split(body()).parallelProcessing().to("log:splitted_body_is_here")
.aggregate(constant(true), (in,out) ->{
                if( in == null ){
                    return out;
                }
                else{
                    String body = in.getIn().getBody(String.class);
                    body = body + "," + out.getIn().getBody( String.class );
                    in.getOut().setBody( body );
                    return in;
                }
            })
    .completionSize(exchangeProperty(Exchange.SPLIT_SIZE))
.to("log:aggregated_body_is_here")
Run Code Online (Sandbox Code Playgroud)

上面的代码只是假设主体是一个字符串/JSON 并用逗号附加它。

看起来,您想在to. 不支持,toD改用。