从Apache Flink中的输入文件创建对象

pet*_*ott 4 scala apache-flink

我有一个由文件夹和文件构成的数据集.文件夹/文件结构本身对于数据分析很重要.

数据集的结构:

folder1
   +-----file11
            +-----column1
            +-----column2
Run Code Online (Sandbox Code Playgroud)

每个文件都包含描述一个对象的数据.文件格式一致.它基本上是一个有两列的csv文件.这两列应该表示为结果对象中的元组序列.

文件的大小非常小.最高不超过20 kb.每个文件夹包含大约200个文件.

所需的输出对象应为:

{
    a: "folder1",              // name of parent folder
    b: "file11",               // name of content file
    c: Seq[(String, String)]   // content of file1
}
Run Code Online (Sandbox Code Playgroud)

如何处理Scala中此数据集的读取?

Rob*_*ger 6

有两种方法可以解决这个问题:

a)如果文件夹中的数据非常小(少于几兆字节),您可以在本地进行读取并使用该ExecutionEnvironment.fromCollection()方法将数据带入Flink作业.

b)您创建自定义InputFormat.InputFormat允许解析自定义文件格式.在你的情况下,我会扩展TextInputFormat和覆盖readRecord()方法.此方法为文件中的每一行提供一个String.然后,您可以手动解析String中的数据,并将解析后的结果与Tuple3中的目录信息一起返回.您可以从filePath变量访问路径.对于使用FileInputFormats 递归读取文件,有recursive.file.enumeration配置值.