Parse CSV into a DataFrame/DataSet with Apache Spark and Java

mit*_*hra 19 java hadoop hdfs apache-spark apache-spark-sql

I am new to Spark, and I want to use group-by & reduce to find the following from a CSV (one line per employee):

  Department, Designation, costToCompany, State
  Sales, Trainee, 12000, UP
  Sales, Lead, 32000, AP
  Sales, Lead, 32000, LA
  Sales, Lead, 32000, TN
  Sales, Lead, 32000, AP
  Sales, Lead, 32000, TN 
  Sales, Lead, 32000, LA
  Sales, Lead, 32000, LA
  Marketing, Associate, 18000, TN
  Marketing, Associate, 18000, TN
  HR, Manager, 58000, TN

I would like to simplify the CSV with a group by Department, Designation, State, adding the columns sum(costToCompany) and TotalEmployeeCount.

The result should be something like:

  Dept, Desg, state, empCount, totalCost
  Sales,Lead,AP,2,64000
  Sales,Lead,LA,3,96000  
  Sales,Lead,TN,2,64000

Is there a way to achieve this using transformations and actions, or should we go for RDD operations?

eme*_*cas 40

Procedure

  • Create a class (schema) to encapsulate your structure (it is not required for approach B, but it will make your code easier to read if you are using Java)

    public class Record implements Serializable {
      String department;
      String designation;
      long costToCompany;
      String state;
      // constructor , getters and setters  
    }
    
  • Load the CSV (or JSON) file

    JavaSparkContext sc;
    JavaRDD<String> data = sc.textFile("path/input.csv");
    //JavaSQLContext sqlContext = new JavaSQLContext(sc); // For previous versions 
    SQLContext sqlContext = new SQLContext(sc); // In Spark 1.3 the Java API and Scala API have been unified
    
    
    JavaRDD<Record> rdd_records = data.map(
      new Function<String, Record>() {
          public Record call(String line) throws Exception {
             // Here you could parse JSON instead, e.g.
             // Gson gson = new Gson();
             // gson.fromJson(line, Record.class);
             String[] fields = line.split(",");
             Record sd = new Record(fields[0], fields[1],
                 Long.parseLong(fields[2].trim()), fields[3]);
             return sd;
          }
    });
    

At this point you have two approaches:

A. SparkSQL

  • Register a table (using the schema class you defined)

    JavaSchemaRDD table = sqlContext.applySchema(rdd_records, Record.class);
    table.registerAsTable("record_table");
    table.printSchema();
    
  • Query the table with the group-by you need

    JavaSchemaRDD res = sqlContext.sql(
        "select department, designation, state, sum(costToCompany), count(*) "
      + "from record_table "
      + "group by department, designation, state");
    
  • Here you can also execute any other query you want, using the SQL approach; for example:
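
  A minimal sketch of one such follow-up query (not part of the original answer; it reuses the same record_table registration and the same pre-1.3 API as above, and the 'Lead' filter is only illustrative):

    JavaSchemaRDD leadCostByState = sqlContext.sql(
        "select state, sum(costToCompany) as totalCost "
      + "from record_table "
      + "where designation = 'Lead' "
      + "group by state");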

B. Spark

  • Map to pairs with a composite key: Department, Designation, State

    JavaPairRDD<String, Tuple2<Long, Integer>> records_JPRDD =
    rdd_records.mapToPair(new
      PairFunction<Record, String, Tuple2<Long, Integer>>(){
        public Tuple2<String, Tuple2<Long, Integer>> call(Record record){
          Tuple2<String, Tuple2<Long, Integer>> t2 =
          new Tuple2<String, Tuple2<Long,Integer>>(
            record.department + record.designation + record.state,
            new Tuple2<Long, Integer>(record.costToCompany, 1)
          );
          return t2;
        }
    });

  • reduceByKey with the composite key, summing the costToCompany column and accumulating the record count per key (a small sketch for inspecting the result follows the code)

    JavaPairRDD<String, Tuple2<Long, Integer>> final_rdd_records = 
     records_JPRDD.reduceByKey(new Function2<Tuple2<Long, Integer>, Tuple2<Long,
     Integer>, Tuple2<Long, Integer>>() {
        public Tuple2<Long, Integer> call(Tuple2<Long, Integer> v1,
        Tuple2<Long, Integer> v2) throws Exception {
            return new Tuple2<Long, Integer>(v1._1 + v2._1, v1._2+ v2._2);
        }
    });
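
  A minimal sketch (not part of the original answer) for looking at the aggregated result; collect() brings everything to the driver, so only use it on small outputs:

    for (Tuple2<String, Tuple2<Long, Integer>> row : final_rdd_records.collect()) {
        // key   = concatenated department+designation+state
        // value = (summed costToCompany, employee count)
        System.out.println(row._1 + " -> totalCost=" + row._2._1 + ", empCount=" + row._2._2);
    }
    // or write it out instead:
    // final_rdd_records.saveAsTextFile("path/output");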
    


mrs*_*vas 20

A CSV file can be parsed with Spark's built-in CSV reader. It returns a DataFrame/DataSet on a successful read of the file, and on top of the DataFrame/DataSet you can easily apply SQL-like operations.

Using Spark 2.x (and above) with Java

Create a SparkSession object, aka spark

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
    .builder()
    .appName("Java Spark SQL Example")
    .getOrCreate();

Create a schema for the rows using StructType

import org.apache.spark.sql.types.StructType;

StructType schema = new StructType()
    .add("department", "string")
    .add("designation", "string")
    .add("ctc", "long")
    .add("state", "string");

Create a dataframe from the CSV file and apply the schema to it

Dataset<Row> df = spark.read()
    .option("mode", "DROPMALFORMED")
    .schema(schema)
    .csv("hdfs://path/input.csv");

There are more options for reading data from a CSV file; for example:
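
A sketch of a few commonly used reader options (not from the original answer; adjust them to your file, e.g. whether it starts with a header line):

Dataset<Row> dfWithOptions = spark.read()
    .option("header", "true")       // treat the first line as column names
    .option("delimiter", ",")       // field separator
    .option("inferSchema", "true")  // let Spark guess column types instead of supplying a StructType
    .csv("hdfs://path/input.csv");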

Now we can aggregate the data in two ways

1. The SQL way

Register the table in the Spark SQL metastore to perform SQL operations

df.createOrReplaceTempView("employee");

Run a SQL query on the registered dataframe

Dataset<Row> sqlResult = spark.sql(
    "SELECT department, designation, state, SUM(ctc), COUNT(department)" 
        + " FROM employee GROUP BY department, designation, state");

sqlResult.show(); //for testing

We can even execute SQL directly on the CSV file with Spark SQL, without creating a table, as sketched below.
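
A minimal sketch of that (assuming Spark 2.x; without a header or schema the columns get default names _c0, _c1, ...):

Dataset<Row> direct = spark.sql(
    "SELECT * FROM csv.`hdfs://path/input.csv`");
direct.show();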


2. Object chaining, i.e. the programmatic or Java-like way

Do the necessary imports for the sql functions

import static org.apache.spark.sql.functions.count;
import static org.apache.spark.sql.functions.sum;

Use groupBy and agg on the dataframe/dataset to count and sum the data

Dataset<Row> dfResult = df.groupBy("department", "designation", "state")
    .agg(sum("ctc"), count("department"));
// After Spark 1.6 columns mentioned in group by will be added to result by default

dfResult.show();//for testing

Dependent libraries

"org.apache.spark" % "spark-core_2.11" % "2.0.0" 
"org.apache.spark" % "spark-sql_2.11" % "2.0.0"