Pia*_*iar 4 cql data-integration cassandra apache-nifi
我每天都有多次传入各种CSV文件,存储来自传感器的时间序列数据,传感器是传感器站的一部分.每个CSV都以它所来自的传感器站和传感器ID命名,例如"station1_sensor2.csv".目前,数据存储如下:
> cat station1_sensor2.csv
2016-05-04 03:02:01.001000+0000;0;
2016-05-04 03:02:01.002000+0000;0.1234;
2016-05-04 03:02:01.003000+0000;0.2345;
Run Code Online (Sandbox Code Playgroud)
我创建了一个Cassandra表来存储它们,并能够查询它们以查找各种已识别的任务.Cassandra表看起来像这样:
cqlsh > CREATE KEYSPACE data with replication = {'class' : 'SimpleStrategy', 'replication_factor' : 3};
CREATE TABLE sensor_data (
station_id text, // id of the station
sensor_id text, // id of the sensor
tps timestamp, // timestamp of the measure
val float, // measured value
PRIMARY KEY ((station_id, sensor_id), tps)
);
Run Code Online (Sandbox Code Playgroud)
我想使用Apache Nifi自动将CSV中的数据存储到此Cassandra表中,但我找不到示例或方案来正确执行.我曾尝试使用"PutCassandraQL"处理器,但我在没有任何明确的例子的情况下苦苦挣扎.所以,任何帮助如何执行Cassandra放置查询与Apache Nifi将数据插入表中将不胜感激!
TL; DR我有一个NiFi 1.0模板,可以在Gist和NiFi Wiki上完成.
NiFi鼓励非常模块化的设计,所以让我们把它分解成更小的任务,我将描述一个可能的流程并解释每个处理器在你的用例方面的用途:
读入CSV文件.这可以使用GetFile完成,或者最好使用ListFile - > FetchFile.在我的示例中,我使用脚本处理器来在线创建流文件,其中包含上面的示例数据.这使我的模板可以移植供其他人使用.
解析文件名以获取工作站和传感器字段.这使用NiFi表达式语言在下划线(对于工作站)和传感器的下划线(减去CSV扩展名)之前获取文件名的部分.
将单个CSV流文件拆分为每行一个流文件.这样做我们可以稍后创建单独的CQL INSERT语句.
从每一行中提取列值.我使用了ExtractText和一个正则表达式,如果你有非常复杂的逻辑,你可能想要检查脚本处理器,如ExecuteScript.
改变时间戳.IIRC,CQL不接受时间戳文字上的微秒.您可以尝试解析微秒(最好在ExecuteScript处理器中完成),也可以只重新格式化时间戳.请注意,"重新格式化",因为无法解析微秒,导致在我的示例中截断所有小数秒.
构建CQL INSERT语句.此时数据(无论如何在我的模板中)都是流文件属性,原始内容可以用CQL INSERT语句替换(这是PutCassandraQL期望它的方式).您可以将数据保存在属性中(使用UpdateAttribute正确命名它们,请参阅PutCassandraQL文档)并使用预准备语句,但恕我直言,编写显式CQL语句更简单.在撰写本文时,PutCassandraQL并未缓存PreparedStatements,因此现在以这种方式执行操作实际上效率较低.
使用PutCassandraQL执行CQL语句.
我没有详细说明我的属性名称等等,但是当流程到达ReplaceText时,我有以下属性:
ReplaceText将内容设置为以下内容(使用表达式语言填写值):
insert into sensor_data (station_id, sensor_id, tps, val) values ('${station.name}', '${sensor.name}', '${tps}', ${column.2})
Run Code Online (Sandbox Code Playgroud)
希望有所帮助,如果您有任何问题或疑问,请告诉我.干杯!
归档时间: |
|
查看次数: |
3147 次 |
最近记录: |