如何在apache nifi(jython)中使用executescript根据条件生成属性?

xav*_*xav 2 apache-nifi

我正在使用 apache nifi 在 kafka 中存储日志。

我有一些日志行,这取决于我必须将它们发送到主题 kafka 或其他主题的内容。我的问题是有很多主题,因此我必须使用很多处理器。

我认为使用“executescript”,我可以根据日志文本中的条件生成一个动态属性,我可以在publishkafka 处理器的“主题名称”属性中使用该属性。

我从代码开始读取flowfile的内容,以及写一些条件,但不知道如何生成属性。

日志行的一些示例:Jim,18,M,156,Oregon,USA 等 John,55,M,170,Idaho,USA 等

这是我到目前为止所拥有的:

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
    def __init__(self):
         pass
    def process(self, inputStream, outputStream):
         Log = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
         TextLog = str(Log).split(',')
         name = TextLog[0]
         age = TextLog[1]
         sex = TextLog[2]

         if name == 'John' and age == '30':
             Topic_A = str(TextLog) 
             outputStream.write(bytearray((Topic_A).encode('utf-8')))
         elif name == 'Max' and age == '25':
             Topic_B = str(TextLog)
             outputStream.write(bytearray((Topic_B).encode('utf-8')))
         elif name == 'Smith' and age == '10' or '20':
             Topic_C = str(TextLog)
             outputStream.write(bytearray((Topic_C).encode('utf-8')))
Run Code Online (Sandbox Code Playgroud)

目标是拥有一个单一的 executescript 处理器和一个单一的 kafka 处理器。拜托,有人可以帮助我。

Bry*_*nde 5

简短的回答是:

flowFile = session.putAttribute(flowFile, 'my-property', 'my-value')
Run Code Online (Sandbox Code Playgroud)

https://funnifi.blogspot.com/2016/02/executescript-processor-hello-world.html

无需编写任何自定义代码即可完成此操作的典型方法是使用 RouteOnContent ...

您可以添加用户定义的属性,其中名称将成为关系,值将成为正则表达式。

例如,添加两个属性:

john = John,30,.*
max = Max,25,.*
Run Code Online (Sandbox Code Playgroud)

从那里,您将每个关系发送到设置主题名称的 UpdateAttribute 处理器,因此 john 将转到设置主题 = Topic_A 的 UpdateAttribute,而 max 将转到设置主题 = Topic_B 的 UpdateAttribute。

然后他们将全部连接到主题设置为 ${topic} 的单个 PublishKafka。