我正在使用 apache nifi 在 kafka 中存储日志。
我有一些日志行,这取决于我必须将它们发送到主题 kafka 或其他主题的内容。我的问题是有很多主题,因此我必须使用很多处理器。
我认为使用“executescript”,我可以根据日志文本中的条件生成一个动态属性,我可以在publishkafka 处理器的“主题名称”属性中使用该属性。
我从代码开始读取flowfile的内容,以及写一些条件,但不知道如何生成属性。
日志行的一些示例:Jim,18,M,156,Oregon,USA 等 John,55,M,170,Idaho,USA 等
这是我到目前为止所拥有的:
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class PyStreamCallback(StreamCallback):
def __init__(self):
pass
def process(self, inputStream, outputStream):
Log = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
TextLog = str(Log).split(',')
name = TextLog[0]
age = TextLog[1]
sex = TextLog[2]
if name == 'John' and age == '30':
Topic_A = str(TextLog)
outputStream.write(bytearray((Topic_A).encode('utf-8')))
elif name == 'Max' and age == '25':
Topic_B = str(TextLog)
outputStream.write(bytearray((Topic_B).encode('utf-8')))
elif name == 'Smith' and age == '10' or '20':
Topic_C = str(TextLog)
outputStream.write(bytearray((Topic_C).encode('utf-8')))
Run Code Online (Sandbox Code Playgroud)
目标是拥有一个单一的 executescript 处理器和一个单一的 kafka 处理器。拜托,有人可以帮助我。
简短的回答是:
flowFile = session.putAttribute(flowFile, 'my-property', 'my-value')
Run Code Online (Sandbox Code Playgroud)
https://funnifi.blogspot.com/2016/02/executescript-processor-hello-world.html
无需编写任何自定义代码即可完成此操作的典型方法是使用 RouteOnContent ...
您可以添加用户定义的属性,其中名称将成为关系,值将成为正则表达式。
例如,添加两个属性:
john = John,30,.*
max = Max,25,.*
Run Code Online (Sandbox Code Playgroud)
从那里,您将每个关系发送到设置主题名称的 UpdateAttribute 处理器,因此 john 将转到设置主题 = Topic_A 的 UpdateAttribute,而 max 将转到设置主题 = Topic_B 的 UpdateAttribute。
然后他们将全部连接到主题设置为 ${topic} 的单个 PublishKafka。
归档时间: |
|
查看次数: |
452 次 |
最近记录: |