使用 Apache Nifi 读取电子邮件的消息正文

Cli*_*ash 3 email message apache-nifi

是否可以使用 Apache Nifi 以单步方式检索电子邮件的正文内容、电子邮件标题详细信息和电子邮件附件。

如果是这样,请帮助我如何实现这一目标。

mat*_*tyb 5

除非您编写自己的处理器或脚本(使用 ExecuteScript 或 InvokeScriptedProcessor),否则单步执行是不可能的。但是,可以在单个流中使用以下内容:

ConsumePOP3 -> ExtractEmailHeaders -> ExtractEmailAttachments -> ...

在上述流程的末尾,每个附件将有一个流程文件,每个流程文件都包含电子邮件标题作为属性,附件作为内容。


小智 5

您可以使用处理器“ExecuteScript”,而不是开发自定义处理器。

import email
import mimetypes
from email.parser import Parser
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from java.io import BufferedReader, InputStreamReader
from org.apache.nifi.processors.script import ExecuteScript
from org.apache.nifi.processor.io import InputStreamCallback
from org.apache.nifi.processor.io import StreamCallback

class PyInputStreamCallback(InputStreamCallback):
    _text = None

    def __init__(self):
        pass

    def getText(self) : 
        return self._text

    def process(self, inputStream):
        self._text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)

flowFile = session.get()
if flowFile is not None :
    reader = PyInputStreamCallback()
    session.read(flowFile, reader)

    msg = email.message_from_string(reader.getText())
    body = ""

    if msg.is_multipart():
        for part in msg.walk():
            ctype = part.get_content_type()
            cdispo = str(part.get('Content-Disposition'))

            if ctype == 'text/plain' and 'attachment' not in cdispo:
                body = part.get_payload(decode=True)  # decode
                break
    else:
        body = msg.get_payload(decode=True)

    flowFile = session.putAttribute(flowFile, 'msgbody', body.decode('utf-8', 'ignore'))

    session.transfer(flowFile, ExecuteScript.REL_SUCCESS)
Run Code Online (Sandbox Code Playgroud)

截屏

在此输入图像描述