我正在使用nifi来开发数据漂移。在我的流程中使用 SelectHiveQL 处理器。selectHiveQL 的输出(flowFile)需要进入下一个处理器。什么是合适的处理器来获取 flowFile 内容并将其存储到用户定义的变量中,必须在 Executescript 中使用相同的变量来操作数据。
处理器ExecuteScript可以通过标准 API 直接访问传入流文件的内容。这是一个例子:
def flowFile = session.get();
if (flowFile == null) {
return;
}
// This uses a closure acting as a StreamCallback to do the writing of the new content to the flowfile
flowFile = session.write(flowFile,
{ inputStream, outputStream ->
String line
// This code creates a buffered reader over the existing flowfile input
final BufferedReader inReader = new BufferedReader(new InputStreamReader(inputStream, 'UTF-8'))
// For each line, write the reversed line to the output
while (line = inReader.readLine()) {
outputStream.write("${line.reverse()}\n".getBytes('UTF-8'))
}
} as StreamCallback)
flowFile = session?.putAttribute(flowFile, "reversed_lines", "true")
session.transfer(flowFile, /*ExecuteScript.*/ REL_SUCCESS)
Run Code Online (Sandbox Code Playgroud)
将流文件内容移动到属性是危险的,因为属性和内容内存在 NiFi 中的管理方式不同。Apache NiFi In Depth指南中对这些差异有更详细的解释。
| 归档时间: |
|
| 查看次数: |
11467 次 |
| 最近记录: |