Ame*_*osa 11 hadoop hive impala apache-spark parquet
我正在使用Hadoop/Spark进行一些信号分析,我需要有关如何构建整个过程的帮助.
信号现在存储在数据库中,我们将使用Sqoop读取,并将在HDFS上的文件中进行转换,其架构类似于:
<Measure ID> <Source ID> <Measure timestamp> <Signal values>
Run Code Online (Sandbox Code Playgroud)
其中信号值只是由浮点逗号分隔的数字组成的字符串.
000123 S001 2015/04/22T10:00:00.000Z 0.0,1.0,200.0,30.0 ... 100.0
000124 S001 2015/04/22T10:05:23.245Z 0.0,4.0,250.0,35.0 ... 10.0
...
000126 S003 2015/04/22T16:00:00.034Z 0.0,0.0,200.0,00.0 ... 600.0
Run Code Online (Sandbox Code Playgroud)
我们想写交互/批量查询到:
在信号值上应用聚合函数
SELECT *
FROM SIGNALS
WHERE MAX(VALUES) > 1000.0
Run Code Online (Sandbox Code Playgroud)
选择峰值超过1000.0的信号.
将聚合应用于聚合
SELECT SOURCEID, MAX(VALUES)
FROM SIGNALS
GROUP BY SOURCEID
HAVING MAX(MAX(VALUES)) > 1500.0
Run Code Online (Sandbox Code Playgroud)
选择至少具有超过1500.0的单个信号的源.
在样本上应用用户定义的函数
SELECT *
FROM SIGNALS
WHERE MAX(LOW_BAND_FILTER("5.0 KHz", VALUES)) > 100.0)
Run Code Online (Sandbox Code Playgroud)
选择在5.0 KHz过滤后至少具有超过100.0的值的信号.
我们需要一些帮助才能:
非常感谢你!
1)Parquet作为柱状格式适用于OLAP.Parquet的Spark支持已经足够成熟,可供生产使用.我建议将表示信号值的字符串解析为以下数据结构(简化):
case class Data(id: Long, signals: Array[Double])
val df = sqlContext.createDataFrame(Seq(Data(1L, Array(1.0, 1.0, 2.0)), Data(2L, Array(3.0, 5.0)), Data(2L, Array(1.5, 7.0, 8.0))))
Run Code Online (Sandbox Code Playgroud)
保持double数组允许定义和使用这样的UDF:
def maxV(arr: mutable.WrappedArray[Double]) = arr.max
sqlContext.udf.register("maxVal", maxV _)
df.registerTempTable("table")
sqlContext.sql("select * from table where maxVal(signals) > 2.1").show()
+---+---------------+
| id| signals|
+---+---------------+
| 2| [3.0, 5.0]|
| 2|[1.5, 7.0, 8.0]|
+---+---------------+
sqlContext.sql("select id, max(maxVal(signals)) as maxSignal from table group by id having maxSignal > 1.5").show()
+---+---------+
| id|maxSignal|
+---+---------+
| 1| 2.0|
| 2| 8.0|
+---+---------+
Run Code Online (Sandbox Code Playgroud)
或者,如果你想要一些类型安全,使用Scala DSL:
import org.apache.spark.sql.functions._
val maxVal = udf(maxV _)
df.select("*").where(maxVal($"signals") > 2.1).show()
df.select($"id", maxVal($"signals") as "maxSignal").groupBy($"id").agg(max($"maxSignal")).where(max($"maxSignal") > 2.1).show()
+---+--------------+
| id|max(maxSignal)|
+---+--------------+
| 2| 8.0|
+---+--------------+
Run Code Online (Sandbox Code Playgroud)
2)这取决于:如果您的数据大小允许在查询时间内以合理的延迟进行所有处理 - 那就去做吧.您可以从这种方法开始,然后为慢速/热门查询构建优化的结构
3)Hive很慢,Impala和Spark SQL已经过时了.有时选择并不容易,我们使用经验法则:如果所有数据都存储在HDFS/Hive中,Impala适用于没有连接的查询,Spark具有更大的延迟但连接可靠,它支持更多数据源并具有丰富的非SQL处理功能(如MLlib和GraphX)
4)保持简单:存储原始数据(主数据集)去重复和分区(我们使用基于时间的分区).如果新数据到达分区并且您已经生成了下游数据集 - 请重新启动此分区的管道.
希望这可以帮助
归档时间: |
|
查看次数: |
724 次 |
最近记录: |