在任何地方导入Spark隐式方法

use*_*008 5 scala implicits apache-spark spark-dataframe apache-spark-2.0

我是Spark 2.0的新手,并在我们的代码库中使用数据集。我注意到我需要import spark.implicits._在代码中的任何地方。例如:

File A
class A {
    def job(spark: SparkSession) = {
        import spark.implcits._
        //create dataset ds
        val b = new B(spark)
        b.doSomething(ds)
        doSomething(ds)
    }
    private def doSomething(ds: Dataset[Foo], spark: SparkSession) = {
        import spark.implicits._
        ds.map(e => 1)            
    }
}

File B
class B(spark: SparkSession) {
    def doSomething(ds: Dataset[Foo]) = {
        import spark.implicits._
        ds.map(e => "SomeString")
    }
}
Run Code Online (Sandbox Code Playgroud)

我想问的是,是否有一种更清洁的方法

ds.map(e => "SomeString")
Run Code Online (Sandbox Code Playgroud)

没有在我做地图的每个函数中导​​入隐式函数?如果不导入,则会出现以下错误:

错误:(53,13)无法找到数据集中存储的类型的编码器。导入spark.implicits。支持基本类型(Int,String等)和产品类型(案例类)。_在将来的版本中将添加对其他类型进行序列化的支持。

Sha*_*ica 5

会有所帮助的一点是在classobject而不是每个函数中进行导入。对于“文件A”和“文件B”的示例:

File A
class A {
    val spark = SparkSession.builder.getOrCreate()
    import spark.implicits._

    def job() = {
        //create dataset ds
        val b = new B(spark)
        b.doSomething(ds)
        doSomething(ds)
    }

    private def doSomething(ds: Dataset[Foo]) = {
        ds.map(e => 1)            
    }
}

File B
class B(spark: SparkSession) {
    import spark.implicits._

    def doSomething(ds: Dataset[Foo]) = {    
        ds.map(e => "SomeString")
    }
}
Run Code Online (Sandbox Code Playgroud)

通过这种方式,您可以获得可管理的金额imports

不幸的是,据我所知,没有其他方法可以进一步减少进口数量。这是由于SparkSession在执行实际操作时需要对象import。因此,这是最好的方法。


更新:

一种更方便的方法是创建一个Scala Trait并将其与一个空容器组合Object。这样可以轻松地在每个文件的顶部导入隐式内容,同时允许扩展特征以使用SparkSession对象。

例:

trait SparkJob {
  val spark: SparkSession = SparkSession.builder.
    .master(...)
    .config(..., ....) // Any settings to be applied
    .getOrCreate()
}

object SparkJob extends SparkJob {}
Run Code Online (Sandbox Code Playgroud)

这样,我们可以对文件A和文件B执行以下操作:

档案A:

import SparkJob.spark.implicits._
class A extends SparkJob {
  spark.sql(...) // Allows for usage of the SparkSession inside the class
  ...
}
Run Code Online (Sandbox Code Playgroud)

档案B:

import SparkJob.spark.implicits._
class B extends SparkJob {
  ...    
}
Run Code Online (Sandbox Code Playgroud)

请注意,只需要为SparkJob使用该spark对象本身的类或对象扩展。

  • 很好的答案,很遗憾你在两年内得到了零支持。固定的! (2认同)