如何在spark中为diff文件名调用单独的逻辑

use*_*473 5 scala readfile apache-spark

我的文件夹中有3个日志文件.喜欢

foldera = emplog,deptlog,companylog
folderb = emplog,deptlog,companylog
folderc = emplog,deptlog,companylog
Run Code Online (Sandbox Code Playgroud)

我有3个diff scala程序文件来从每个文件中提取数据.

employee.scala
department.scala
companylog.scala
Run Code Online (Sandbox Code Playgroud)

他们每个人的代码如下.

我想结合所有这些文件并以并行方式执行它们.

   package com.sample
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.spark.sql.SQLContext
import org.apache.log4j.{Level, Logger}

object logparser {
  def main(args: Array[String]) = {

      Logger.getLogger("org").setLevel(Level.OFF)     
      Logger.getLogger("akka").setLevel(Level.OFF)
    //Start the Spark context
    val conf = new SparkConf()
      .setAppName("Parser")
      .setMaster("local")

      val sc = new SparkContext(conf)
      val sqlContext= new SQLContext(sc)

      val test = sc.wholeTextFiles("C:\\mkdir\\*\\*")
      .map{l =>
             if(l._1.endsWith("emplog.txt")){ 
             empparser(l._2,sc,sqlContext)
               }

             l
        }
      .foreach{println}
  }

  def empparser(record:String,sc:SparkContext,sqlContext:SQLContext) = {
     val emppattern="""[(](\d+)[)]\s([\w\s._]{30})\s+""".r

      import sqlContext.implicits._
     val indrecs = emppattern.findAllIn(record)
    .map{ line => 
      val emppattern(eid,ename) = line

     (eid,ename)
    }
     .toSeq
   .toDF("eid","ename")

   .show() 


  }
}
Run Code Online (Sandbox Code Playgroud)

我已尝试将代码附加到同一对象中的每个方法.

现在Q1出现2个问题.当我编译我得到

Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@6b0615ae)
    - field (class: com.sample.logparser$$anonfun$1, name: sc$1, type: class org.apache.spark.SparkContext)
    - object (class com.sample.logparser$$anonfun$1, <function1>)
Run Code Online (Sandbox Code Playgroud)

据我所知(仅限新手)Spark上下文无法序列化.如果我不传递sc作为参数,我得到Nullpointer异常.我该如何解决这个问题?

Q2:转换为DF后,我将在empparser方法中插入hive表代码.完成后,我不想在我的主要内容中做任何事情.但是我的地图代码不会执行,除非我之后有动作.这就是为什么我之后有foreacch println.有办法克服这个问题吗?

maa*_*asg 1

为了尝试回答这个问题,我假设处理员工或部门的结果会产生相同类型的记录。我希望每种数据的情况都不同,因此我分别保留不同类型记录的处理,以允许这种“根据现实进行调整”。

首先,我们定义记录case class和不同种类或记录类型的解析器。(为了简单起见,我在这里复制相同的实现)

case class Record(id:String, name: String)

val empParser: String =>  Option[Record] = { record => 
  val pattern="""^[(](\d+)[)]\s([\w\s._]{30})\s+$""".r
  record match {
    case pattern(eid,ename) => Some(Record(eid, ename))
    case _ => None
  }
}

val deptParser: String =>  Option[Record] = { record => 
  val pattern="""^[(](\d+)[)]\s([\w\s._]{30})\s+$""".r
  record match {
    case pattern(eid,ename) => Some(Record(eid, ename))
    case _ => None
  }
}

val companyParser: String =>  Option[Record] = { record => 
  val pattern="""^[(](\d+)[)]\s([\w\s._]{30})\s+$""".r
  record match {
    case pattern(eid,ename) => Some(Record(eid, ename))
    case _ => None
  }
}
Run Code Online (Sandbox Code Playgroud)

我们使用以下方式加载数据wholeFiles

val dataPath = "/.../data/wholefiles/*/*"
val logFiles =  sc.wholeTextFiles(dataPath)
Run Code Online (Sandbox Code Playgroud)

然后,我们通过过滤文件来处理不同类型的记录以获得我们需要的文件类型并应用上面定义的解析器。请注意我们实际上是如何重复相同的过程的。这个可以抽象出来。

val empLogs = logFiles.filter{case (filename, content) => filename.endsWith("emplog.txt")}.flatMap{case (_,content) => content.split("\n").flatMap(line=> empParser(line))}
val deptLogs = logFiles.filter{case (filename, content) => filename.endsWith("deptlog.txt")}.flatMap{case (_,content) => content.split("\n").flatMap(line=> deptParser(line))}
val compLogs = logFiles.filter{case (filename, content) => filename.endsWith("companylog.txt")}.flatMap{case (_,content) => content.split("\n").flatMap(line=> companyParser(line))}
Run Code Online (Sandbox Code Playgroud)

我们现在转换为 DataFrame

val empDF  = empLogs.toDF
Run Code Online (Sandbox Code Playgroud)

我们也可以对其他记录类型执行相同的操作。

这个过程中有足够的空间来减少代码重复,这取决于我们是否能在不同数据类型的过程中找到共性。