上下文
Spark 2.0.1,在集群模式下spark-submit.我正在读取hdfs的镶木地板文件:
val spark = SparkSession.builder
.appName("myApp")
.config("hive.metastore.uris", "thrift://XXX.XXX.net:9083")
.config("spark.sql.sources.bucketing.enabled", true)
.enableHiveSupport()
.getOrCreate()
val df = spark.read
.format("parquet")
.load("hdfs://XXX.XX.X.XX/myParquetFile")
Run Code Online (Sandbox Code Playgroud)
我保存df到50个桶的蜂巢表分组userid:
df0.write
.bucketBy(50, "userid")
.saveAsTable("myHiveTable")
Run Code Online (Sandbox Code Playgroud)
现在,当我查看hdfs的hive仓库时,/user/hive/warehouse有一个名为的文件夹myHiveTable.里面是一堆part-*.parquet文件.我希望有50个文件.但不,有3201个文件!!!! 每个分区有64个文件,为什么?对于我保存为hive表的不同文件,每个分区有不同数量的文件.所有文件都很小,每个只有几十Kb!
我要补充的,不同的,这个数字userid大约是1 000 000在myParquetFile.
题
为什么文件夹中有3201个文件而不是50个!这些是什么?
当我将此表读回DataFrame并打印分区数时:
val df2 = spark.sql("SELECT * FROM myHiveTable")
println(df2.rdd.getNumPartitions)
Run Code Online (Sandbox Code Playgroud)
分区数isIt正确50,我确认数据被正确分区userid.
对于我的一个大型数据集3Tb,我创建了一个包含1000个分区的表,这些分区创建了大约数百万个文件!这超出了目录项限制1048576并给出org.apache.hadoop.hdfs.protocol.FSLimitException$MaxDirectoryItemsExceededException
题
创建的文件数量取决于什么?
题
有没有办法限制创建的文件数量?
题
我应该担心这些文件吗?df2拥有所有这些文件会损害性能吗?总是说我们不应该创建太多分区,因为它有问题.
题
我发现这个信息HIVE动态分区提示文件的数量可能与映射器的数量有关.建议distribute by在插入蜂巢表时使用.我怎么能在Spark中做到这一点?
题 …
我想知道Spark是否知道镶木地板文件的分区键,并使用此信息来避免随机播放.
语境:
运行Spark 2.0.1运行本地SparkSession.我有一个csv数据集,我将其保存为我的磁盘上的镶木地板文件,如下所示:
val df0 = spark
.read
.format("csv")
.option("header", true)
.option("delimiter", ";")
.option("inferSchema", false)
.load("SomeFile.csv"))
val df = df0.repartition(partitionExprs = col("numerocarte"), numPartitions = 42)
df.write
.mode(SaveMode.Overwrite)
.format("parquet")
.option("inferSchema", false)
.save("SomeFile.parquet")
Run Code Online (Sandbox Code Playgroud)
我按列创建了42个分区numerocarte.这应该将多个组分组numerocarte到同一个分区.我write当时不想做partitionBy("numerocarte"),因为我不希望每张卡分区一个.它将是数百万.
之后在另一个脚本中,我读了这个SomeFile.parquet镶木地板文件并对其进行了一些操作.特别是我正在运行window function它,其中分区是在镶木地板文件被重新分区的同一列上完成的.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val df2 = spark.read
.format("parquet")
.option("header", true)
.option("inferSchema", false)
.load("SomeFile.parquet")
val w = Window.partitionBy(col("numerocarte"))
.orderBy(col("SomeColumn"))
df2.withColumn("NewColumnName",
sum(col("dollars").over(w))
Run Code Online (Sandbox Code Playgroud)
在read我看到repartition按预期工作后,DataFrame df2有42个分区,每个分区都有不同的卡.
问题:
df2是按列分区的numerocarte?我想要的是能够监视Spark 执行内存,而不是SparkUI中可用的存储内存。我的意思是,执行内存不是执行者内存。
通过执行内存,我的意思是:
执行随机,连接,排序和聚合时,此区域用于缓冲中间数据。该区域的大小是通过spark.shuffle.memoryFraction(default0.2)配置的。根据:Spark 1.6中的统一内存管理
在寻找答案之后,我什么都没找到,只有未解决的StackOverflow问题,仅与存储内存相关的答案或类型模糊的答案使用Ganglia,Cloudera控制台等。
似乎对堆栈溢出有此信息的需求,但没有一个令人满意的答案。这是搜索监视Spark内存时StackOverflow的一些重要帖子
问题
Spark版本> 2.0
是否可以监视Spark作业的执行内存?通过监视,我的意思是至少看到已使用/可用,就像在SparkUI的“执行程序”选项卡中为每个执行程序查看存储内存一样。是还是不是?
我可以用SparkListeners(@JacekLaskowski吗?)历史服务器怎么样?还是唯一的办法就是通过外部工具?Graphana,Ganglia,还有其他人吗?如果是外部工具,您能否指向教程或提供一些更详细的指南?
我看到了此SPARK-9103跟踪spark的内存使用情况,似乎尚无法监视执行内存。同样,这似乎与SPARK-23206其他内存调整指标有关。
是否Peak Execution memory可靠估计任务中执行内存的使用/占用?例如,如果一个阶段UI表示某个任务在峰值使用1 Gb,而每个执行者我有5 cpu,是否意味着我需要每个执行者至少有5 Gb执行内存来完成一个阶段?
我们还可以使用其他代理来了解执行内存吗?
有没有办法知道执行内存何时开始消耗到存储内存中?当我的缓存表从SparkUI的“存储”选项卡中消失或仅保留一部分时,是否意味着它已被执行内存驱逐?
我想知道在什么情况下 Spark 将执行合并作为 UDAF 功能的一部分。
动机: 我在 Spark 项目中的一个窗口上使用了很多 UDAF 函数。我经常想回答这样的问题:
信用卡交易在 30 天内与当前交易在同一国家/地区进行了多少次?
该窗口将从当前事务开始,但不会将其包含在计数中。它需要当前交易的价值才能知道过去 30 天内要计算哪个国家。
val rollingWindow = Window
.partitionBy(partitionByColumn)
.orderBy(orderByColumn.desc)
.rangeBetween(0, windowSize)
df.withColumn(
outputColumnName,
customUDAF(inputColumn, orderByColumn).over(rollingWindow))
Run Code Online (Sandbox Code Playgroud)
我写了我的 customUDAF 来进行计数。我总是使用.orderBy(orderByColumn.desc)并感谢.desc当前交易在计算过程中出现在窗口中的第一个。
UDAF 函数需要实现merge在并行计算中合并两个中间聚合缓冲区的函数。如果发生任何合并,current transaction不同缓冲区的my可能不相同,UDAF 的结果将不正确。
我编写了一个 UDAF 函数,该函数计算我的数据集上的合并次数,并仅保留窗口中的第一个事务以与当前事务进行比较。
class FirstUDAF() extends UserDefinedAggregateFunction {
def inputSchema = new StructType().add("x", StringType)
.add("y", StringType)
def bufferSchema = new StructType()
.add("first", StringType)
.add("numMerge", IntegerType)
def dataType = new StructType()
.add("firstCode", StringType)
.add("numMerge", IntegerType)
def deterministic …Run Code Online (Sandbox Code Playgroud) 当多个withColumn函数被链接时,Spark是否会对数据进行一次或多次传递?
例如:
val dfnew = df.withColumn("newCol1", f1(col("a")))
.withColumn("newCol2", f2(col("b")))
.withColumn("newCol3", f3(col("c")))
Run Code Online (Sandbox Code Playgroud)
哪里
df是我的输入DataFrame至少包含列a,b,cdfnew输出DataFrame三个新列newCol1,newCol2,newCol3f1,f2,f3有一些用户定义的函数或像蒙上了,等在我的项目列了一些火花的操作,我可以有甚至30独立的withColumn功能链接与foldLeft.重要
我在这里假设f2不依赖于结果f1,f3而不依赖于f1和的结果f2.可以按任何顺序执行这些功能.任何功能都没有随机播放
我的观察
withColumn不会增加执行时间,从而怀疑通过数据传递额外的段落.SQLTransformer包含所有函数的select语句和SQLTransformer每个函数的多个独立函数,并且执行时间类似.问题
withColumn吗?f1,f2,f3?UDF与泛型Spark操作?f1,f2,f3在同一阶段内,这是否意味着他们是在同一个数据传递?withColumn函数链接函数foldLeft会改变段落的数量吗?