从 Apache camel Route 中的分页 API 读取

tec*_*ver 2 rest pagination apache-camel

如何使用 Apache Camel DSL 从分页 REST API 端点或从一次获取“K”项/记录的 JDBC SQL 查询中读取?感谢是否有相同的干净示例。

提前致谢。

tec*_*ver 5

我已经使用 loopDoWhile dsl 做到了这一点:

from("direct:start").loopDoWhile(stopLoopPredicate())
                        .to("bean:restAPIProcessor")
                        .to("bean:dataEnricherBean")
                     .end();
Run Code Online (Sandbox Code Playgroud)

stopLoopPredicate() 在这里:

public Predicate stopLoopPredicate() {
        Predicate stopLoop = new Predicate() {
            @Override
            public boolean matches(Exchange exchange) {
                return exchange.getIn().getBody() != null && !exchange.getIn().getBody().toString().equalsIgnoreCase("stopLoop");
            }
        };
        return stopLoop;
    }
Run Code Online (Sandbox Code Playgroud)

restAPIProcessor 是 Processor 的一个实现,其中进行了 REST API 调用。

处理分页的逻辑在 restAPIProcessor 中实现,当实际的 REST API 返回空响应时,“stopLoop”被设置为交换路由的主体。这很好用。这是 RestAPIProcessor 的代码:

public class RestAPIProcessor implements Processor {

    @Inject
    private RestTemplate restTemplate;

    private static final int LIMIT = 100;

    private static final String REST_API = "<REST API URL>";

    @Override
    public void process(Exchange exchange) throws Exception {
        Integer offset = (Integer) exchange.getIn().getHeader("offset");
        Integer count = (Integer) exchange.getIn().getHeader("count");

        if (offset == null) offset = 0;
        if (count == null) count = 0;

        String response = "";
        Map<String,Object> body = new LinkedHashMap<>();
        body.put("offset",offset++);
        body.put("limit",LIMIT);
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        HttpEntity<?> entity = new HttpEntity<Object>(body,headers);
        ResponseEntity<String> countResponseEntity = restTemplate.exchange(REST_API, HttpMethod.POST,entity,String.class);
        response = countResponseEntity.getBody();
        count += LIMIT;
        if (response == null || response.isEmpty()) {
            exchange.getIn().setBody("stopLoop");
            exchange.getOut().setHeaders(exchange.getIn().getHeaders());
        } else {
            exchange.getIn().setHeader("count", count);
            exchange.getIn().setHeader("offset", offset);
            exchange.getOut().setHeaders(exchange.getIn().getHeaders());
            exchange.getOut().setBody(response);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)