pet*_*ott 4 scala apache-flink
我有一个由文件夹和文件构成的数据集.文件夹/文件结构本身对于数据分析很重要.
数据集的结构:
folder1
+-----file11
+-----column1
+-----column2
Run Code Online (Sandbox Code Playgroud)
每个文件都包含描述一个对象的数据.文件格式一致.它基本上是一个有两列的csv文件.这两列应该表示为结果对象中的元组序列.
文件的大小非常小.最高不超过20 kb.每个文件夹包含大约200个文件.
所需的输出对象应为:
{
a: "folder1", // name of parent folder
b: "file11", // name of content file
c: Seq[(String, String)] // content of file1
}
Run Code Online (Sandbox Code Playgroud)
如何处理Scala中此数据集的读取?
有两种方法可以解决这个问题:
a)如果文件夹中的数据非常小(少于几兆字节),您可以在本地进行读取并使用该ExecutionEnvironment.fromCollection()方法将数据带入Flink作业.
b)您创建自定义InputFormat.InputFormat允许解析自定义文件格式.在你的情况下,我会扩展TextInputFormat和覆盖readRecord()方法.此方法为文件中的每一行提供一个String.然后,您可以手动解析String中的数据,并将解析后的结果与Tuple3中的目录信息一起返回.您可以从filePath变量访问路径.对于使用FileInputFormats 递归读取文件,有recursive.file.enumeration配置值.