我正在使用一个大型数据集,该数据集由两列分隔 - plant_name和tag_id.第二个分区 - tag_id有200000个唯一值,我主要通过特定tag_id值访问数据.如果我使用以下Spark命令:
sqlContext.setConf("spark.sql.hive.metastorePartitionPruning", "true")
sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")
val df = sqlContext.sql("select * from tag_data where plant_name='PLANT01' and tag_id='1000'")
Run Code Online (Sandbox Code Playgroud)
我希望快速响应,因为这解析为单个分区.在Hive和Presto中,这需要几秒钟,但在Spark中运行数小时.
实际数据保存在S3存储桶中,当我提交sql查询时,Spark关闭并首先获取Hive Metastore中的所有分区(其中200000个),然后调用refresh()强制所有这些文件的完整状态列表在S3对象库中(实际调用listLeafFilesInParallel).
这两个操作是如此昂贵,是否有任何设置可以让Spark更早地修剪分区 - 在调用元数据存储期间,还是之后立即?
使用scala运行spark工作,正如预期的那样,所有工作都按时完成,但不知何故,一些INFO日志在作业停止前打印20-25分钟.
发布少量UI截图,可以帮助解决问题.
我不明白为什么两个工作ID之间花了这么多时间.
以下是我的代码段:
val sc = new SparkContext(conf)
for (x <- 0 to 10) {
val zz = getFilesList(lin);
val links = zz._1
val path = zz._2
lin = zz._3
val z = sc.textFile(links.mkString(",")).map(t => t.split('\t')).filter(t => t(4) == "xx" && t(6) == "x").map(t => titan2(t)).filter(t => t.length > 35).map(t => ((t(34)), (t(35), t(5), t(32), t(33))))
val way_nodes = sc.textFile(way_source).map(t => t.split(";")).map(t => (t(0), t(1)));
val t = z.join(way_nodes).map(t => (t._2._1._2, Array(Array(t._2._1._2, t._2._1._3, t._2._1._4, t._2._1._1, t._2._2)))).reduceByKey((t, y) …Run Code Online (Sandbox Code Playgroud) Parquet文件包含每块行计数字段.Spark似乎在某些时候读取它(SpecificParquetRecordReaderBase.java#L151).
我试过这个spark-shell:
sqlContext.read.load("x.parquet").count
Run Code Online (Sandbox Code Playgroud)
Spark分为两个阶段,显示了DAG中的各种聚合步骤.我认为这意味着它正常读取文件而不是使用行计数.(我可能是错的.)
问题是:当我运行时Spark是否已经使用行计数字段count?是否有其他API可以使用这些字段?出于某种原因,依赖这些领域是个坏主意吗?
我在amazom aws emr 4.0.0中运行spark 1.4.1
对于一些共振火花saveAsTextFile在emr 4.0.0上与emr 3.8相比非常慢(为5秒,现在为95秒)
实际上saveAsTextFile表示它已经在4.356秒完成,但之后我看到很多INFO消息,com.amazonaws.latency记录器在接下来的90秒内出现404错误
spark> sc.parallelize(List.range(0, 1600000),160).map(x => x + "\t" + "A"*100).saveAsTextFile("s3n://foo-bar/tmp/test40_20")
2015-09-01 21:16:17,637 INFO [dag-scheduler-event-loop] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - ResultStage 5 (saveAsTextFile at <console>:22) finished in 4.356 s
2015-09-01 21:16:17,637 INFO [task-result-getter-2] cluster.YarnScheduler (Logging.scala:logInfo(59)) - Removed TaskSet 5.0, whose tasks have all completed, from pool
2015-09-01 21:16:17,637 INFO [main] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Job 5 finished: saveAsTextFile at <console>:22, took 4.547829 s
2015-09-01 21:16:17,638 INFO [main] s3n.S3NativeFileSystem (S3NativeFileSystem.java:listStatus(896)) - listStatus s3n://foo-bar/tmp/test40_20/_temporary/0 with recursive false
2015-09-01 …Run Code Online (Sandbox Code Playgroud) 只是想知道Parquet谓词下推是否也适用于S3,而不仅限于HDFS。具体来说,如果我们使用Spark(非EMR)。
进一步的解释可能会有所帮助,因为它可能涉及对分布式文件系统的理解。
我正在使用 Spark Streaming 1.5.2,并使用 Direct Stream 方法从 Kafka 0.8.2.2 中提取数据。
我已启用检查点,以便我的驱动程序可以重新启动并从中断处继续,而不会丢失未处理的数据。
检查点被写入 S3,因为我在 Amazon AWS 上而不是在 Hadoop 集群之上运行。
批处理间隔为 1 秒,因为我想要低延迟。
问题是,将单个检查点写入 S3 需要 1 到 20 秒。它们在内存中备份,最终应用程序失败。
2016-04-28 18:26:55,483 INFO [org.apache.spark.streaming.CheckpointWriter] [pool-16-thread-1] - Checkpoint for time 1461882407000 ms saved to file 's3a://.../checkpoints/cxp-filter/checkpoint-1461882407000', took 6071 bytes and 1724 ms
2016-04-28 18:26:58,812 INFO [org.apache.spark.streaming.CheckpointWriter] [pool-16-thread-1] - Checkpoint for time 1461882407000 ms saved to file 's3a://.../checkpoints/cxp-filter/checkpoint-1461882407000', took 6024 bytes and 3329 ms
2016-04-28 18:27:00,327 INFO [org.apache.spark.streaming.CheckpointWriter] [pool-16-thread-1] - Checkpoint for time 1461882408000 …Run Code Online (Sandbox Code Playgroud)