用例是我有一个zip包含许多csv文件的文件。然后将每个文件中的每一行发送到seda队列进行处理。我遇到的问题是我想知道何时每行都由seda队列以执行其他工作。我不确定该如何处理。当前,我正在调查使用轮询来测试seda队列何时为空,但是如果处理的行比到达的行快,这可能会产生错误的结果。
我有一个可解压缩zip文件并以读取每个文件的类InputStream。然后,将文件中的每一行发送到生产者,然后再将其发送到seda队列。
@Component
public class CsvProcessor {
@Resource(name = "csvLineProducer")
ProducerTemplate producer;
public void process(InputStream flatFileStream) throws IOException {
if (flatFileStream==null) return;
try {
LineIterator it = IOUtils.lineIterator(flatFileStream, "UTF-8");
while (it.hasNext()) {
final String recordLine = it.nextLine();
this.producer.send(new Processor() {
public void process(Exchange outExchange) {
outExchange.getIn().setBody(recordLine);
}
});
}
} finally {
IOUtils.closeQuietly(flatFileStream);
}
}
}
Run Code Online (Sandbox Code Playgroud)
这是骆驼的配置。
<camelContext xmlns="http://camel.apache.org/schema/spring" trace="true">
<template id="csvLineProducer" defaultEndpoint="seda:flatRecordStream"/>
...
<route>
<from uri="seda:flatRecordStream" />
<bean ref="myProcessor" method="processLine" />
</route>
...
</camelContext>
Run Code Online (Sandbox Code Playgroud)