如何在nifi处理器中获取整个流程文件内容

Kri*_*Ram 1 apache-nifi

我正在使用nifi来开发数据漂移。在我的流程中使用 SelectHiveQL 处理器。selectHiveQL 的输出(flowFile)需要进入下一个处理器。什么是合适的处理器来获取 flowFile 内容并将其存储到用户定义的变量中,必须在 Executescript 中使用相同的变量来操作数据。

And*_*ndy 5

处理器ExecuteScript可以通过标准 API 直接访问传入流文件的内容。这是一个例子:

def flowFile = session.get();
if (flowFile == null) {
    return;
}

// This uses a closure acting as a StreamCallback to do the writing of the new content to the flowfile
flowFile = session.write(flowFile,
        { inputStream, outputStream ->
            String line

            // This code creates a buffered reader over the existing flowfile input
            final BufferedReader inReader = new BufferedReader(new InputStreamReader(inputStream, 'UTF-8'))

            // For each line, write the reversed line to the output
            while (line = inReader.readLine()) {
                outputStream.write("${line.reverse()}\n".getBytes('UTF-8'))
            }
        } as StreamCallback)

flowFile = session?.putAttribute(flowFile, "reversed_lines", "true")
session.transfer(flowFile, /*ExecuteScript.*/ REL_SUCCESS)
Run Code Online (Sandbox Code Playgroud)

将流文件内容移动到属性是危险的,因为属性和内容内存在 NiFi 中的管理方式不同。Apache NiFi In Depth指南中对这些差异有更详细的解释。