Spark:在Scala中使用mapPartition

Mpi*_*ris 3 scala apache-spark

可以说我有以下数据框:

var randomData = Seq(("a",8),("h",5),("f",3),("a",2),("b",8),("c",3)
val df = sc.parallelize(randomData,2).toDF()
Run Code Online (Sandbox Code Playgroud)

而且我有这个功能,它将作为输入mapPartition

def trialIterator(row:Iterator[(String,Int)]): Iterator[(String,Int)] =
    row.toArray.tail.toIterator
Run Code Online (Sandbox Code Playgroud)

并使用地图分区:

df.mapPartition(trialIterator)
Run Code Online (Sandbox Code Playgroud)

我收到以下错误消息:

类型不匹配,预期的(Iterator [Row])=> Iterator [NotInferedR],实际:Iterator [(String,Int)=> Iterator [(String,Int)]

我知道这是由于函数的输入,输出类型引起的,但是如何解决呢?

zer*_*323 5

如果你想获得强类型输入不使用Dataset[Row]DataFrame),但Dataset[T]其中T在此特定情形的(String, Int)。同样不要在不知道分区是否为空的情况下转换为Array盲目调用tail

def trialIterator(iter: Iterator[(String, Int)]) = iter.drop(1)

randomData
  .toDS // org.apache.spark.sql.Dataset[(String, Int)]
  .mapPartitions(trialIterator _)
Run Code Online (Sandbox Code Playgroud)

要么

randomData.toDF // org.apache.spark.sql.Dataset[Row] 
  .as[(String, Int)] // org.apache.spark.sql.Dataset[(String, Int)]
  .mapPartitions(trialIterator _)
Run Code Online (Sandbox Code Playgroud)

  • 因为对于实际应用来说,“ DataFrame”只是一个“ Dataset [Seq [Any]]”,所以您可以简单地认为它是无类型的/不安全的。 (2认同)