使用Spark通过s3a将镶木地板文件写入s3非常慢

Bru*_*s35 18 scala amazon-s3 apache-spark parquet apache-spark-sql

我正在尝试编写一个parquet文件来Amazon S3使用Spark 1.6.1.parquet我正在生成的小部分~2GB曾经被写过,所以数据并不多.我试图证明Spark我可以使用的平台.

基本上,我什么都正在建立一个star schemadataframes,那么我会写这些表出来拼花地板.数据来自供应商提供的csv文件,我使用Spark作为ETL平台.我现在有一个3节点集群中的ec2(r3.2xlarge)那么120GB的存储器上执行程序和16个内核总.

输入文件总共大约22GB,我现在提取大约2GB的数据.最后,当我开始加载完整数据集时,这将是几TB.

这是我的火花/斯卡拉pseudocode:

  def loadStage(): Unit = {
    sc.hadoopConfiguration.set("fs.s3a.buffer.dir", "/tmp/tempData")
    sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
    sc.hadoopConfiguration.set("spark.sql.hive.convertMetastoreParquet","false")
    var sqlCtx = new SQLContext(sc)


    val DataFile = sc.textFile("s3a://my-bucket/archive/*/file*.gz")

    //Setup header table/df
    val header_rec = DataFile.map(_.split("\\|")).filter(x=> x(0) == "1")
    val headerSchemaDef = "market_no,rel_date,field1, field2, field3....."
    val headerSchema = StructType(headerSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
    val headerRecords = header_rec.map(p => Row(p(3), p(8), p(1), p(2), p(4), p(5), p(6) ))
    val header = sqlCtx.createDataFrame(headerRecords, headerSchema)
    header.registerTempTable("header")
    sqlCtx.cacheTable("header")


    //Setup fact table/df
    val fact_recs = DataFile.map(_.split("\\|")).filter(x=> x(0) == "2")
    val factSchemaDef = "market_no,rel_date,field1, field2, field3....."
    val factSchema = StructType(factSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
    val records = fact_recs.map(p => Row(p(11), p(12), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10)))
    val df = sqlCtx.createDataFrame(records, factSchema)
    df.registerTempTable("fact")

    val results = sqlCtx.sql("select fact.* from header inner join fact on fact.market_no = header.market_no and fact.rel_date = header.rel_date")


    println(results.count())



    results.coalesce(1).write.mode(SaveMode.Overwrite).parquet("s3a://my-bucket/a/joined_data.parquet")


  }
Run Code Online (Sandbox Code Playgroud)

465884512行的计数大约需要2分钟.对拼花地板的写作需要38分钟

我明白coalesce这对写作的司机来说是一种洗牌......但是它所花费的时间让我觉得我在做一些严重错误的事情.没有coalesce,这仍然需要15分钟,IMO仍然太长,给了我一大堆小parquet文件.我想每天有一个大文件,我将拥有.我有代码按字段值进行分区,而且速度也很慢.我也尝试输出这个csv,大约需要1个小时.

另外,当我提交工作时,我并没有真正设置运行时道具.我的一项工作的控制台统计信息是:

  • 活着的工人:2
  • 使用的核心数:16总计,16使用
  • 使用的内存:总计117.5 GB,使用107.5 GB
  • 应用程序:1个运行,5个完成
  • 驱动程序:0运行,0完成
  • 状态:活着

Dav*_*vid 18

在I/O操作期间,Spark默认值会导致大量(可能)不必要的开销,尤其是在写入S3时.本文将对此进行更全面的讨论,但您有两种设置需要考虑更改.

  • 使用DirectParquetOutputCommitter.默认情况下,Spark会将所有数据保存到临时文件夹,然后再移动这些文件.使用DirectParquetOutputCommitter可以通过直接写入S3输出路径来节省时间

    • Spark 2.0+不再提供
      • 正如jira票中所述,目前的解决方案是
        1. 将代码切换为使用s3a和Hadoop 2.7.2+; 它全面改善,在Hadoop 2.8中变得更好,并且是s3guard的基础
        2. 使用Hadoop FileOutputCommitter并将mapreduce.fileoutputcommitter.algorithm.version设置为2

    默认情况下,从Spark 1.5开始 关闭模式合并关闭模式合并.如果启用了架构合并,则驱动程序节点将扫描所有文件以确保一致的架构.这特别昂贵,因为它不是分布式操作.确保通过执行此操作来关闭此功能

    val file = sqx.read.option("mergeSchema", "false").parquet(path)

  • 从Spark 2.0开始,DirectParquetOutputCommitter不再可用.有关新解决方案,请参阅[SPARK-10063](https://issues.apache.org/jira/browse/SPARK-10063) (2认同)
  • 如果性能几乎相同,这是否意味着它不是一个真正有效的解决方案? (2认同)
  • 默认情况下,合并模式也是假的1.5.0 http://spark.apache.org/docs/latest/sql-programming-guide.html#schema-merging (2认同)