spark java中的性能问题

San*_*raj 3 java performance apache-spark apache-spark-sql

我正在使用spark 2.11版本,我在我的应用程序中只做了3个基本操作:

  1. 记录数据库:220万
  2. 使用contains检查数据库(220万)中存在的文件(5 000)中的记录
  3. 将匹配的记录写入CSV格式的文件

但是对于这3个操作,它需要将近20分钟.如果我在SQL中执行相同的操作,则需要不到1分钟.

我已经开始使用spark因为它会产生非常快的结果,但是花费了太多时间.如何提高性能?

第1步:从数据库中获取记录.

        Properties connectionProperties = new Properties();
        connectionProperties.put("user", "test");
        connectionProperties.put("password", "test##");
        String query="(SELECT * from items)
        dataFileContent= spark.read().jdbc("jdbc:oracle:thin:@//172.20.0.11/devad", query,connectionProperties);
Run Code Online (Sandbox Code Playgroud)

步骤2:使用contains检查文件B(2M)中存在的文件A(5k)的记录

Dataset<Row> NewSet=source.join(target,target.col("ItemIDTarget").contains(source.col("ItemIDSource")),"inner");
Run Code Online (Sandbox Code Playgroud)

步骤3:将匹配的记录写入CSV格式的文件

 NewSet.repartition(1).select("*")
        .write().format("com.databricks.spark.csv")
        .option("delimiter", ",")
        .option("header", "true")
        .option("treatEmptyValuesAsNulls", "true")  
        .option("nullValue", "")  
        .save(fileAbsolutePath);
Run Code Online (Sandbox Code Playgroud)

为了提高性能,我尝试了一些设置Cache,数据序列化等功能

set("spark.serializer","org.apache.spark.serializer.KryoSerializer")),
Run Code Online (Sandbox Code Playgroud)

随机播放时间

sqlContext.setConf("spark.sql.shuffle.partitions", "10"),
Run Code Online (Sandbox Code Playgroud)

数据结构调整

-XX:+UseCompressedOops ,
Run Code Online (Sandbox Code Playgroud)

没有一种方法不会产生更好的性能.

Arv*_*ula 5

提高性能更像是改善并行性.

并行性取决于RDD中的分区数.

确保数据集/数据帧/ RDD既没有太多的分区,也没有太少的分区.

请查看以下建议,以便改进代码.我对scala感觉更舒服所以我在scala中提供建议.

步骤1:通过提及numPartitions,确保您可以控制与数据库建立的连接.

连接数=分区数.

下面我只分配了10个num_partitions,你必须调整以获得更多性能.

  int num_partitions;
  num_partitions = 10;
  Properties connectionProperties = new Properties();
  connectionProperties.put("user", "test");
  connectionProperties.put("password", "test##");
  connectionProperties.put("partitionColumn", "hash_code");
  String query = "(SELECT  mod(A.id,num_partitions)  as hash_code, A.* from items A)";
  dataFileContent = spark.read()
    .jdbc("jdbc:oracle:thin:@//172.20.0.11/devad",
      dbtable = query,
      columnName = "hash_code",
      lowerBound = 0,
      upperBound = num_partitions,
      numPartitions = num_partitions,
      connectionProperties);
Run Code Online (Sandbox Code Playgroud)

您可以检查numPartition的工作原理

第2步:

  Dataset<Row> NewSet = source.join(target,
    target.col("ItemIDTarget").contains(source.col("ItemIDSource")),
    "inner");
Run Code Online (Sandbox Code Playgroud)

由于表/数据帧之一具有5k记录(少量数据),您可以使用如下所述的广播连接.

import org.apache.spark.sql.functions.broadcast
val joined_df = largeTableDF.join(broadcast(smallTableDF), "key")
Run Code Online (Sandbox Code Playgroud)

步骤3:使用coalesce减少分区数量,以避免完全shuffle.

NewSet.coalesce(1).select("*")
        .write().format("com.databricks.spark.csv")
        .option("delimiter", ",")
        .option("header", "true")
        .option("treatEmptyValuesAsNulls", "true")  
        .option("nullValue", "")  
        .save(fileAbsolutePath);
Run Code Online (Sandbox Code Playgroud)

希望我的回答可以帮到你.