我们正在使用Flink 1.2.0和建议的S3AFileSystem配置.当源是S3存储桶中的单个文件夹时,简单的流式传输作业按预期工作.
该作业运行没有错误-但并没有产生输出-当它的来源是它本身包含子文件夹的文件夹.
为清楚起见,下面是S3存储桶的模型.运行作业以指向s3a://bucket/folder/2017/04/25/01/正确读取存储在存储桶中的所有三个对象和任何后续对象.将作业指向s3a://bucket/folder/2017/(或任何其他中间文件夹)会导致作业无法生成任何内容.
在绝望中,我们尝试了[in | ex]包含尾随的排列/.
.
`-- folder
`-- 2017
`-- 04
|-- 25
| |-- 00
| | |-- a.txt
| | `-- b.txt
| `-- 01
| |-- c.txt
| |-- d.txt
| `-- e.txt
`-- 26
Run Code Online (Sandbox Code Playgroud)
工作代码:
def main(args: Array[String]) {
val parameters = ParameterTool.fromArgs(args)
val bucket = parameters.get("bucket")
val folder = parameters.get("folder")
val path = s"s3a://$bucket/$folder"
val env = StreamExecutionEnvironment.getExecutionEnvironment
val lines: DataStream[String] = env.readFile(
inputFormat …Run Code Online (Sandbox Code Playgroud)