如何在Spark 2.1中保存分区的镶木地板文件?

Dan*_*pez 10 scala apache-spark parquet apache-spark-sql

我试图测试如何使用Spark 2.1在HDFS 2.7中写入数据.我的数据是一个简单的虚拟值序列,输出应该由属性:idkey分区.

 // Simple case class to cast the data
 case class SimpleTest(id:String, value1:Int, value2:Float, key:Int)

 // Actual data to be stored
 val testData = Seq(
    SimpleTest("test", 12, 13.5.toFloat, 1),
    SimpleTest("test", 12, 13.5.toFloat, 2),
    SimpleTest("test", 12, 13.5.toFloat, 3),
    SimpleTest("simple", 12, 13.5.toFloat, 1),
    SimpleTest("simple", 12, 13.5.toFloat, 2),
    SimpleTest("simple", 12, 13.5.toFloat, 3)
 )

 // Spark's workflow to distribute, partition and store
 // sc and sql are the SparkContext and SparkSession, respectively
 val testDataP = sc.parallelize(testData, 6)
 val testDf = sql.createDataFrame(testDataP).toDF("id", "value1", "value2", "key")
 testDf.write.partitionBy("id", "key").parquet("/path/to/file")
Run Code Online (Sandbox Code Playgroud)

我期望在HDFS中获得以下树结构:

- /path/to/file
   |- /id=test/key=1/part-01.parquet
   |- /id=test/key=2/part-02.parquet
   |- /id=test/key=3/part-03.parquet
   |- /id=simple/key=1/part-04.parquet
   |- /id=simple/key=2/part-05.parquet
   |- /id=simple/key=3/part-06.parquet
Run Code Online (Sandbox Code Playgroud)

但是当我运行前面的代码时,我得到以下输出:

/path/to/file/id=/key=24/
 |-/part-01.parquet
 |-/part-02.parquet
 |-/part-03.parquet
 |-/part-04.parquet
 |-/part-05.parquet
 |-/part-06.parquet
Run Code Online (Sandbox Code Playgroud)

我不知道代码中是否有什么问题,或者Spark有没有其他的东西.

我正在执行spark-submit如下:

spark-submit --name APP --master local --driver-memory 30G --executor-memory 30G --executor-cores 8 --num-executors 8 --conf spark.io.compression.codec = lzf --conf spark.akka.frameSize = 1024 --conf spark.driver.maxResultSize = 1g --conf spark.sql.orc.compression.codec = uncompressed --conf spark.sql.parquet.filterPushdown = true --class myClass myFatJar.jar

Jac*_*ski 11

有趣的是......好吧...... "它适合我".

当您使用SimpleTestSpark 2.1中的案例类描述数据集时,您将import spark.implicits._获得一个类型Dataset.

就我而言,sparksql.

换句话说,您不必创建testDataPtestDf(使用sql.createDataFrame).

import spark.implicits._
...
val testDf = testData.toDS
testDf.write.partitionBy("id", "key").parquet("/path/to/file")
Run Code Online (Sandbox Code Playgroud)

在另一个终端(保存到/tmp/testDf目录后):

$ tree /tmp/testDf/
/tmp/testDf/
??? _SUCCESS
??? id=simple
?   ??? key=1
?   ?   ??? part-00003-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
?   ??? key=2
?   ?   ??? part-00004-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
?   ??? key=3
?       ??? part-00005-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
??? id=test
    ??? key=1
    ?   ??? part-00000-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
    ??? key=2
    ?   ??? part-00001-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
    ??? key=3
        ??? part-00002-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet

8 directories, 7 files
Run Code Online (Sandbox Code Playgroud)


Dan*_*pez 7

我找到了解决办法!根据 Cloudera,是mapred-site.xml配置问题(检查下面的链接)。此外,不要将数据帧写为:testDf.write.partitionBy("id", "key").parquet("/path/to/file")

我做了如下:testDf.write.partitionBy("id", "key").parquet("hdfs://<namenode>:<port>/path/to/file")。您可以分别用 HDFS 的主节点名称和端口替换<namenode><port>

特别感谢@jacek-laskowski 的宝贵贡献。

参考:

https://community.cloudera.com/t5/Batch-SQL-Apache-Hive/MKDirs-failed-to-create-file/mp/36363#M1090

在 Spark/Scala 中写入 HDFS