我正在尝试编写一个parquet文件来Amazon S3使用Spark 1.6.1.parquet我正在生成的小部分~2GB曾经被写过,所以数据并不多.我试图证明Spark我可以使用的平台.
基本上,我什么都正在建立一个star schema与dataframes,那么我会写这些表出来拼花地板.数据来自供应商提供的csv文件,我使用Spark作为ETL平台.我现在有一个3节点集群中的ec2(r3.2xlarge)那么120GB的存储器上执行程序和16个内核总.
输入文件总共大约22GB,我现在提取大约2GB的数据.最后,当我开始加载完整数据集时,这将是几TB.
这是我的火花/斯卡拉pseudocode:
def loadStage(): Unit = {
sc.hadoopConfiguration.set("fs.s3a.buffer.dir", "/tmp/tempData")
sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
sc.hadoopConfiguration.set("spark.sql.hive.convertMetastoreParquet","false")
var sqlCtx = new SQLContext(sc)
val DataFile = sc.textFile("s3a://my-bucket/archive/*/file*.gz")
//Setup header table/df
val header_rec = DataFile.map(_.split("\\|")).filter(x=> x(0) == "1")
val headerSchemaDef = "market_no,rel_date,field1, field2, field3....."
val headerSchema = StructType(headerSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
val headerRecords = header_rec.map(p => Row(p(3), p(8), p(1), …Run Code Online (Sandbox Code Playgroud)