Flink CsvTableSource 流

Jay*_*esh 4 apache-flink flink-streaming

我想流式传输一个 csv 文件并使用 flink 执行 sql 操作。但是我写的代码只读了一次就停止了。它不流。提前致谢,

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);

CsvTableSource csvtable = CsvTableSource.builder()
    .path("D:/employee.csv")
    .ignoreFirstLine()
    .fieldDelimiter(",")
    .field("id", Types.INT())
    .field("name", Types.STRING())
    .field("designation", Types.STRING())
    .field("age", Types.INT())
    .field("location", Types.STRING())
    .build();

tableEnv.registerTableSource("employee", csvtable);

Table table = tableEnv.scan("employee").where("name='jay'").select("id,name,location");
//Table table1 = tableEnv.scan("employee").where("age > 23").select("id,name,age,location");

DataStream<Row> stream = tableEnv.toAppendStream(table, Row.class);

//DataStream<Row> stream1 = tableEnv.toAppendStream(table1, Row.class);

stream.print();
//stream1.print();

env.execute();
Run Code Online (Sandbox Code Playgroud)

Fab*_*ske 5

CsvTableSource是基于FileInputFormat其读取并分析由行引用的文件一致。结果行被转发到流查询中。因此,CsvTableSource在连续读取和转发行的意义上,流是流式的。但是,CsvTableSource在文件末尾终止。因此,它发出一个有界流。

我假设您期望的行为是CsvTableSource读取文件直到结束,然后等待向文件追加写入。然而,这不是CsvTableSource工作方式。您需要为此实现自定义TableSource