writeAsCSV()和writeAsText()是意外的

Spy*_*nos 2 scala writetofile apache-flink

我通过Scala API使用apache flink,在某些时候我获得了一个 DataSet[(Int, Int, Int)].使用方法的结果 writeAsCSV()writeAsText()出乎意料.它创建了一个目录.该目录具有位置和名称方法调用的第一个参数(例如) filePath .在该目录中,两个文件以名称"1"和"2"出现.在这些文件中,我可以看到DataSet数据.他们似乎将DataSets内容划分为这两个文件.尝试重新创建此行为以显示更简洁的代码片段,我不能.那就是我目睹了在预期位置创建了一个具有预期名称的文件而没有创建目录.val mas = ma_ groupBy(0,1)sum(2)mas.writeAsCsv("c:\ flink\mas.csv")

导致创建名为"mas.csv"的目录,并在其中创建两个文件"1"和"2".什么时候发生这样的事情?使用flink 9.1本地模式,Windows 7,scala 2.10,eclipse3.0.3

Mat*_*Sax 5

这是预期的行为.如果要获取单个输出文件,则需要将接收器的并行度设置为1.

dataset = dataset.writeAsCsv("filename").setParallelism(1);
Run Code Online (Sandbox Code Playgroud)

对于DataStream API,您需要插入一个额外的rebalane()来打破运算符链.否则,整个链将以dop = 1执行或setParallelism()可能被忽略.

datastream = datastream.rebalance().writeAsCsv("filename").setParallelism(1);
Run Code Online (Sandbox Code Playgroud)

  • 只有DataStream API才需要`rebalance`.如果在DataSet API中更改运算符的DOP,则数据将自动在后续运算符之间分配. (2认同)