Ama*_*nda 5 apache-spark-sql pyspark
Consider the following PySpark DataFrame:
+-------+----+---+---+----+
|user_id|type| d1| d2| d3|
+-------+----+---+---+----+
| c1| A|3.4|0.4| 3.5|
| c1| B|9.6|0.0| 0.0|
| c1| A|2.8|0.4| 0.3|
| c1| B|5.4|0.2|0.11|
| c2| A|0.0|9.7| 0.3|
| c2| B|9.6|8.6| 0.1|
| c2| A|7.3|9.1| 7.0|
| c2| B|0.7|6.4| 4.3|
+-------+----+---+---+----+
which is created with:
# Assumes the PySpark shell, where `sc` and a SparkSession already exist
# (RDD.toDF() only works once a SparkSession has been created)
df = sc.parallelize([
    ("c1", "A", 3.4, 0.4, 3.5),
    ("c1", "B", 9.6, 0.0, 0.0),
    ("c1", "A", 2.8, 0.4, 0.3),
    ("c1", "B", 5.4, 0.2, 0.11),
    ("c2", "A", 0.0, 9.7, 0.3),
    ("c2", "B", 9.6, 8.6, 0.1),
    ("c2", "A", 7.3, 9.1, 7.0),
    ("c2", "B", 0.7, 6.4, 4.3)
]).toDF(["user_id", "type", "d1", "d2", "d3"])
df.show()
Then I pivot it by user_id (spreading the type values into columns) to get:
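from pyspark.sql import functions as f

# Group by user_id, pivot on the distinct values of `type`, and sum d1..d3;
# the resulting columns are named <type>_<column>, e.g. A_d1
data_wide = df.groupBy('user_id')\
    .pivot('type')\
    .agg(*[f.sum(x).alias(x) for x in df.columns if x not in {"user_id", "type"}])
data_wide.show()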
+-------+-----------------+------------------+----+------------------+----+------------------+
|user_id| A_d1| A_d2|A_d3| B_d1|B_d2| B_d3|
+-------+-----------------+------------------+----+------------------+----+------------------+
| c1|6.199999999999999| 0.8| 3.8| 15.0| 0.2| 0.11|
| c2| 7.3|18.799999999999997| 7.3|10.299999999999999|15.0|4.3999999999999995|
+-------+-----------------+------------------+----+------------------+----+------------------+
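To make the expected result of the pivot concrete, here is a minimal plain-Python sketch (no Spark needed) of what groupBy('user_id').pivot('type').agg(sum(...)) computes on the sample rows. The helper name pivot_sum is hypothetical, and unlike Spark, which fills a missing (user_id, type) combination with null, this sketch simply omits that key.

```python
from collections import defaultdict

# Sample rows from the question: (user_id, type, d1, d2, d3)
ROWS = [
    ("c1", "A", 3.4, 0.4, 3.5),
    ("c1", "B", 9.6, 0.0, 0.0),
    ("c1", "A", 2.8, 0.4, 0.3),
    ("c1", "B", 5.4, 0.2, 0.11),
    ("c2", "A", 0.0, 9.7, 0.3),
    ("c2", "B", 9.6, 8.6, 0.1),
    ("c2", "A", 7.3, 9.1, 7.0),
    ("c2", "B", 0.7, 6.4, 4.3),
]
VALUE_COLS = ["d1", "d2", "d3"]


def pivot_sum(rows, value_cols):
    """Hypothetical helper: group by user_id, pivot on type, sum each value column.

    Output columns are named "<type>_<column>", matching the data_wide header.
    """
    wide = defaultdict(lambda: defaultdict(float))
    for user_id, typ, *values in rows:
        for col, value in zip(value_cols, values):
            wide[user_id]["{}_{}".format(typ, col)] += value
    # Convert the nested defaultdicts to plain dicts, users and columns sorted
    return {user: dict(sorted(cols.items())) for user, cols in sorted(wide.items())}


wide = pivot_sum(ROWS, VALUE_COLS)
for user, cols in wide.items():
    print(user, cols)
```

Values such as 6.199999999999999 in data_wide are ordinary binary floating-point rounding of sums like 3.4 + 2.8; this sketch prints the same value for c1's A_d1.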
Now I want to randomize its row order:
data_wide = data_wide.orderBy(f.rand())
data_wide.show()
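orderBy(f.rand()) shuffles by giving every row a uniform random number in [0, 1) and sorting on it. A minimal plain-Python sketch of that idea follows; order_by_random_key is a hypothetical name, and its optional seed plays the role of the seed argument that pyspark.sql.functions.rand also accepts (for example f.rand(seed=42)) when a reproducible order is wanted.

```python
import random


def order_by_random_key(rows, seed=None):
    """Hypothetical helper: shuffle rows by sorting on one random key per row."""
    rng = random.Random(seed)
    # sorted() calls the key function exactly once per row, so each row
    # gets a single uniform draw in [0, 1), like a rand() column
    return sorted(rows, key=lambda _row: rng.random())


print(order_by_random_key(["c1", "c2"], seed=42))
```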
But the last step throws a NullPointerException:
Py4JJavaError: An error occurred while calling o101.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 161 in stage 27.0 failed 20 times, most recent failure: Lost task 161.19 in stage 27.0 (TID 1300, 192.168.192.57, executor 1): java.lang.NullPointerException
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_3$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:232)
However, if I cache the wide DataFrame before the orderBy(f.rand()) step, the last step works fine:
data_wide.cache()
data_wide = data_wide.orderBy(f.rand())
data_wide.show()
+-------+-----------------+------------------+----+------------------+----+------------------+
|user_id| A_d1| A_d2|A_d3| B_d1|B_d2| B_d3|
+-------+-----------------+------------------+----+------------------+----+------------------+
| c2| 7.3|18.799999999999997| 7.3|10.299999999999999|15.0|4.3999999999999995|
| c1|6.199999999999999| 0.8| 3.8| 15.0| 0.2| 0.11|
+-------+-----------------+------------------+----+------------------+----+------------------+
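One plausible way to think about why cache() changes the outcome: Spark evaluates lazily, so without a cache the pivot aggregation and the orderBy(f.rand()) step are planned and executed as one query, whereas after cache() the sort reads the stored pivot result. The toy model below (ToyLazyFrame and pivot_rows are hypothetical and are not Spark's API or implementation) only illustrates the difference between re-running an upstream computation and reusing stored rows; it does not reproduce or explain the NullPointerException itself.

```python
import random


class ToyLazyFrame:
    """Hypothetical toy model of lazy evaluation with an optional cache."""

    def __init__(self, compute):
        self._compute = compute          # upstream computation, e.g. the pivot
        self._cache_requested = False
        self._stored = None
        self.runs = 0                    # how often the upstream actually ran

    def cache(self):
        # Like Spark's cache(), this only marks the frame; nothing runs yet
        self._cache_requested = True
        return self

    def _rows(self):
        if self._stored is not None:
            return list(self._stored)    # reuse the materialized rows
        self.runs += 1
        rows = self._compute()
        if self._cache_requested:
            self._stored = list(rows)    # materialize on the first action
        return list(rows)

    def order_by_rand(self, seed=None):
        # Each call is an "action": fetch rows, then sort on a random key per row
        rng = random.Random(seed)
        return sorted(self._rows(), key=lambda _row: rng.random())


def pivot_rows():
    # Stand-in for data_wide: (user_id, A_d1) pairs from the question's output
    return [("c1", 6.199999999999999), ("c2", 7.3)]


uncached = ToyLazyFrame(pivot_rows)
uncached.order_by_rand(seed=1)
uncached.order_by_rand(seed=2)

cached = ToyLazyFrame(pivot_rows).cache()
cached.order_by_rand(seed=1)
cached.order_by_rand(seed=2)

print("upstream runs without cache:", uncached.runs)  # 2
print("upstream runs with cache:", cached.runs)       # 1
```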
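What is wrong here? It looks as if, at the orderBy step, the pivot has not yet taken effect and the execution is not planned correctly, but I don't know what the actual problem is. Any ideas?
The Spark version is 2.1.0 and the Python version is 3.5.2.
Thanks in advance.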