小编Mik*_*kov的帖子

Spark + Parquet + Snappy:spark shuffle 数据后整体压缩率下降

社区!

请帮助我了解如何使用 Spark 获得更好的压缩率?

让我描述一下案例:

  1. 我有数据集,让我们把它的产品在其上的实木复合地板文件使用的编解码器使用Sqoop ImportTool进口HDFS瞬间。作为导入的结果,我有 100 个文件,总大小为46 GB,文件大小不同(最小 11MB,最大 1.5GB,平均 ~ 500MB)。记录总数超过80 亿条,84 列

  2. 我也在使用snappy对 Spark 进行简单的读取/重新分区/写入,结果我得到:

~ 100 GB输出大小,具有相同的文件数、相同的编解码器、相同的数量和相同的列。

代码片段:

val productDF = spark.read.parquet("/ingest/product/20180202/22-43/")

productDF
.repartition(100)
.write.mode(org.apache.spark.sql.SaveMode.Overwrite)
.option("compression", "snappy")
.parquet("/processed/product/20180215/04-37/read_repartition_write/general")
Run Code Online (Sandbox Code Playgroud)
  1. 使用镶木地板工具,我查看了摄取和处理的随机文件,它们如下所示:

摄取:

creator:                        parquet-mr version 1.5.0-cdh5.11.1 (build ${buildNumber}) 
extra:                          parquet.avro.schema = {"type":"record","name":"AutoGeneratedSchema","doc":"Sqoop import of QueryResult","fields"

and almost all columns looks like
AVAILABLE: OPTIONAL INT64 R:0 D:1

row group 1:                    RC:3640100 TS:36454739 OFFSET:4 

AVAILABLE:                       INT64 SNAPPY …
Run Code Online (Sandbox Code Playgroud)

snappy apache-spark parquet apache-spark-sql spark-dataframe

11
推荐指数
1
解决办法
1万
查看次数

Spark数据集/数据帧连接NULL倾斜键

使用 Spark Dataset/DataFrame 连接时,我面临长时间运行并因 OOM 作业而失败。

这是输入:

  • 约 10 个不同大小的数据集,大部分都很大(>1 TB)
  • 所有左连接到一个基础数据集
  • 一些连接键是null

经过一些分析,我发现失败和缓慢的作业原因是null偏斜键:当左侧有数百万条带有 join key 的记录时null

我采取了一些蛮力方法来解决这个问题,在这里我想分享一下。

如果您有更好的或任何内置的解决方案(对于常规 Apache Spark),请分享。

skew apache-spark apache-spark-sql

5
推荐指数
2
解决办法
3347
查看次数

Spark结构化流文件源开始偏移

有没有一种方法可以指定Spark结构化文件流源的起始偏移量?

我正在尝试从HDFS流式传输实木复合地板:

spark.sql("SET spark.sql.streaming.schemaInference=true")

spark.readStream
  .parquet("/tmp/streaming/")
  .writeStream
  .option("checkpointLocation", "/tmp/streaming-test/checkpoint")
  .format("parquet")
  .option("path", "/tmp/parquet-sink")
  .trigger(Trigger.ProcessingTime(1.minutes))
  .start()
Run Code Online (Sandbox Code Playgroud)

如我所见,第一个运行是处理路径中检测到的所有可用文件,然后将偏移量保存到检查点位置并仅处理新文件,即接受期限并且在所看到的文件中不存在映射。

我正在寻找一种方法,如何指定起始偏移量或时间戳或选项数量,以在首次运行时不处理所有可用文件。

我有找的方法吗?

apache-spark spark-streaming apache-spark-sql

3
推荐指数
1
解决办法
1181
查看次数

计算两个时区之间偏移的最简单方法 Scala/Java

我找不到如何在 Scala/Java 中轻松计算给定时间点的时区偏移的好答案。

鉴于:

  • 日期与时间
  • 时区 1
  • 时区 2

要求:计算时区 1 中特定日期的偏移量,需要将此日期移动到时区 2。

例子:

日期时间 = 2017-10-23 00:00:00 在America/Los_Angeles时区 1,

时区 2 = Europe/Paris

偏移量应等于 +9,它可用于将日期从时区 1 移动到时区 2。

java timezone scala java-time

3
推荐指数
1
解决办法
1934
查看次数