如何加速Spark SQL单元测试?

Clé*_*IEU 11 testing unit-testing apache-spark apache-spark-sql

我正在评估Spark SQL以实现一个简单的报告模块(对已经存储在HDFS上的Avro数据进行的简单聚合很少).我毫不怀疑Spark SQL可以很好地适应我的功能和非功能需求.

但是,除了生产要求之外,我还要确保模块可以测试.我们遵循BDD方法,采用非常集中的方案,这意味着该模块将需要对一些非常简单的数据(1..10条记录)运行数十/数百个SQL查询.

为了大致了解我在本地模式下可以从Spark SQL中获得的性能,我已经快速制作了一些测试原型:

  1. select count(*) from myTable
  2. select key, count(*) from myTable group by key

第一次测试平均需要100ms,但第二次需要500ms.这样的性能是不可接受的,因为它会使测试套件太慢.

为了比较,我可以使用Crunch及其MemPipeline(在本地模式下使用MRPipeline为1500ms)以及在嵌入模式下使用Hive进行1500ms运行相同的测试.因此,Spark SQL在本地模式下比MR快一点,但仍然可以减慢构建良好的测试套件的速度.

是否可以在本地模式下加速Spark SQL?

有没有更好/更快的方法来测试Spark SQL模块?

(我还没有对执行情况进行分析,但由于groupBy().countByKey()RDD平均需要40ms,我希望发现罪魁祸首是查询优化器)


我的快速和肮脏的测试代码如下:

  SparkConf sparkConf = new SparkConf()
                .setMaster("local[4]")
                .setAppName("poc-sparksql");

  try (JavaSparkContext ctx = new JavaSparkContext(sparkConf)) {
        SQLContext sqlCtx = new SQLContext(ctx);

        for (int i = 0; i < ITERATIONS; i++) {
            Stopwatch testCaseSw = new Stopwatch().start();

            DataFrame df = sqlCtx.load("/tmp/test.avro", "com.databricks.spark.avro");
            df.registerTempTable("myTable");
            DataFrame result = sqlCtx.sql("select count(*) from myTable");

            System.out.println("Results: " + result.collectAsList());
            System.out.println("Elapsed: " + testCaseSw.elapsedMillis());
        }

        for (int i = 0; i < ITERATIONS; i++) {
            Stopwatch testCaseSw = new Stopwatch().start();

            DataFrame df = sqlCtx.load("/tmp/test.avro", "com.databricks.spark.avro");
            df.registerTempTable("myTable");
            DataFrame result = sqlCtx.sql("select a, count(*) from myTable group by a ");

            System.out.println("Results: " + result.collectAsList());
            System.out.println("Elapsed: " + testCaseSw.elapsedMillis());
        }
 }
Run Code Online (Sandbox Code Playgroud)

Bel*_*wal 5

当数据量非常小时,启动太多任务并不是一个好的选择。
在您的第二个选项中,group by将创建另一个stage200 tasks因为您没有设置随机分区属性,并且默认情况下它是,200并且其中大多数将为空。

它在单个测试中可能不会产生影响,但当您有数千个带有随机操作的测试时,可能会产生重大影响。

在 Spark conf 中设置"spark.sql.shuffle.partitions"为) 。x (where x is local[x]

实际上,您不需要4 executors处理少于 10 条记录,因此最好将执行程序的数量减少到1并设置shuffle.paritions1


Kra*_*tam 0

如果您正在研究 ms 级别的优化,那么有各种指针。

  1. 读取一次数据并缓存,并多次对其进行 SQL 查询。在循环内部 load 意味着“在 everyIteartion 中生成新任务”
 DataFrame df = sqlCtx.load("/tmp/test.avro","com.databricks.spark.avro");
 df.registerTempTable("myTable");  
 df.cache()

 for (int i = 0; i < ITERATIONS; i++) {
       Stopwatch testCaseSw = new Stopwatch().start();
       DataFrame result = sqlCtx.sql("select count(*) from myTable");
       // Dont do printLn inside the loop , save the output in some hashMap and print it later once the loop is complete
       System.out.println("Results: " + result.collectAsList());
       System.out.println("Elapsed: " + testCaseSw.elapsedMillis());
}
Run Code Online (Sandbox Code Playgroud)
  1. 将 System.out.println 提取到循环之外,因为它需要一些时间。

请看一下: http ://bytepadding.com/big-data/spark/understanding-spark-through-map-reduce/