spark编程:组织上下文导入的最佳方式和具有多个功能的其他方法

bre*_*mri 7 scala apache-spark

在玩具示例中简单明了地展示了如何在spark中编程.您只需导入,创建,使用和丢弃所有功能.

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext

def main(args: String) {
  val conf = new SparkConf().setAppName("example")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)

  val hiveContext = new HiveContext(sc)
  import hiveContext.implicits._
  import hiveContext.sql

  // load data from hdfs
  val df1 = sqlContext.textFile("hdfs://.../myfile.csv").map(...)
  val df1B = sc.broadcast(df1)

  // load data from hive
  val df2 = sql("select * from mytable")
  // transform df2 with df1B
  val cleanCol = udf(cleanMyCol(df1B)).apply("myCol")
  val df2_new = df2.withColumn("myCol", cleanCol)

  ...

  sc.stop()
}
Run Code Online (Sandbox Code Playgroud)

在现实世界中,我发现自己编写了很多函数来模块化任务.例如,我只有几个函数来加载不同的数据表.在这些加载函数中,我会调用其他函数在加载数据时进行必要的数据清理/转换.然后我会像这样传递上下文:

 def loadHdfsFileAndBroadcast(sc: SparkContext) = {
   // use sc here
   val df = sc.textFile("hdfs://.../myfile.csv").map(...)
   val dfB = sc.broadcast(df)
   dfB
 }

 def loadHiveTable(hiveContext: HiveContext, df1B: Broadcast[Map[String, String]]) = {
   import hiveContext.implicits._
   val data = hiveContext.sql("select * from myHiveTable")
   // data cleaning
   val cleanCol = udf(cleanMyCol(df1B)).apply(col("myCol"))
   df_cleaned = data.withColumn("myCol", cleanCol)
   df_cleaned
 }
Run Code Online (Sandbox Code Playgroud)

如您所见,加载函数签名很容易变得很重.

我试图将这些上下文导入放在类中的main函数之外.但这会导致问题(请参阅此问题),这让我无可奈何地传递它们.

这是要走的路还是有更好的方法来做到这一点?

Vid*_*dya 6

首先,让我说我很高兴有人正在探索在Spark中编写干净的代码.这是我总是觉得关键的东西,但似乎人们如此专注于分析本身,他们忽视了可维护性.

我也同意Spark在这方面产生了有趣的挑战.我发现的最好的方法,当然你可能觉得这不是一个改进,是使用具有抽象方法定义的特征,并将它们混合到编排所有内容的对象中.

例如:

trait UsingSparkContextTrait {
   def sc: SparkContext

   def loadHdfsFileAndBroadcast = {
      val df = sc.textFile("hdfs://.../myfile.csv").map(...)
      sc.broadcast(df)
 }
}

trait UsingHiveContextTrait {
   def hiveContext: HiveContext
   def df1B: Broadcast[Map[String, String]]
   def loadHiveTable = {
      val data = hiveContext.sql("select * from myHiveTable")
      val cleanCol = udf(cleanMyCol(df1B)).apply(col("myCol"))
      data.withColumn("myCol", cleanCol)
 }
}
Run Code Online (Sandbox Code Playgroud)

最后:

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext

class ClassDoingWork extends UsingSparkContextTrait with UsingHiveContextTrait {
   val conf = new SparkConf().setAppName("example")
   val sc = new SparkContext(conf) //Satisfies UsingSparkContextTrait
   val sqlContext = new SQLContext(sc)

   val hiveContext = new HiveContext(sc) //Satisfies UsingHiveContextTrait
   val dfb = loadHdfsFileAndBroadcast    //Satisfies UsingHiveContextTrait
   import hiveContext.implicits._
   import hiveContext.sql

   def doAnalytics = {
      val dfCleaned = loadHiveTable
      ...
   }
}
Run Code Online (Sandbox Code Playgroud)

这种依赖注入方法的一个很酷的方法是,如果您缺少代码执行所需的任何组件,您将在编译时知道.

最后,在一个更简单的说明中,您还可以SparkContext从一个RDD实例访问rdd.context.这可能也很有用.