bre*_*mri 7 scala apache-spark
在玩具示例中简单明了地展示了如何在spark中编程.您只需导入,创建,使用和丢弃所有功能.
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext
def main(args: String) {
val conf = new SparkConf().setAppName("example")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val hiveContext = new HiveContext(sc)
import hiveContext.implicits._
import hiveContext.sql
// load data from hdfs
val df1 = sqlContext.textFile("hdfs://.../myfile.csv").map(...)
val df1B = sc.broadcast(df1)
// load data from hive
val df2 = sql("select * from mytable")
// transform df2 with df1B
val cleanCol = udf(cleanMyCol(df1B)).apply("myCol")
val df2_new = df2.withColumn("myCol", cleanCol)
...
sc.stop()
}
Run Code Online (Sandbox Code Playgroud)
在现实世界中,我发现自己编写了很多函数来模块化任务.例如,我只有几个函数来加载不同的数据表.在这些加载函数中,我会调用其他函数在加载数据时进行必要的数据清理/转换.然后我会像这样传递上下文:
def loadHdfsFileAndBroadcast(sc: SparkContext) = {
// use sc here
val df = sc.textFile("hdfs://.../myfile.csv").map(...)
val dfB = sc.broadcast(df)
dfB
}
def loadHiveTable(hiveContext: HiveContext, df1B: Broadcast[Map[String, String]]) = {
import hiveContext.implicits._
val data = hiveContext.sql("select * from myHiveTable")
// data cleaning
val cleanCol = udf(cleanMyCol(df1B)).apply(col("myCol"))
df_cleaned = data.withColumn("myCol", cleanCol)
df_cleaned
}
Run Code Online (Sandbox Code Playgroud)
如您所见,加载函数签名很容易变得很重.
我试图将这些上下文导入放在类中的main函数之外.但这会导致问题(请参阅此问题),这让我无可奈何地传递它们.
这是要走的路还是有更好的方法来做到这一点?
首先,让我说我很高兴有人正在探索在Spark中编写干净的代码.这是我总是觉得关键的东西,但似乎人们如此专注于分析本身,他们忽视了可维护性.
我也同意Spark在这方面产生了有趣的挑战.我发现的最好的方法,当然你可能觉得这不是一个改进,是使用具有抽象方法定义的特征,并将它们混合到编排所有内容的对象中.
例如:
trait UsingSparkContextTrait {
def sc: SparkContext
def loadHdfsFileAndBroadcast = {
val df = sc.textFile("hdfs://.../myfile.csv").map(...)
sc.broadcast(df)
}
}
trait UsingHiveContextTrait {
def hiveContext: HiveContext
def df1B: Broadcast[Map[String, String]]
def loadHiveTable = {
val data = hiveContext.sql("select * from myHiveTable")
val cleanCol = udf(cleanMyCol(df1B)).apply(col("myCol"))
data.withColumn("myCol", cleanCol)
}
}
Run Code Online (Sandbox Code Playgroud)
最后:
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext
class ClassDoingWork extends UsingSparkContextTrait with UsingHiveContextTrait {
val conf = new SparkConf().setAppName("example")
val sc = new SparkContext(conf) //Satisfies UsingSparkContextTrait
val sqlContext = new SQLContext(sc)
val hiveContext = new HiveContext(sc) //Satisfies UsingHiveContextTrait
val dfb = loadHdfsFileAndBroadcast //Satisfies UsingHiveContextTrait
import hiveContext.implicits._
import hiveContext.sql
def doAnalytics = {
val dfCleaned = loadHiveTable
...
}
}
Run Code Online (Sandbox Code Playgroud)
这种依赖注入方法的一个很酷的方法是,如果您缺少代码执行所需的任何组件,您将在编译时知道.
最后,在一个更简单的说明中,您还可以SparkContext从一个RDD实例访问rdd.context.这可能也很有用.
| 归档时间: |
|
| 查看次数: |
1265 次 |
| 最近记录: |