Chr*_*lis 11 scala collect dataframe take apache-spark
我正在使用Spark 1.5.
我有一个30列的列,我integers从数据库加载:
val numsRDD = sqlContext
.table(constants.SOURCE_DB + "." + IDS)
.select("id")
.distinct
.map(row=>row.getInt(0))
Run Code Online (Sandbox Code Playgroud)
这是输出numsRDD:
numsRDD.collect.foreach(println(_))
643761
30673603
30736590
30773400
30832624
31104189
31598495
31723487
32776244
32801792
32879386
32981901
33469224
34213505
34709608
37136455
37260344
37471301
37573190
37578690
37582274
37600896
37608984
37616677
37618105
37644500
37647770
37648497
37720353
37741608
Run Code Online (Sandbox Code Playgroud)
接下来,我想为那些产生3的所有组合,ids然后将每个组合保存为表单的元组:< tripletID: String, triplet: Array(Int)>并将其转换为数据帧,我按如下方式执行:
// |combinationsDF| = 4060 combinations
val combinationsDF = sc
.parallelize(numsRDD
.collect
.combinations(3)
.toArray
.map(row => row.sorted)
.map(row => (
List(row(0), row(1), row(2)).mkString(","),
List(row(0), row(1), row(2)).toArray)))
.toDF("tripletID","triplet")
Run Code Online (Sandbox Code Playgroud)
一旦我这样做,我会尝试打印一些combinationsDF内容,以确保一切都是应有的.所以我试试这个:
combinationsDF.show
Run Code Online (Sandbox Code Playgroud)
返回:
+--------------------+--------------------+
| tripletID| triplet|
+--------------------+--------------------+
|,37136455,3758227...|[32776244, 371364...|
|,37136455,3761667...|[32776244, 371364...|
|,32776244,3713645...|[31723487, 327762...|
|,37136455,3757869...|[32776244, 371364...|
|,32776244,3713645...|[31598495, 327762...|
|,37136455,3760089...|[32776244, 371364...|
|,37136455,3764849...|[32776244, 371364...|
|,37136455,3764450...|[32776244, 371364...|
|,37136455,3747130...|[32776244, 371364...|
|,32981901,3713645...|[32776244, 329819...|
|,37136455,3761810...|[32776244, 371364...|
|,34213505,3713645...|[32776244, 342135...|
|,37136455,3726034...|[32776244, 371364...|
|,37136455,3772035...|[32776244, 371364...|
|2776244,37136455...|[643761, 32776244...|
|,37136455,3764777...|[32776244, 371364...|
|,37136455,3760898...|[32776244, 371364...|
|,32879386,3713645...|[32776244, 328793...|
|,32776244,3713645...|[31104189, 327762...|
|,32776244,3713645...|[30736590, 327762...|
+--------------------+--------------------+
only showing top 20 rows
Run Code Online (Sandbox Code Playgroud)
很明显,每个人的第一个元素都tripletID缺失了.所以,只是为了100%确定我使用take(20)如下:
combinationsDF.take(20).foreach(println(_))
Run Code Online (Sandbox Code Playgroud)
返回更详细的表示如下:
[,37136455,37582274,WrappedArray(32776244, 37136455, 37582274)]
[,37136455,37616677,WrappedArray(32776244, 37136455, 37616677)]
[,32776244,37136455,WrappedArray(31723487, 32776244, 37136455)]
[,37136455,37578690,WrappedArray(32776244, 37136455, 37578690)]
[,32776244,37136455,WrappedArray(31598495, 32776244, 37136455)]
[,37136455,37600896,WrappedArray(32776244, 37136455, 37600896)]
[,37136455,37648497,WrappedArray(32776244, 37136455, 37648497)]
[,37136455,37644500,WrappedArray(32776244, 37136455, 37644500)]
[,37136455,37471301,WrappedArray(32776244, 37136455, 37471301)]
[,32981901,37136455,WrappedArray(32776244, 32981901, 37136455)]
[,37136455,37618105,WrappedArray(32776244, 37136455, 37618105)]
[,34213505,37136455,WrappedArray(32776244, 34213505, 37136455)]
[,37136455,37260344,WrappedArray(32776244, 37136455, 37260344)]
[,37136455,37720353,WrappedArray(32776244, 37136455, 37720353)]
[2776244,37136455,WrappedArray(643761, 32776244, 37136455)]
[,37136455,37647770,WrappedArray(32776244, 37136455, 37647770)]
[,37136455,37608984,WrappedArray(32776244, 37136455, 37608984)]
[,32879386,37136455,WrappedArray(32776244, 32879386, 37136455)]
[,32776244,37136455,WrappedArray(31104189, 32776244, 37136455)]
[,32776244,37136455,WrappedArray(30736590, 32776244, 37136455)]
Run Code Online (Sandbox Code Playgroud)
所以现在我确信tripletID无论出于什么原因弃用了第一个id .但是,如果我尝试使用collect而不是take(20):
combinationsDF.collect.foreach(println(_))
Run Code Online (Sandbox Code Playgroud)
一切又恢复正常(!!!):
[32776244,37136455,37582274,WrappedArray(32776244, 37136455, 37582274)]
[32776244,37136455,37616677,WrappedArray(32776244, 37136455, 37616677)]
[31723487,32776244,37136455,WrappedArray(31723487, 32776244, 37136455)]
[32776244,37136455,37578690,WrappedArray(32776244, 37136455, 37578690)]
[31598495,32776244,37136455,WrappedArray(31598495, 32776244, 37136455)]
[32776244,37136455,37600896,WrappedArray(32776244, 37136455, 37600896)]
[32776244,37136455,37648497,WrappedArray(32776244, 37136455, 37648497)]
[32776244,37136455,37644500,WrappedArray(32776244, 37136455, 37644500)]
[32776244,37136455,37471301,WrappedArray(32776244, 37136455, 37471301)]
[32776244,32981901,37136455,WrappedArray(32776244, 32981901, 37136455)]
[32776244,37136455,37618105,WrappedArray(32776244, 37136455, 37618105)]
[32776244,34213505,37136455,WrappedArray(32776244, 34213505, 37136455)]
[32776244,37136455,37260344,WrappedArray(32776244, 37136455, 37260344)]
[32776244,37136455,37720353,WrappedArray(32776244, 37136455, 37720353)]
[643761,32776244,37136455,WrappedArray(643761, 32776244, 37136455)]
[32776244,37136455,37647770,WrappedArray(32776244, 37136455, 37647770)]
[32776244,37136455,37608984,WrappedArray(32776244, 37136455, 37608984)]
[32776244,32879386,37136455,WrappedArray(32776244, 32879386, 37136455)]
[31104189,32776244,37136455,WrappedArray(31104189, 32776244, 37136455)]
[30736590,32776244,37136455,WrappedArray(30736590, 32776244, 37136455)]
...
Run Code Online (Sandbox Code Playgroud)
1.在parallelize将组合数组放入RDD 之前,我已经详尽地查询了步骤,一切正常.2.我已经parallelize应用后立即打印输出,一切都还可以.3.问题似乎与numsRDD转换为DF有关,尽管我付出了最大的努力,但我无法处理它.4.我也无法使用相同的代码片段重现模拟数据的问题.
首先:是什么导致了这个问题? 第二:我该如何解决?
小智 8
例如
df.show()
Out[11]:
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
Run Code Online (Sandbox Code Playgroud)
例如
df.collect()
Out[11]:
[Row(age=None, name=u'Michael'),
Row(age=30, name=u'Andy'),
Row(age=19, name=u'Justin')]
Run Code Online (Sandbox Code Playgroud)
例如,仅查看数据帧的前两行
df.take(2)
Out[13]:
[Row(age=None, name=u'Michael'), Row(age=30, name=u'Andy')]
Run Code Online (Sandbox Code Playgroud)
我会检查你的原始文件numsRDD,看起来你可能有一个空字符串或空值。这对我有用:
scala> val numsRDD = sc.parallelize(0 to 30)
numsRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27
scala> :pa
// Entering paste mode (ctrl-D to finish)
val combinationsDF = sc
.parallelize(numsRDD
.collect
.combinations(3)
.toArray
.map(row => row.sorted)
.map(row => (
List(row(0), row(1), row(2)).mkString(","),
List(row(0), row(1), row(2)).toArray)))
.toDF("tripletID","triplet")
// Exiting paste mode, now interpreting.
combinationsDF: org.apache.spark.sql.DataFrame = [tripletID: string, triplet: array<int>]
scala> combinationsDF.show
+---------+----------+
|tripletID| triplet|
+---------+----------+
| 0,1,2| [0, 1, 2]|
| 0,1,3| [0, 1, 3]|
| 0,1,4| [0, 1, 4]|
| 0,1,5| [0, 1, 5]|
| 0,1,6| [0, 1, 6]|
| 0,1,7| [0, 1, 7]|
| 0,1,8| [0, 1, 8]|
| 0,1,9| [0, 1, 9]|
| 0,1,10|[0, 1, 10]|
| 0,1,11|[0, 1, 11]|
| 0,1,12|[0, 1, 12]|
| 0,1,13|[0, 1, 13]|
| 0,1,14|[0, 1, 14]|
| 0,1,15|[0, 1, 15]|
| 0,1,16|[0, 1, 16]|
| 0,1,17|[0, 1, 17]|
| 0,1,18|[0, 1, 18]|
| 0,1,19|[0, 1, 19]|
| 0,1,20|[0, 1, 20]|
| 0,1,21|[0, 1, 21]|
+---------+----------+
only showing top 20 rows
Run Code Online (Sandbox Code Playgroud)
我唯一能想到的另一件事就是mkString没有像你期望的那样工作。尝试这个字符串插值(也不需要重新创建List):
val combinationsDF = sc
.parallelize(numsRDD
.collect
.combinations(3)
.toArray
.map(row => row.sorted)
.map{case List(a,b,c) => (
s"$a,$b,$c",
Array(a,b,c))}
.toDF("tripletID","triplet")
scala> combinationsDF.show
+---------+----------+
|tripletID| triplet|
+---------+----------+
| 0,1,2| [0, 1, 2]|
| 0,1,3| [0, 1, 3]|
| 0,1,4| [0, 1, 4]|
| 0,1,5| [0, 1, 5]|
| 0,1,6| [0, 1, 6]|
| 0,1,7| [0, 1, 7]|
| 0,1,8| [0, 1, 8]|
| 0,1,9| [0, 1, 9]|
| 0,1,10|[0, 1, 10]|
| 0,1,11|[0, 1, 11]|
| 0,1,12|[0, 1, 12]|
| 0,1,13|[0, 1, 13]|
| 0,1,14|[0, 1, 14]|
| 0,1,15|[0, 1, 15]|
| 0,1,16|[0, 1, 16]|
| 0,1,17|[0, 1, 17]|
| 0,1,18|[0, 1, 18]|
| 0,1,19|[0, 1, 19]|
| 0,1,20|[0, 1, 20]|
| 0,1,21|[0, 1, 21]|
+---------+----------+
only showing top 20 rows
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
17397 次 |
| 最近记录: |