Ade*_*nde 5 elasticsearch apache-spark apache-spark-sql
我正在尝试使用Apache spark在Elasticsearch中查询我的数据,但我的Spark工作大约花了20个小时来进行聚合并仍在运行.ES中的相同查询大约需要6秒.
我知道数据必须从Elasticsearch集群转移到我的spark集群,并且一些数据在Spark中进行混洗.
我的ES索引中的数据是大约.3亿个文档,每个文档有大约400个字段(1.4Terrabyte).
我有一个3节点火花簇(1个主站,2个工作站),总共有60GB内存和8个核心.
运行所需的时间是不可接受的,有没有办法让我的火花作业运行得更快?
这是我的火花配置:
SparkConf sparkConf = new SparkConf(true).setAppName("SparkQueryApp")
.setMaster("spark://10.0.0.203:7077")
.set("es.nodes", "10.0.0.207")
.set("es.cluster", "wp-es-reporting-prod")
.setJars(JavaSparkContext.jarOfClass(Demo.class))
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.default.parallelism", String.valueOf(cpus * 2))
.set("spark.executor.memory", "8g");
Run Code Online (Sandbox Code Playgroud)
编辑
SparkContext sparkCtx = new SparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sparkCtx);
DataFrame df = JavaEsSparkSQL.esDF(sqlContext, "customer-rpts01-201510/sample");
DataFrame dfCleaned = cleanSchema(sqlContext, df);
dfCleaned.registerTempTable("RPT");
DataFrame sqlDFTest = sqlContext.sql("SELECT agent, count(request_type) FROM RPT group by agent");
for (Row row : sqlDFTest.collect()) {
System.out.println(">> " + row);
}
Run Code Online (Sandbox Code Playgroud)
我弄清楚发生了什么,基本上,我试图操纵数据帧模式,因为我有一些带有点的字段,例如user.firstname.这似乎在火花的收集阶段引起问题.要解决这个问题,我不得不重新索引我的数据,因此我的字段不再有点,而是下划线,例如user_firstname.
| 归档时间: |
|
| 查看次数: |
2513 次 |
| 最近记录: |