如何在 nifi 中为消息添加时间戳?

hey*_*you 0 apache-nifi

免责声明:我对nifi一无所知。

我需要从ListenHTTP处理器接收消息,然后将每条消息转换为带时间戳的 json 消息。

假设我hello world在凌晨 5 点收到消息。它应该将其转换为{"timestamp": "5 am", "message":"hello world"}.

我怎么做?

And*_*ndy 5

每个流文件都有属性,它们是存储在内存中的键/值对中的元数据片段(可用于快速读/写)。当任何操作发生时,NiFi 框架都会写入元数据片段,既写入与流文件相关的来源事件,有时也写入流文件本身。例如,如果ListenHTTP是流中的第一个处理器,则进入流的任何流文件都将具有一个属性,该entryDate属性的值是其发起时间的格式Thu Jan 24 15:53:52 PST 2019。您可以使用各种处理器(即 、 等)读取和写入这些UpdateAttribute属性RouteOnAttribute

对于您的用例,您可以让ReplaceText处理器紧随ListenHTTP搜索值(?s)(^.*$)(整个流文件内容,或“您通过 HTTP 调用收到的内容”)和替换值 的处理器{"timestamp_now":"${now():format('YYYY-MM-dd HH:mm:ss.SSS Z')}", "timestamp_ed": "${entryDate:format('YYYY-MM-dd HH:mm:ss.SSS Z')}", "message":"$1"}

上面的例子提供了两个选项:

  1. 这是流文件通过处理器entryDate产生的时间ListenHTTP
  2. now()函数获取自纪元以来的当前时间戳(以毫秒为单位)

根据性能/队列等,这两个值可能略有不同。在我的简单示例中,它们相隔 2 毫秒。您可以使用format()方法和正常的Java 时间格式语法来格式化它们,这样您就可以通过使用h a(完整示例now():format('h a'):toLower():)获得“5 am”。

例子

  • ListenHTTP9999在带有路径的端口上运行contentListener
  • ReplaceText如上
  • LogAttribute带日志负载true

画布和终端上的 NiFi 流程显示日志和curl命令

卷曲命令:curl -d "helloworld" -X POST http://localhost:9999/contentListener

输出示例:

2019-01-24 16:04:44,529 INFO [Timer-Driven Process Thread-6] o.a.n.processors.standard.LogAttribute LogAttribute[id=8246b0a0-0168-1000-7254-2c2e43d136a7] logging for flow file StandardFlowFileRecord[uuid=5e1c6d12-298d-4d9c-9fcb-108c208580fa,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1548374015429-1, container=default, section=1], offset=3424, length=122],offset=0,name=5e1c6d12-298d-4d9c-9fcb-108c208580fa,size=122]
--------------------------------------------------
Standard FlowFile Attributes
Key: 'entryDate'
    Value: 'Thu Jan 24 16:04:44 PST 2019'
Key: 'lineageStartDate'
    Value: 'Thu Jan 24 16:04:44 PST 2019'
Key: 'fileSize'
    Value: '122'
FlowFile Attribute Map Content
Key: 'filename'
    Value: '5e1c6d12-298d-4d9c-9fcb-108c208580fa'
Key: 'path'
    Value: './'
Key: 'restlistener.remote.source.host'
    Value: '127.0.0.1'
Key: 'restlistener.remote.user.dn'
    Value: 'none'
Key: 'restlistener.request.uri'
    Value: '/contentListener'
Key: 'uuid'
    Value: '5e1c6d12-298d-4d9c-9fcb-108c208580fa'
--------------------------------------------------
{"timestamp_now":"2019-01-24 16:04:44.518 -0800", "timestamp_ed": "2019-01-24 16:04:44.516 -0800", "message":"helloworld"}
Run Code Online (Sandbox Code Playgroud)