我需要执行类似:sed'1d'simple.tsv> noHeader.tsv
这将从我的大流文件中删除第一行(> 1 GB).
问题是 - 我需要在我的流文件上执行它,所以它是:
sed'1d'myFlowFile> myFlowFile
问题是:我应该如何配置ExecuteStreamCommand处理器,以便它在我的流文件上运行命令并将其返回到我的流文件?如果sed不是最佳选择,我可以考虑采取其他方式(例如尾巴)
谢谢,米哈尔
编辑2(解决方案):
下面是最终的ExecuteStreamCommand配置,它可以完成我需要的操作(从流文件中删除第一行).@Andy - 非常感谢所有珍贵的提示.

米哈尔,
我想确保我正确理解你的问题,因为我认为有更好的解决方案.
问题:
你有一个1GB的TSV加载到NiFi,你想删除第一行.
解:
如果您的文件较小,最好的解决方案是使用ReplaceText具有以下处理器属性的处理器:
^.*\n <- empty stringThat would strip the first line out without having to send the 1GB content out of NiFi to the command-line and then re-ingest the results. Unfortunately, to use a regular expression, you need to set a Maximum Buffer Size, which means the entire contents need to be read into heap memory to perform this operation.
With a 1GB file, if you know the exact value of the first line, you should try ModifyBytes哪个允许您从流文件内容的开头和/或结尾修剪字节数.然后,您可以简单地指示处理器删除内容的前n个字节.由于NiFi的写时复制内容存储库,您仍然会有大约2GB的数据,但它使用8192B缓冲区大小以流方式进行.
我最好的建议是使用ExecuteScript处理器.该处理器允许您使用各种语言(Groovy,Python,Ruby,Lua,JS)编写自定义代码,并使其在流文件上执行.使用如下所示的Groovy脚本,您可以删除第一行并以流方式复制其余部分,这样就不会对堆产生不必要的负担.
我用1MB文件测试了这个,每个流文件花了大约1.06秒(MacBook Pro 2015,16 GB RAM,OS X 10.11.6).在更好的机器上,您显然可以获得更好的吞吐量,并且您可以将其扩展到更大的文件.
def flowfile = session.get()
if (!flowfile) return
try {
// Here we are reading from the current flowfile content and writing to the new content
flowfile = session.write(flowfile, { inputStream, outputStream ->
def bufferedReader = new BufferedReader(new InputStreamReader(inputStream))
// Ignoring the first line
def ignoredFirstLine = bufferedReader.readLine()
def bufferedWriter = new BufferedWriter(new OutputStreamWriter(outputStream))
def line
int i = 0
// While the incoming line is not empty, write it to the outputStream
while ((line = bufferedReader.readLine()) != null) {
bufferedWriter.write(line)
bufferedWriter.newLine()
i++
}
// By default, INFO doesn't show in the logs and WARN will appear in the processor bulletins
log.warn("Wrote ${i} lines to output")
bufferedReader.close()
bufferedWriter.close()
} as StreamCallback)
session.transfer(flowfile, REL_SUCCESS)
} catch (Exception e) {
log.error(e)
session.transfer(flowfile, REL_FAILURE)
}
Run Code Online (Sandbox Code Playgroud)
一方面注意,一般来说,NiFi的一个好习惯是在可能的情况下将巨型文本文件拆分成较小的组件流文件(使用类似的东西SplitText)以获得并行处理的好处.如果1GB输入是视频,这将不适用,但正如您提到的TSV,我认为可能将初始流文件拆分为较小的部分并并行处理它们(甚至发送到集群中的其他节点)负载平衡)可以帮助您在这里的表现.
编辑:
我意识到我没有回答你原来的问题 - 如何将流文件的内容放入ExecuteStreamCommand处理器命令行调用中.如果要对属性的值进行操作,可以使用" 参数"字段中的" 表达式语言"语法引用属性值.但是,由于内容不能从EL引用,并且您不希望通过将1GB内容移动到属性来销毁堆,因此最好的解决方案是将内容写入文件,运行命令反对提供的文件名并将其写入另一个文件,然后用于将这些内容读回NiFi中的流文件. ${attribute_name}PutFilesedGetFile
编辑2:
这里有一个模板,它演示了使用ExecuteStreamCommand既rev和sed对flowfile内容,并把结果输出到新flowfile的内容.您可以运行流和监视器logs/nifi-app.log以查看输出或使用数据来源查询来检查每个处理器执行的修改.
| 归档时间: |
|
| 查看次数: |
6895 次 |
| 最近记录: |