我在3节点纱线群集上运行Structured Streaming(2.1.0)并将json记录流式传输到镶木地板.我的代码片段如下所示:
val query = ds.writeStream
.format("parquet")
.option("checkpointLocation", "/data/kafka_streaming.checkpoint")
.start("/data/kafka_streaming.parquet")
Run Code Online (Sandbox Code Playgroud)
我注意到它只为1,000条记录快速创建了数千个小文件.我怀疑它必须做触发频率.所以我改变了它:
val query = ds.writeStream
.format("parquet")
.option("checkpointLocation", "/data/kafka_streaming.checkpoint")
.**trigger(ProcessingTime("60 seconds"))**
.start("/data/kafka_streaming.parquet")
Run Code Online (Sandbox Code Playgroud)
差异非常明显.现在我可以看到为相同数量的记录创建的文件数量要少得多.
我的问题:是否有任何方法可以使触发器具有低延迟并保留较少数量的较大输出文件?
归档时间: |
|
查看次数: |
273 次 |
最近记录: |