关于为什么随机写入数据的详细说明比apache spark中的输入数据更多

Abh*_*and 7 hdfs cloudera apache-spark

在此输入图像描述

  1. 任何人都可以告诉我究竟什么输入,输出,随机读取和随机写入在spark UI中指定?
  2. 另外,有人可以解释这项工作的输入是如何进行洗牌的25~30%?根据我的理解,shuffle write是无法在内存中保存的临时数据和在聚合或减少期间需要发送给其他执行程序的数据的总和.

代码如下:

hiveContext.sql("SELECT * FROM TABLE_NAME WHERE PARTITION_KEY = 'PARTITION_VALUE'")
    .rdd
    .map{case (row:Row)
            =>((row.getString(0), row.getString(12)),
                (row.getTimestamp(11), row.getTimestamp(11),
                    row))}
    .filter{case((client, hash),(d1,d2,obj)) => (d1 !=null && d2 !=null)}
   .reduceByKey{
       case(x, y)=>
            if(x._1.before(y._1)){
                if(x._2.after(y._2))
                    (x)
                else
                    (x._1, y._2, y._3)
            }else{
                if(x._2.after(y._2))
                    (y._1, x._2, x._3)
                else
                    (y)
            }
   }.count()
Run Code Online (Sandbox Code Playgroud)

其中ReadDailyFileDataObject是一个case行,它将行字段保存为容器.容器是必需的,因为有30列,超过22的元组限制.

当我使用Row本身而不是case Class时,更新了Code,删除了case类,因为我看到同样的问题.

现在我看到了

任务:10/7772

输入:2.1 GB

随机写:14.6 GB

如果它有帮助,我试图处理存储为镶木地板文件的表,包含210亿行.

以下是我正在使用的参数,

"spark.yarn.am.memory" -> "10G"
"spark.yarn.am.cores"  -> "5"
"spark.driver.cores"   -> "5"
"spark.executor.cores" -> "10"
"spark.dynamicAllocation.enabled" -> "true"
"spark.yarn.containerLauncherMaxThreads" -> "120"
"spark.executor.memory" -> "30g"
"spark.driver.memory" -> "10g"
"spark.driver.maxResultSize" -> "9g"
"spark.serializer" -> "org.apache.spark.serializer.KryoSerializer"
"spark.kryoserializer.buffer" -> "10m"
"spark.kryoserializer.buffer.max" -> "2001m"
"spark.akka.frameSize" -> "2020"
Run Code Online (Sandbox Code Playgroud)

SparkContext注册为

new SparkContext("yarn-client", SPARK_SCALA_APP_NAME, sparkConf)
Run Code Online (Sandbox Code Playgroud)

在纱线上,我明白了

分配的CPU VCores:95

分配的内存:309 GB

运行容器:10

alg*_*imo 1

如果没有代码,实际上很难提供答案,但您可能会多次查看数据,因此您正在处理的总容量实际上是原始数据的“X”倍。

您可以发布您正在运行的代码吗?

编辑

查看代码,我以前遇到过此类问题,这是由于 Row 的序列化造成的,所以这也可能是您的情况。

什么是“ReadDailyFileDataObject”?它是一个类,一个案例类吗?

我首先尝试像这样运行你的代码:

hiveContext.sql("SELECT * FROM TABLE_NAME WHERE PARTITION_KEY = 'PARTITION_VALUE'")
    .rdd
    .map{case (row:Row)
            =>((row.get(0).asInstanceOf[String], row.get(12).asInstanceOf[String]),
                (row.get(11).asInstanceOf[Timestamp], row.get(11).asInstanceOf[Timestamp]))}
    .filter{case((client, hash),(d1,d2)) => (d1 !=null && d2 !=null)}
   .reduceByKey{
       case(x, y)=>
            if(x._1.before(y._1)){
                if(x._2.after(y._2))
                    (x)
                else
                    (x._1, y._2)
            }else{
                if(x._2.after(y._2))
                    (y._1, x._2)
                else
                    (y)
            }
   }.count()
Run Code Online (Sandbox Code Playgroud)

如果这消除了您的洗牌问题,那么您可以稍微重构它: - 如果还没有的话,将其设为案例类。- 像“ReadDailyFileDataObject(row.getInt(0), row.getString(1), etc..)”一样创建它

希望这算是一个答案,并帮助您找到瓶颈。