Mar*_*ler 7 python apache-spark pyspark
我有一个工作,它加载一个 DataFrame 对象,然后使用 DataFramepartitionBy方法将数据保存为镶木地板格式。然后我发布创建的路径,以便后续作业可以使用输出。输出中的路径如下所示:
/ptest/_SUCCESS
/ptest/id=0
/ptest/id=0/part-00000-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet
/ptest/id=0/part-00001-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet
/ptest/id=0/part-00002-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet
/ptest/id=1
/ptest/id=1/part-00003-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet
/ptest/id=1/part-00004-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet
/ptest/id=1/part-00005-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet
/ptest/id=3
/ptest/id=3/part-00006-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet
/ptest/id=3/part-00007-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet
Run Code Online (Sandbox Code Playgroud)
当我收到新数据时,它会附加到数据集。路径已发布,因此依赖于数据的作业可以只处理新数据。
下面是代码的一个简化示例:
>>> rdd = sc.parallelize([(0,1,"A"), (0,1,"B"), (0,2,"C"), (1,2,"D"), (1,10,"E"), (1,20,"F"), (3,18,"G"), (3,18,"H"), (3,18,"I")])
>>> df = sqlContext.createDataFrame(rdd, ["id", "score","letter"])
>>> df.show()
+---+-----+------+
| id|score|letter|
+---+-----+------+
| 0| 1| A|
| 0| 1| B|
| 0| 2| C|
| 1| 2| D|
| 1| 10| E|
| 1| 20| F|
| 3| 18| G|
| 3| 18| H|
| 3| 18| I|
+---+-----+------+
>>> df.write.partitionBy("id").format("parquet").save("hdfs://localhost:9000/ptest")
Run Code Online (Sandbox Code Playgroud)
问题是当另一个作业尝试使用已发布的路径读取文件时:
>>> df2 = spark.read.format("parquet").schema(df2.schema).load("hdfs://localhost:9000/ptest/id=0/")
>>> df2.show()
+-----+------+
|score|letter|
+-----+------+
| 1| A|
| 1| B|
| 2| C|
+-----+------+
Run Code Online (Sandbox Code Playgroud)
如您所见,加载的数据集中缺少分区键。如果我要发布作业可以使用的模式,我可以使用该模式加载文件。文件加载并且分区键存在,但值为空:
>>> df2 = spark.read.format("parquet").schema(df.schema).load("hdfs://localhost:9000/ptest/id=0/")
>>> df2.show()
+----+-----+------+
| id|score|letter|
+----+-----+------+
|null| 1| A|
|null| 1| B|
|null| 2| C|
+----+-----+------+
Run Code Online (Sandbox Code Playgroud)
有没有办法确保分区键存储在镶木地板数据中?我不想要求其他进程解析路径来获取密钥。
zer*_*323 11
在这种情况下,您应该提供basePath option:
(spark.read
.format("parquet")
.option("basePath", "hdfs://localhost:9000/ptest/")
.load("hdfs://localhost:9000/ptest/id=0/"))
Run Code Online (Sandbox Code Playgroud)
它指向数据的根目录。
随着basePath DataFrameReader将意识到分区并相应地调整架构。
| 归档时间: |
|
| 查看次数: |
5892 次 |
| 最近记录: |