San*_*raj 3 java performance apache-spark apache-spark-sql
我正在使用spark 2.11版本,我在我的应用程序中只做了3个基本操作:
但是对于这3个操作,它需要将近20分钟.如果我在SQL中执行相同的操作,则需要不到1分钟.
我已经开始使用spark因为它会产生非常快的结果,但是花费了太多时间.如何提高性能?
第1步:从数据库中获取记录.
Properties connectionProperties = new Properties();
connectionProperties.put("user", "test");
connectionProperties.put("password", "test##");
String query="(SELECT * from items)
dataFileContent= spark.read().jdbc("jdbc:oracle:thin:@//172.20.0.11/devad", query,connectionProperties);
Run Code Online (Sandbox Code Playgroud)
步骤2:使用contains检查文件B(2M)中存在的文件A(5k)的记录
Dataset<Row> NewSet=source.join(target,target.col("ItemIDTarget").contains(source.col("ItemIDSource")),"inner");
Run Code Online (Sandbox Code Playgroud)
步骤3:将匹配的记录写入CSV格式的文件
NewSet.repartition(1).select("*")
.write().format("com.databricks.spark.csv")
.option("delimiter", ",")
.option("header", "true")
.option("treatEmptyValuesAsNulls", "true")
.option("nullValue", "")
.save(fileAbsolutePath);
Run Code Online (Sandbox Code Playgroud)
为了提高性能,我尝试了一些设置Cache,数据序列化等功能
set("spark.serializer","org.apache.spark.serializer.KryoSerializer")),
Run Code Online (Sandbox Code Playgroud)
随机播放时间
sqlContext.setConf("spark.sql.shuffle.partitions", "10"),
Run Code Online (Sandbox Code Playgroud)
数据结构调整
-XX:+UseCompressedOops ,
Run Code Online (Sandbox Code Playgroud)
没有一种方法不会产生更好的性能.
提高性能更像是改善并行性.
并行性取决于RDD中的分区数.
确保数据集/数据帧/ RDD既没有太多的分区,也没有太少的分区.
请查看以下建议,以便改进代码.我对scala感觉更舒服所以我在scala中提供建议.
步骤1:通过提及numPartitions,确保您可以控制与数据库建立的连接.
连接数=分区数.
下面我只分配了10个num_partitions,你必须调整以获得更多性能.
int num_partitions;
num_partitions = 10;
Properties connectionProperties = new Properties();
connectionProperties.put("user", "test");
connectionProperties.put("password", "test##");
connectionProperties.put("partitionColumn", "hash_code");
String query = "(SELECT mod(A.id,num_partitions) as hash_code, A.* from items A)";
dataFileContent = spark.read()
.jdbc("jdbc:oracle:thin:@//172.20.0.11/devad",
dbtable = query,
columnName = "hash_code",
lowerBound = 0,
upperBound = num_partitions,
numPartitions = num_partitions,
connectionProperties);
Run Code Online (Sandbox Code Playgroud)
第2步:
Dataset<Row> NewSet = source.join(target,
target.col("ItemIDTarget").contains(source.col("ItemIDSource")),
"inner");
Run Code Online (Sandbox Code Playgroud)
由于表/数据帧之一具有5k记录(少量数据),您可以使用如下所述的广播连接.
import org.apache.spark.sql.functions.broadcast
val joined_df = largeTableDF.join(broadcast(smallTableDF), "key")
Run Code Online (Sandbox Code Playgroud)
步骤3:使用coalesce减少分区数量,以避免完全shuffle.
NewSet.coalesce(1).select("*")
.write().format("com.databricks.spark.csv")
.option("delimiter", ",")
.option("header", "true")
.option("treatEmptyValuesAsNulls", "true")
.option("nullValue", "")
.save(fileAbsolutePath);
Run Code Online (Sandbox Code Playgroud)
希望我的回答可以帮到你.