小编ast*_*asz的帖子

为什么Spark saveAsTable with bucketBy创建了数千个文件?

上下文

Spark 2.0.1,在集群模式下spark-submit.我正在读取hdfs的镶木地板文件:

val spark = SparkSession.builder
      .appName("myApp")
      .config("hive.metastore.uris", "thrift://XXX.XXX.net:9083")
      .config("spark.sql.sources.bucketing.enabled", true)
      .enableHiveSupport()
      .getOrCreate()

val df = spark.read
              .format("parquet")
              .load("hdfs://XXX.XX.X.XX/myParquetFile")
Run Code Online (Sandbox Code Playgroud)

我保存df到50个桶的蜂巢表分组userid:

df0.write
   .bucketBy(50, "userid")
   .saveAsTable("myHiveTable")
Run Code Online (Sandbox Code Playgroud)

现在,当我查看hdfs的hive仓库时,/user/hive/warehouse有一个名为的文件夹myHiveTable.里面是一堆part-*.parquet文件.我希望有50个文件.但不,有3201个文件!!!! 每个分区有64个文件,为什么?对于我保存为hive表的不同文件,每个分区有不同数量的文件.所有文件都很小,每个只有几十Kb!

我要补充的,不同的,这个数字userid大约是1 000 000myParquetFile.

为什么文件夹中有3201个文件而不是50个!这些是什么?

当我将此表读回DataFrame并打印分区数时:

val df2 = spark.sql("SELECT * FROM myHiveTable") 
println(df2.rdd.getNumPartitions)
Run Code Online (Sandbox Code Playgroud)

分区数isIt正确50,我确认数据被正确分区userid.

对于我的一个大型数据集3Tb,我创建了一个包含1000个分区的表,这些分区创建了大约数百万个文件!这超出了目录项限制1048576并给出org.apache.hadoop.hdfs.protocol.FSLimitException$MaxDirectoryItemsExceededException

创建的文件数量取决于什么?

有没有办法限制创建的文件数量?

我应该担心这些文件吗?df2拥有所有这些文件会损害性能吗?总是说我们不应该创建太多分区,因为它有问题.

我发现这个信息HIVE动态分区提示文件的数量可能与映射器的数量有关.建议distribute by在插入蜂巢表时使用.我怎么能在Spark中做到这一点?

题 …

hive apache-spark

17
推荐指数
2
解决办法
9816
查看次数

Spark知道DataFrame的分区键吗?

我想知道Spark是否知道镶木地板文件的分区键,并使用此信息来避免随机播放.

语境:

运行Spark 2.0.1运行本地SparkSession.我有一个csv数据集,我将其保存为我的磁盘上的镶木地板文件,如下所示:

val df0 = spark
  .read
  .format("csv")
  .option("header", true)
  .option("delimiter", ";")
  .option("inferSchema", false)
  .load("SomeFile.csv"))


val df = df0.repartition(partitionExprs = col("numerocarte"), numPartitions = 42)

df.write
  .mode(SaveMode.Overwrite)
  .format("parquet")
  .option("inferSchema", false)
  .save("SomeFile.parquet")
Run Code Online (Sandbox Code Playgroud)

我按列创建了42个分区numerocarte.这应该将多个组分组numerocarte到同一个分区.我write当时不想做partitionBy("numerocarte"),因为我不希望每张卡分区一个.它将是数百万.

之后在另一个脚本中,我读了这个SomeFile.parquet镶木地板文件并对其进行了一些操作.特别是我正在运行window function它,其中分区是在镶木地板文件被重新分区的同一列上完成的.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val df2 = spark.read
  .format("parquet")
  .option("header", true)
  .option("inferSchema", false)
  .load("SomeFile.parquet")

val w = Window.partitionBy(col("numerocarte"))
.orderBy(col("SomeColumn"))

