mar*_*ark 10 hive amazon-s3 amazon-web-services amazon-athena
我将传感器数据存储在S3中(每5分钟写入一次数据):
farm_iot/sensor_data/farm/farm0001/sensor01/1541252701443
Run Code Online (Sandbox Code Playgroud)
1541252701443是一个包含度量的json文件:
{ "temperature": 14.78, "pressure": 961.70, "humidity": 68.32}
Run Code Online (Sandbox Code Playgroud)
我肯定错过了一些蜂巢技能.不幸的是,我没有找到一个提取我的时间序列json数据的例子.我也不确定Hive/Athena确实支持这种数据摔跤.
我正在为这个数据创建一个Athena表格...
CREATE EXTERNAL TABLE IF NOT EXISTS farm.sensor_data (
device string,
sensor string,
data_point string,
value double
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = '1'
) LOCATION 's3://farm-iot/sensor_data/farm/farm0001/sensor01/'
PARTITIONED BY (timestamp string)
TBLPROPERTIES ('has_encrypted_data'='false')
Run Code Online (Sandbox Code Playgroud)
我想的另一条道路是将数据存储在一个更容易处理的结构中/也许我没有足够的数据分区??!
所以也许我应该像这样添加dt到结构:
farm_iot/sensor_data/2018-11-03-02-45-02/farm/farm0001/sensor01/1541252701443
Run Code Online (Sandbox Code Playgroud)
仍然没有让我到达我想要的地方:
+---------------+----------+----------+-------------+--------+
| timestamp | device | sensor | data_point | value |
+---------------+----------+----------+-------------+--------+
| 1541252701443 | farm0001 | sensor01 | temperature | 14.78 |
+---------------+----------+----------+-------------+--------+
| 1541252701443 | farm0001 | sensor01 | humidity | 68.32 |
+---------------+----------+----------+-------------+--------+
| 1541252701443 | farm0001 | sensor01 | pressure | 961.70 |
+---------------+----------+----------+-------------+--------+
Run Code Online (Sandbox Code Playgroud)
任何指向这个目标的指针都将非常感激.谢谢!
请注意:我不想使用胶水,也不想了解如何手动操作.除了胶水已经创建〜16.000表昨天:)
让我尝试解释我前面看到的一些问题。
partitionname=partitionvalue不是,这不是强制性的,但是如果您想使用一些命令来根据文件夹结构自动添加分区,则此设置很有用。如果您主要按传感器或设备进行查询,这就是我可以解决您的问题的方式
理想情况下,您的文件夹结构应来自
farm_iot/sensor_data/farm/farm0001/sensor01/1541252701443
Run Code Online (Sandbox Code Playgroud)
到farm_iot / sensor_data / farm / device = farm0001 / sensor = sensor01 / 1541252701443
您的表定义应包含您的分区位置,以便能够在不使用正则表达式的情况下选择它并利用它的性能提高(我想一个常见的查询将按设备或传感器进行过滤。此外,您还需要添加所有json列那是你文件的一部分
CREATE EXTERNAL TABLE IF NOT EXISTS farm.sensor_data (
temperature double,
preassure double,
humidity double
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = '1'
) LOCATION 's3://farm-iot/sensor_data/farm/'
PARTITIONED BY (device string, sensor string)
TBLPROPERTIES ('has_encrypted_data'='false')
Run Code Online (Sandbox Code Playgroud)
我们缺少时间戳,它实际上是json输入文件名的一部分。我们可以在选择语句中使用虚拟列包括文件名INPUT__FILE__NAME,如下所示
select device, sensor, temperature, preassure, humidity, INPUT__FILE__NAME as mytimestamp from farm.sensor_data
Run Code Online (Sandbox Code Playgroud)
如果要确定压力,温度和湿度以及不同的行,我建议使用这三个数组创建一个数组并将其分解,使用UNION ALL运行3个查询以附加结果应该会非常高效
如果遵循Hive约定,则在包含新设备/传感器后,可以利用命令msck repair table自动添加新分区。在最坏的情况下,如果要保留文件夹结构,可以按以下方式添加分区
ALTER TABLE test ADD PARTITION (device='farm0001', sensor='sensor01') location 's3://farm_iot/sensor_data/farm/farm0001/sensor01'
Run Code Online (Sandbox Code Playgroud)
注意:新分区不会自动添加,您始终需要添加它们
我尝试添加尽可能多的细节。如果不清楚,请告诉我。
编辑:如果您的查询将主要基于时间序列(例如日期范围),我建议在天级别(不小于此)添加一个分区,以提高查询的性能。所以你的表定义看起来像
CREATE EXTERNAL TABLE IF NOT EXISTS farm.sensor_data (
temperature double,
preassure double,
humidity double
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = '1'
) LOCATION 's3://farm-iot/sensor_data/farm/'
PARTITIONED BY (dt=long, device string, sensor string)
TBLPROPERTIES ('has_encrypted_data'='false')
Run Code Online (Sandbox Code Playgroud)
您的文件夹结构看起来像
farm_iot / sensor_data / farm / dt = 20191204 / device = farm0001 / sensor = sensor01 / 1541252701443
为了澄清起见,您不需要为每个新分区修改表,只需将此分区添加到表中,这实质上就是Hive知道创建新分区的方式。如果您决定使用分区,这是唯一的方法,如果不这样做(这会影响性能),那么还有其他一些选择可以使分区工作
编辑2:
如果您想保持数据结构不变并且不使用分区,则可以按以下方式获得预期结果
CREATE EXTERNAL TABLE IF NOT EXISTS yourdb.sensordata (
temperature double,
pressure double,
humidity double
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = '1'
)
LOCATION 's3://farm-iot/sensor_data/farm/'
TBLPROPERTIES ('has_encrypted_data'='false');
SET hive.mapred.supports.subdirectories=TRUE;
SET mapred.input.dir.recursive=TRUE;
select * from yourdb.sensordata;
select
split(input__file__name, "/")[size(split(input__file__name, "/")) - 1] as ts,
split(input__file__name, "/")[size(split(input__file__name, "/")) - 3] as device,
split(input__file__name, "/")[size(split(input__file__name, "/")) - 2] as sensor,
'temperature' as data_point,
temperature as value
from yourdb.sensordata
union all
select
split(input__file__name, "/")[size(split(input__file__name, "/")) - 1] as ts,
split(input__file__name, "/")[size(split(input__file__name, "/")) - 3] as device,
split(input__file__name, "/")[size(split(input__file__name, "/")) - 2] as sensor,
'pressure' as data_point,
pressure as value
from yourdb.sensordata
union all
select
split(input__file__name, "/")[size(split(input__file__name, "/")) - 1] as ts,
split(input__file__name, "/")[size(split(input__file__name, "/")) - 3] as device,
split(input__file__name, "/")[size(split(input__file__name, "/")) - 2] as sensor,
'humidity' as data_point,
humidity as value
from yourdb.sensordata;
Run Code Online (Sandbox Code Playgroud)
如您所见,我从文件路径中获取了最多的信息,但是需要设置一些标志来递归地告诉Hive读取文件夹
ts,device,sensor,_data_point,value
1541252701443,farm0001,sensor01,temperature,14.78
1541252701443,farm0001,sensor01,pressure,961.7
1541252701443,farm0001,sensor01,humidity,68.32
Run Code Online (Sandbox Code Playgroud)
首先非常感谢@hlagos 的帮助。
AWS Athena 无法按照我需要的方式转换 json 传感器数据(我们在对 @hlagos 回答的评论中讨论了这一点)。因此,处理这种情况的“最简单”方法是将数据格式从 json 更改为 CSV 以更接近我需要的格式。
我现在将传感器数据以 CSV 格式存储在 S3 中(每 5 分钟写入一次数据),并添加了我们讨论的日期和设备分区。
结果文件夹结构:
farm_iot/sensor_data/farm/day=20181129/device=farm0001/1543535738493
Run Code Online (Sandbox Code Playgroud)
CSV文件的数据内容:
sensor01,temperature,2.82
sensor01,pressure,952.83
sensor01,humidity,83.64
sensor02,temperature,2.61
sensor02,pressure,952.74
sensor02,humidity,82.41
Run Code Online (Sandbox Code Playgroud)
AWS Athena 表定义:
CREATE EXTERNAL TABLE IF NOT EXISTS farm.sensor_data (
`sensor` string,
`data_point` string,
`value` double
)
PARTITIONED BY (day string, device string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
ESCAPED BY '\\'
LINES TERMINATED BY '\n'
LOCATION 's3://farm-iot/sensor_data/farm/'
TBLPROPERTIES ('has_encrypted_data'='false');
Run Code Online (Sandbox Code Playgroud)
我添加的分区是这样的(稍后我会有一个脚本来提前创建分区):
msck repair table farm.sensor_data
Run Code Online (Sandbox Code Playgroud)
现在我可以查询数据:
select regexp_extract("$path", '[^/]+$') as timestamp, device, sensor,
data_point, value from farm.sensor_data where day='20181104'
Results
timestamp device sensor data_point value
1 1541310040278 farm0001 sensor01 temperature 21.61
2 1541310040278 farm0001 sensor01 pressure 643.65
3 1541310040278 farm0001 sensor01 humidity 74.84
4 1541310040278 farm0001 sensor02 temperature 9.14
5 1541310040278 farm0001 sensor02 pressure 956.04
6 1541310040278 farm0001 sensor02 humidity 88.01
7 1541311840309 farm0001 sensor01 temperature 21.61
8 ...
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3419 次 |
| 最近记录: |