The code is as follows:
hiveContext.sql("SELECT * FROM TABLE_NAME WHERE PARTITION_KEY = 'PARTITION_VALUE'")
  .rdd
  .map { case (row: Row) =>
    ((row.getString(0), row.getString(12)), (row.getTimestamp(11), row.getTimestamp(11), row))
  }
  .filter { case ((client, hash), (d1, d2, obj)) => d1 != null && d2 != null }
  .reduceByKey { case (x, y) =>
    if (x._1.before(y._1)) {
      if (x._2.after(y._2)) x else (x._1, y._2, y._3)
    } else {
      if (x._2.after(y._2)) (y._1, x._2, x._3) else y
    }
  }
  .count()
Here ReadDailyFileDataObject is a case class that holds the row fields as a container. The container is needed because there are 30 columns, which exceeds the 22-element tuple limit.
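For illustration only, a container like that might look roughly as follows; this is a hypothetical sketch, since the question only gives the column count (30) and the column positions actually used (getString(0), getTimestamp(11), getString(12)), not the field names:

import java.sql.Timestamp

// Hypothetical sketch of the container; field names are assumptions,
// only the column positions come from the code in the question.
case class ReadDailyFileDataObject(
  client: String,         // column 0
  eventTime: Timestamp,   // column 11
  hash: String,           // column 12
  otherColumns: Seq[Any]  // placeholder for the remaining ~27 columns
)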
Updated the code above: I removed the case class and use Row itself instead, because I see the same problem either way.
Now I see:
Tasks: 10/7772
Input: 2.1 GB
Shuffle Write: 14.6 GB
In case it helps, I am trying to process a table stored as Parquet files, containing 21 billion rows.
Below are the parameters I am using:
"spark.yarn.am.memory" -> "10G"
"spark.yarn.am.cores" -> "5"
"spark.driver.cores" -> "5"
"spark.executor.cores" -> "10"
"spark.dynamicAllocation.enabled" -> "true"
"spark.yarn.containerLauncherMaxThreads" -> "120"
"spark.executor.memory" -> "30g" …Run Code Online (Sandbox Code Playgroud)