df2.withColumn("NewColumnName",
      sum(col("dollars").over(w))
Run Code Online (Sandbox Code Playgroud)

read我看到repartition按预期工作后,DataFrame df2有42个分区,每个分区都有不同的卡.

问题:

  1. Spark是否知道数据框df2是按列分区的numerocarte
  2. 如果它知道,那么窗口函数中将没有随机播放.真正? …

partitioning window-functions apache-spark

14
推荐指数
2
解决办法
1万
查看次数

Spark执行内存监控

我想要的是能够监视Spark 执行内存,而不是SparkUI中可用的存储内存。我的意思是,执行内存不是执行者内存

通过执行内存,我的意思是:

执行随机,连接,排序和聚合时,此区域用于缓冲中间数据。该区域的大小是通过spark.shuffle.memoryFraction(default0.2)配置的。根据:Spark 1.6中的统一内存管理

在寻找答案之后,我什么都没找到,只有未解决的StackOverflow问题,仅与存储内存相关的答案或类型模糊的答案使用GangliaCloudera控制台等。

似乎对堆栈溢出有此信息的需求,但没有一个令人满意的答案。这是搜索监视Spark内存时StackOverflow的一些重要帖子

监视Spark执行和存储内存利用率

监视Spark作业的内存使用情况

SPARK:如何监视Spark群集上的内存消耗?

Spark-监视实际使用的执行程序内存

如何通过Spark应用程序监视内存和CPU使用情况?

如何通过Spark应用程序获取内存和CPU使用率?

问题

Spark版本> 2.0

  1. 是否可以监视Spark作业的执行内存?通过监视,我的意思是至少看到已使用/可用,就像在SparkUI的“执行程序”选项卡中为每个执行程序查看存储内存一样。是还是不是?

  2. 我可以用SparkListeners(@JacekLaskowski吗?)历史服务器怎么样?还是唯一的办法就是通过外部工具?Graphana,Ganglia,还有其他人吗?如果是外部工具,您能否指向教程或提供一些更详细的指南?

  3. 我看到了此SPARK-9103跟踪spark的内存使用情况,似乎尚无法监视执行内存。同样,这似乎与SPARK-23206其他内存调整指标有关

  4. 是否Peak Execution memory可靠估计任务中执行内存的使用/占用?例如,如果一个阶段UI表示某个任务在峰值使用1 Gb,而每个执行者我有5 cpu,是否意味着我需要每个执行者至少有5 Gb执行内存来完成一个阶段?

  5. 我们还可以使用其他代理来了解执行内存吗?

  6. 有没有办法知道执行内存何时开始消耗到存储内存中?当我的缓存表从SparkUI的“存储”选项卡中消失或仅保留一部分时,是否意味着它已被执行内存驱逐?

memory memory-management apache-spark unified-memory

11
推荐指数
1
解决办法
4194
查看次数

Spark 中的用户定义聚合函数 UDAF 何时发生合并

我想知道在什么情况下 Spark 将执行合并作为 UDAF 功能的一部分。

动机: 我在 Spark 项目中的一个窗口上使用了很多 UDAF 函数。我经常想回答这样的问题:

信用卡交易在 30 天内与当前交易在同一国家/地区进行了多少次?

该窗口将从当前事务开始,但不会将其包含在计数中。它需要当前交易的价值才能知道过去 30 天内要计算哪个国家。

val rollingWindow = Window
      .partitionBy(partitionByColumn)
      .orderBy(orderByColumn.desc)
      .rangeBetween(0, windowSize)

df.withColumn(
  outputColumnName,
  customUDAF(inputColumn, orderByColumn).over(rollingWindow))
Run Code Online (Sandbox Code Playgroud)

我写了我的 customUDAF 来进行计数。我总是使用.orderBy(orderByColumn.desc)并感谢.desc当前交易在计算过程中出现在窗口中的第一个。

UDAF 函数需要实现merge在并行计算中合并两个中间聚合缓冲区的函数。如果发生任何合并,current transaction不同缓冲区的my可能不相同,UDAF 的结果将不正确。

我编写了一个 UDAF 函数,该函数计算我的数据集上的合并次数,并仅保留窗口中的第一个事务以与当前事务进行比较。

 class FirstUDAF() extends UserDefinedAggregateFunction {

  def inputSchema = new StructType().add("x", StringType)
    .add("y", StringType)

  def bufferSchema = new StructType()
    .add("first", StringType)
    .add("numMerge", IntegerType)

  def dataType = new StructType()
    .add("firstCode", StringType)
    .add("numMerge", IntegerType)

  def deterministic …
Run Code Online (Sandbox Code Playgroud)

scala user-defined-aggregate apache-spark apache-spark-sql

5
推荐指数
1
解决办法
1260
查看次数

Spark是否会通过数据传递多个withColumn?

当多个withColumn函数被链接时,Spark是否会对数据进行一次或多次传递?

例如:

val dfnew = df.withColumn("newCol1", f1(col("a")))
              .withColumn("newCol2", f2(col("b")))
              .withColumn("newCol3", f3(col("c")))
Run Code Online (Sandbox Code Playgroud)

哪里

  • df是我的输入DataFrame至少包含列a,b,c
  • dfnew输出DataFrame三个新列newCol1,newCol2,newCol3
  • f1,f2,f3有一些用户定义的函数或像蒙上了,等在我的项目列了一些火花的操作,我可以有甚至30独立的withColumn功能链接与foldLeft.

重要

我在这里假设f2不依赖于结果f1,f3而不依赖于f1和的结果f2.可以按任何顺序执行这些功能.任何功能都没有随机播放

我的观察

  • 所有功能都在同一个阶段
  • 添加new withColumn不会增加执行时间,从而怀疑通过数据传递额外的段落.
  • 我已经测试了例如SQLTransformer包含所有函数的select语句和SQLTransformer每个函数的多个独立函数,并且执行时间类似.

问题

  • 火花会通过数据进行一到三次传递,每次一次withColumn吗?
  • 它取决于功能类型f1,f2,f3?UDF与泛型Spark操作?
  • 如果功能f1,f2,f3在同一阶段内,这是否意味着他们是在同一个数据传递?
  • 段落的数量是否取决于功能内的随机播放?如果没有洗牌?
  • 如果我用withColumn函数链接函数foldLeft会改变段落的数量吗?
  • 我可以在同一个select_statement 中用三个 …

scala apache-spark apache-spark-sql

4
推荐指数
1
解决办法
842
查看次数