par*_*nza 10 groovy json apache-nifi
我正在使用Groovy脚本上的Apache NiFi 0.5.1来将传入的Json值替换为映射文件中包含的值.映射文件看起来像这样(它是一个简单的.txt):
Header1;Header2;Header3
 A;some text;A2
我从以下开始:
import groovy.json.JsonBuilder 
import groovy.json.JsonSlurper 
import java.nio.charset.StandardCharsets 
def flowFile = session.get(); 
if (flowFile == null) { 
    return; 
} 
flowFile = session.write(flowFile, 
        { inputStream, outputStream -> 
            def content = """ 
{ 
  "field1": "A"
  "field2": "A", 
  "field3": "A" 
}""" 
            def slurped = new JsonSlurper().parseText(content) 
            def builder = new JsonBuilder(slurped) 
            builder.content.field1 = "A"
            builder.content.field2 = "some text" 
            builder.content.field3 = "A2" 
            outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8)) 
        } as StreamCallback) 
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)
这第一步工作得很好,虽然它是硬编码的,但远非理想.我最初的想法是使用ReplaceTextWithMapping来执行替换,但是它不适用于复杂的映射文件(例如多列).我想更进一步,但我不知道如何去做.首先,我想读取传入的流文件,而不是传入整个编码的JSON.这怎么可能在NiFi?在将脚本作为ExecuteScript的一部分运行之前,我通过UpdateAttribute输出带有内容的.Json文件,其中filename = myResultingJSON.json.此外,我知道如何使用Groovy(String mappingContent= new File('/path/to/file').getText('UTF-8')加载.txt文件,但是如何使用加载的文件来执行替换,以便我生成的JSON看起来像这样:
{ 
  "field1": "A"
  "field2": "some text", 
  "field3": "A2" 
}
谢谢您的帮助,
一世.
编辑:
对脚本的第一次修改允许我从InputStream中读取:
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
def flowFile = session.get();
if (flowFile == null) {
    return;
}
flowFile = session.write(flowFile,
        { inputStream, outputStream ->
            def content = org.apache.commons.io.IOUtils.toString(inputStream, java.nio.charset.StandardCharsets.UTF_8)
            def slurped = new JsonSlurper().parseText(content)
            def builder = new JsonBuilder(slurped)
            builder.content.field1 = "A"
            builder.content.field2 = "some text" 
            builder.content.field3 = "A2" 
            outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8))
        } as StreamCallback)
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)
然后,我转而使用ConfigSlurper测试该方法,并在将逻辑注入Groovy ExecuteScript之前编写了一个泛型类:
class TestLoadingMappings {
    static void main(String[] args) {
        def content = '''
         {"field2":"A",
         "field3": "A"
         }
         '''
        println "This is the content of the JSON file" + content
        def slurped = new JsonSlurper().parseText(content)
        def builder = new JsonBuilder(slurped)
        println "This is the content of my builder " + builder
        def propertiesFile = new File("D:\\myFile.txt")
        Properties props = new Properties()
        props.load(new FileInputStream(propertiesFile))
        def config = new ConfigSlurper().parse(props).flatten()
        println "This is the content of my config " + config
        config.each { k, v ->
            if (builder[k]) {
                builder[k] = v
            }
        }
        println(builder.toPrettyString())
    }
}
我返回一个groovy.lang.MissinPropertyException,这是因为映射不是那么简单.所有字段/属性(从field1到field3)都进入具有相同值的InpuStream(例如),这意味着每次field2具有该值时,您可以确定它对其他两个属性有效.但是,我不能有一个映射"field2":"someText"的映射字段,因为实际的映射是由映射文件中的第一个值驱动的.这是一个例子:
{ 
      "field1": "A"
      "field2": "A", 
      "field3": "A" 
 }
