小编Ste*_*hPH的帖子

使用S3AFileSystem的Flink不会从S3读取子文件夹

我们正在使用Flink 1.2.0和建议的S3AFileSystem配置.当源是S3存储桶中的单个文件夹时,简单的流式传输作业按预期工作.

该作业运行没有错误-但并没有产生输出-当它的来源是它本身包含子文件夹的文件夹.

为清楚起见,下面是S3存储桶的模型.运行作业以指向s3a://bucket/folder/2017/04/25/01/正确读取存储在存储桶中的所有三个对象和任何后续对象.将作业指向s3a://bucket/folder/2017/(或任何其他中间文件夹)会导致作业无法生成任何内容.

在绝望中,我们尝试了[in | ex]包含尾随的排列/.

.
`-- folder
    `-- 2017
        `-- 04
            |-- 25
            |   |-- 00
            |   |   |-- a.txt
            |   |   `-- b.txt
            |   `-- 01
            |       |-- c.txt
            |       |-- d.txt
            |       `-- e.txt
            `-- 26
Run Code Online (Sandbox Code Playgroud)

工作代码:

def main(args: Array[String]) {

  val parameters = ParameterTool.fromArgs(args)
  val bucket = parameters.get("bucket")
  val folder = parameters.get("folder")

  val path = s"s3a://$bucket/$folder"

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  val lines: DataStream[String] = env.readFile(
    inputFormat …
Run Code Online (Sandbox Code Playgroud)

hadoop amazon-s3 apache-flink flink-streaming

2
推荐指数
1
解决办法
1104
查看次数

标签 统计

amazon-s3 ×1

apache-flink ×1

flink-streaming ×1

hadoop ×1