Mpi*_*ris 3 scala apache-spark
可以说我有以下数据框:
var randomData = Seq(("a",8),("h",5),("f",3),("a",2),("b",8),("c",3)
val df = sc.parallelize(randomData,2).toDF()
Run Code Online (Sandbox Code Playgroud)
而且我有这个功能,它将作为输入mapPartition:
def trialIterator(row:Iterator[(String,Int)]): Iterator[(String,Int)] =
row.toArray.tail.toIterator
Run Code Online (Sandbox Code Playgroud)
并使用地图分区:
df.mapPartition(trialIterator)
Run Code Online (Sandbox Code Playgroud)
我收到以下错误消息:
类型不匹配,预期的(Iterator [Row])=> Iterator [NotInferedR],实际:Iterator [(String,Int)=> Iterator [(String,Int)]
我知道这是由于函数的输入,输出类型引起的,但是如何解决呢?
如果你想获得强类型输入不使用Dataset[Row](DataFrame),但Dataset[T]其中T在此特定情形的(String, Int)。同样不要在不知道分区是否为空的情况下转换为Array盲目调用tail:
def trialIterator(iter: Iterator[(String, Int)]) = iter.drop(1)
randomData
.toDS // org.apache.spark.sql.Dataset[(String, Int)]
.mapPartitions(trialIterator _)
Run Code Online (Sandbox Code Playgroud)
要么
randomData.toDF // org.apache.spark.sql.Dataset[Row]
.as[(String, Int)] // org.apache.spark.sql.Dataset[(String, Int)]
.mapPartitions(trialIterator _)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
4029 次 |
| 最近记录: |