在我的映射文件中,我有:
A;some text;A2
但是,如果您愿意,field1需要映射到A(文件中的第一个值)或保持不变.Field2需要映射到最后一列(A2)中的值,最后Field3需要映射到中间列中的"some text".
你能帮帮忙吗?这是我用Groovy和ExecuteScript可以实现的.如果需要,我可以将配置文件拆分为两个.
另外,我已经快速浏览了另一个选项(PutDistributedMapCache),我不确定我是否已经了解如何将键值对加载到分布式地图缓存中.看起来你需要有一个DistributedMapCacheClient,我不确定这是否易于实现.
谢谢!
编辑2:
其他一些进展,我现在的映射工作,但不知道为什么它在读取属性文件的第二行时失败:
"A" someText
"A2" anotherText
class TestLoadingMappings {
    static void main(String[] args) {
        def content = '''
         {"field2":"A",
         "field3":"A"
         }
         '''
        println "This is the content of the JSON file" + content
        def slurper = new JsonSlurper().parseText(content)
        def builder = new JsonBuilder(slurper)
        println "This is the content of my builder " + builder
        assert builder.content.field2 == "A"
        assert builder.content.field3 == "A"
        def propertiesFile = new File('D:\\myTest.txt')
        Properties props = new Properties()
        props.load(new FileInputStream(propertiesFile))
        println "This is the content of the properties " + props
        def config = new ConfigSlurper().parse(props).flatten()
        config.each { k, v ->
            if (builder.content.field2) {
                builder.content.field2 = config[k]
            }
            if (builder.content.field3) {
                builder.content.field3 = config[k]
            }
            println(builder.toPrettyString())
            println "This is my builder " + builder
        }
    }
}
我回来了: This is my builder {"field2":"someText","field3":"someText"}
知道为什么吗?
非常感谢
编辑3(从下面移动)
我写了以下内容:
    import groovy.json.JsonBuilder
    import groovy.json.JsonSlurper
    class TestLoadingMappings {
        static void main(String[] args) {
            def content =
            '''
            {"field2":"A",
             "field3":"A"
            }
            '''
            def slurper = new JsonSlurper().parseText(content)
            def builder = new JsonBuilder(slurper)
            println "This is the content of my builder " + builder
            def propertiesFile = new File('D:\\properties.txt')
            Properties props = new Properties()
            props.load(new FileInputStream(propertiesFile))
            def conf = new ConfigSlurper().parse(props).flatten()
            conf.each { k, v ->
            if (builder.content[k]) {
                builder.content[k] = v
            }
            println("This prints the resulting JSON :" + builder.toPrettyString())
        }
    }
}
但是,我必须更改映射文件的结构,如下所示:
"field1"="substitutionText"
"field2"="substitutionText2"
然后我将ConfigSlurper"合并"到ExecuteScript脚本中,如下所示:
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.StreamCallback
import java.nio.charset.StandardCharsets
def flowFile = session.get();
if (flowFile == null) {
    return;
}
flowFile = session.write(flowFile,
        { inputStream, outputStream ->
            def content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
            def slurped = new JsonSlurper().parseText(content)
            def builder = new JsonBuilder(slurped)
            outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8))
            def propertiesFile = new File(''D:\\properties.txt')
            Properties props = new Properties()
            props.load(new FileInputStream(propertiesFile))
            def conf = new ConfigSlurper().parse(props).flatten();
            conf.each { k, v ->
                if (builder.content[k]) {
                    builder.content[k] = v
                }
            }
            outputStream.write(content.toString().getBytes(StandardCharsets.UTF_8))
        } as StreamCallback)
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)
问题似乎是我无法通过使用类似于在TestLoadingMappings中创建的逻辑来复制原始映射文件中的逻辑.正如我在之前的评论/编辑中所提到的,映射应该以这种方式工作:
field2 =如果A则替换为"some text"
field3 =如果A则替换为A2
...
field2 = B然后替换为"其他一些文本"
field3 = B然后替换为B2
和儿子.
简而言之,映射由InputStream中的传入值(变化)驱动,有条件地根据JSON属性映射到不同的值.你能否推荐一种更好的方法来通过Groovy/ExecuteScript实现这种映射?我可以灵活地修改映射文件,你能看到我可以改变它以实现所需映射的方法吗?
谢谢
mat*_*tyb 11
我有一些关于如何读取包含JSON的流文件的示例:
http://funnifi.blogspot.com/2016/02/executescript-explained-split-fields.html http://funnifi.blogspot.com/2016/05/validating-json-in-nifi-with.html http: //funnifi.blogspot.com/2016/02/executescript-processor-replacing-flow.html
你上面有正确的结构; 基本上你可以在闭包中使用那个"inputStream"变量来读取传入的流文件内容.如果你想一次性读取它(你可能需要为JSON做),你可以使用IOUtils.toString()后跟一个JsonSlurper,就像上面链接中的例子中所做的那样.
对于您的映射文件,特别是如果您的JSON是"平面",您可以拥有Java属性文件,将字段名称映射到新值:
field2 =一些文字
字段3 = A2
查看ConfigSlurper以读取属性文件.
一旦你输入传入的JSON文件并读入映射文件,就可以使用数组表示法而不是直接成员表示法来获取JSON的各个字段.因此,假设我将属性读入ConfigSlurper,并且我想用属性文件中的那个覆盖输入JSON中的任何现有属性(示例中称为"json").这可能如下所示:
config.parse(props).flatten().each { k,v ->
  if(json[k]) {
    json[k] = v
  }
}
然后,您可以继续使用outputStream.write().
您也可以通过PutDistributedMapCache处理器将其加载到分布式缓存中,而不是从文件中读取映射.你可以在ExecuteScript中读取DistributedCacheMapServer,我在这里有一个例子:
http://funnifi.blogspot.com/2016/04/inspecting-your-nifi.html
如果映射很复杂,您可能需要使用TransformJSON处理器,该处理器将在下一版NiFi(0.7.0)中提供.相关的Jira案例在这里:
https://issues.apache.org/jira/browse/NIFI-361
编辑:
为了回应您的编辑,我没有意识到您对各种值有多个规则.在这种情况下,属性文件可能不是表示映射的最佳方式.相反,你可以使用JSON:
{
  "field2": {
         "A": "some text",
         "B": "some other text"
       },
  "field3": {
         "A": "A2",
         "B": "B2"
       }
}
然后,您可以使用JSONSlurper读入映射文件.以下是使用上述映射文件的示例:
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.StreamCallback
import java.nio.charset.StandardCharsets
def flowFile = session.get();
if (flowFile == null) {
    return;
}
def mappingJson = new File('/Users/mburgess/mappings.json').text
flowFile = session.write(flowFile, { inputStream, outputStream ->
    def content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    def inJson = new JsonSlurper().parseText(content)
    def mappings = new JsonSlurper().parseText(mappingJson)
    inJson.each {k,v -> 
        inJson[k] = mappings[k][v]
    }
    outputStream.write(inJson.toString().getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)
| 归档时间: | 
 | 
| 查看次数: | 5685 次 | 
| 最近记录: |