小编Noa*_* Be的帖子

Python改变Tqdm栏样式

是否可以将 tqdm 栏从

\n
[Step 1]: 100%|\xe2\x96\x88\xe2\x96\x88\xe2\x96\x88\xe2\x96\x88\xe2\x96\x88\xe2\x96\x88\xe2\x96\x88\xe2\x96\x88\xe2\x96\x88\xe2\x96\x88\xe2\x96\x88\xe2\x96\x88\xe2\x96\x88\xe2\x96\x88\xe2\x96\x88\xe2\x96\x88\xe2\x96\x88\xe2\x96\x88\xe2\x96\x88\xe2\x96\x88\xe2\x96\x88\xe2\x96\x88\xe2\x96\x88\xe2\x96\x88\xe2\x96\x88\xe2\x96\x88\xe2\x96\x88  | 109/109 [00:03<00:00, 32.46it/s]\n
Run Code Online (Sandbox Code Playgroud)\n

类似的东西

\n
[Step 1]: 100%[==========================>  ] 109/109 [00:03<00:00, 32.46it/s]\n
Run Code Online (Sandbox Code Playgroud)\n

python tqdm

6
推荐指数
2
解决办法
5996
查看次数

Spark 结构化流 writeStream 输出一个全局 csv

我目前正在使用 Spark 结构化流制作原始日志数据聚合器。

输入流由文本文件目录组成:

// == Input == //

val logsDF = spark.readStream
  .format("text")
  .option("maxFilesPerTrigger", 1)
  .load("input/*")
Run Code Online (Sandbox Code Playgroud)

然后解析日志...

// == Parsing == //

val logsDF2 = ...
Run Code Online (Sandbox Code Playgroud)

...并汇总

// == Aggregation == //

val windowedCounts = logsDF2
  .withWatermark("window_start", "15 minutes")
  .groupBy(
    col("window"),
    col("node")
  ).count()
Run Code Online (Sandbox Code Playgroud)

当我使用“控制台”接收器时,一切工作正常:结果在控制台中按浴批量更新:

// == Output == //

val query = windowedCounts.writeStream
  .format("console")
  .outputMode("complete")
  .start()
  .awaitTermination()
Run Code Online (Sandbox Code Playgroud)

现在我想将结果保存在一个唯一的文件中(json、parquet、csv ..)

// == Output == //

val query = windowedCounts.writeStream
  .format("csv")
  .option("checkpointLocation", "checkpoint/")
  .start("output/")
  .awaitTermination()
Run Code Online (Sandbox Code Playgroud)

但它向我输出 400 个空 csv ...我如何才能像在控制台中那样获得结果?

非常感谢 …

scala spark-streaming

4
推荐指数
1
解决办法
8314
查看次数

标签 统计

python ×1

scala ×1

spark-streaming ×1

tqdm ×1