Elasticsearch + Apache Spark性能

Ade*_*nde 5 elasticsearch apache-spark apache-spark-sql

我正在尝试使用Apache spark在Elasticsearch中查询我的数据,但我的Spark工作大约花了20个小时来进行聚合并仍在运行.ES中的相同查询大约需要6秒.

我知道数据必须从Elasticsearch集群转移到我的spark集群,并且一些数据在Spark中进行混洗.

我的ES索引中的数据是大约.3亿个文档,每个文档有大约400个字段(1.4Terrabyte).

我有一个3节点火花簇(1个主站,2个工作站),总共有60GB内存和8个核心.

运行所需的时间是不可接受的,有没有办法让我的火花作业运行得更快?

这是我的火花配置:

SparkConf sparkConf = new SparkConf(true).setAppName("SparkQueryApp")
                 .setMaster("spark://10.0.0.203:7077")    
                 .set("es.nodes", "10.0.0.207")
                 .set("es.cluster", "wp-es-reporting-prod")              
                .setJars(JavaSparkContext.jarOfClass(Demo.class))
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .set("spark.default.parallelism", String.valueOf(cpus * 2))
                .set("spark.executor.memory", "8g");
Run Code Online (Sandbox Code Playgroud)

编辑

    SparkContext sparkCtx = new SparkContext(sparkConf);

    SQLContext sqlContext = new SQLContext(sparkCtx);
    DataFrame df = JavaEsSparkSQL.esDF(sqlContext, "customer-rpts01-201510/sample");

    DataFrame dfCleaned = cleanSchema(sqlContext, df);

    dfCleaned.registerTempTable("RPT");

    DataFrame sqlDFTest = sqlContext.sql("SELECT agent, count(request_type) FROM RPT group by agent");

    for (Row row : sqlDFTest.collect()) {
        System.out.println(">> " + row);
    }
Run Code Online (Sandbox Code Playgroud)

Ade*_*nde 5

我弄清楚发生了什么,基本上,我试图操纵数据帧模式,因为我有一些带有点的字段,例如user.firstname.这似乎在火花的收集阶段引起问题.要解决这个问题,我不得不重新索引我的数据,因此我的字段不再有点,而是下划线,例如user_firstname.