How can I read a file from HDFS inside a Spark function, without using the SparkContext inside that function?
Example:
val filedata_rdd = rdd.map { x => ReadFromHDFS(x.getFilePath) }
The question is how to implement ReadFromHDFS. Normally we would read from HDFS with sc.textFile, but in this case sc cannot be used inside the function.
You don't necessarily need the SparkContext to interact with HDFS. You can simply broadcast the Hadoop configuration from the driver and use the broadcast value on the executors to construct a hadoop.fs.FileSystem. Then the world is yours. :)
Here is the code:
import java.io.StringWriter

import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SerializableWritable, SparkConf, SparkContext}

// An object (not a class), so that calling readFromHDFS inside rdd.map
// does not pull a non-serializable `this` into the task closure.
object Test {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[15]")
      .setAppName("TestJob")
    val sc = new SparkContext(conf)

    // Configuration is a Writable but not java.io.Serializable,
    // so wrap it in SerializableWritable before broadcasting.
    val confBroadcast = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))

    val rdd: RDD[String] = ??? // your existing RDD of HDFS paths

    val filedata_rdd = rdd.map { x => readFromHDFS(confBroadcast.value.value, x) }
  }

  // Rebuilds a FileSystem from the given configuration and reads the
  // file at `path` into a String.
  def readFromHDFS(configuration: Configuration, path: String): String = {
    val fs: FileSystem = FileSystem.get(configuration)
    val inputStream = fs.open(new Path(path))
    try {
      val writer = new StringWriter()
      IOUtils.copy(inputStream, writer, "UTF-8")
      writer.toString
    } finally {
      inputStream.close()
    }
  }
}
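Two notes on the design: Configuration is wrapped in SerializableWritable because it implements Writable but not java.io.Serializable, and FileSystem.get returns an instance cached per scheme and authority, so you should not close it after every record. Below is a minimal usage sketch; the paths are hypothetical, for illustration only, and confBroadcast and readFromHDFS are the ones defined above:

val paths = sc.parallelize(Seq(
  "hdfs:///data/part-00000",   // hypothetical HDFS paths
  "hdfs:///data/part-00001"))
// Each executor reconstructs a FileSystem from the broadcast configuration
val contents = paths.map(p => readFromHDFS(confBroadcast.value.value, p))
contents.collect().foreach(println)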