社区!
请帮助我了解如何使用 Spark 获得更好的压缩率?
让我描述一下案例:
我有数据集,让我们把它的产品在其上的实木复合地板文件使用的编解码器使用Sqoop ImportTool进口HDFS瞬间。作为导入的结果,我有 100 个文件,总大小为46 GB,文件大小不同(最小 11MB,最大 1.5GB,平均 ~ 500MB)。记录总数超过80 亿条,有84 列
我也在使用snappy对 Spark 进行简单的读取/重新分区/写入,结果我得到:
~ 100 GB输出大小,具有相同的文件数、相同的编解码器、相同的数量和相同的列。
代码片段:
val productDF = spark.read.parquet("/ingest/product/20180202/22-43/")
productDF
.repartition(100)
.write.mode(org.apache.spark.sql.SaveMode.Overwrite)
.option("compression", "snappy")
.parquet("/processed/product/20180215/04-37/read_repartition_write/general")
Run Code Online (Sandbox Code Playgroud)
摄取:
creator: parquet-mr version 1.5.0-cdh5.11.1 (build ${buildNumber})
extra: parquet.avro.schema = {"type":"record","name":"AutoGeneratedSchema","doc":"Sqoop import of QueryResult","fields"
and almost all columns looks like
AVAILABLE: OPTIONAL INT64 R:0 D:1
row group 1: RC:3640100 TS:36454739 OFFSET:4
AVAILABLE: INT64 SNAPPY …Run Code Online (Sandbox Code Playgroud) snappy apache-spark parquet apache-spark-sql spark-dataframe
使用 Spark Dataset/DataFrame 连接时,我面临长时间运行并因 OOM 作业而失败。
这是输入:
null经过一些分析,我发现失败和缓慢的作业原因是null偏斜键:当左侧有数百万条带有 join key 的记录时null。
我采取了一些蛮力方法来解决这个问题,在这里我想分享一下。
如果您有更好的或任何内置的解决方案(对于常规 Apache Spark),请分享。
有没有一种方法可以指定Spark结构化文件流源的起始偏移量?
我正在尝试从HDFS流式传输实木复合地板:
spark.sql("SET spark.sql.streaming.schemaInference=true")
spark.readStream
.parquet("/tmp/streaming/")
.writeStream
.option("checkpointLocation", "/tmp/streaming-test/checkpoint")
.format("parquet")
.option("path", "/tmp/parquet-sink")
.trigger(Trigger.ProcessingTime(1.minutes))
.start()
Run Code Online (Sandbox Code Playgroud)
如我所见,第一个运行是处理路径中检测到的所有可用文件,然后将偏移量保存到检查点位置并仅处理新文件,即接受期限并且在所看到的文件中不存在映射。
我正在寻找一种方法,如何指定起始偏移量或时间戳或选项数量,以在首次运行时不处理所有可用文件。
我有找的方法吗?
我找不到如何在 Scala/Java 中轻松计算给定时间点的时区偏移的好答案。
鉴于:
要求:计算时区 1 中特定日期的偏移量,需要将此日期移动到时区 2。
例子:
日期时间 = 2017-10-23 00:00:00 在America/Los_Angeles时区 1,
时区 2 = Europe/Paris
偏移量应等于 +9,它可用于将日期从时区 1 移动到时区 2。