Apache Nifi/Cassandra - 如何将CSV加载到Cassandra表中

Pia*_*iar 4 cql data-integration cassandra apache-nifi

我每天都有多次传入各种CSV文件,存储来自传感器的时间序列数据,传感器是传感器站的一部分.每个CSV都以它所来自的传感器站和传感器ID命名,例如"station1_sensor2.csv".目前,数据存储如下:

> cat station1_sensor2.csv
2016-05-04 03:02:01.001000+0000;0;
2016-05-04 03:02:01.002000+0000;0.1234;
2016-05-04 03:02:01.003000+0000;0.2345;
Run Code Online (Sandbox Code Playgroud)

我创建了一个Cassandra表来存储它们,并能够查询它们以查找各种已识别的任务.Cassandra表看起来像这样:

cqlsh > CREATE KEYSPACE data with replication = {'class' : 'SimpleStrategy', 'replication_factor' : 3};

        CREATE TABLE sensor_data (
        station_id text, // id of the station
        sensor_id text,  // id of the sensor
        tps timestamp,   // timestamp of the measure
        val float,       // measured value
        PRIMARY KEY ((station_id, sensor_id), tps)
        );
Run Code Online (Sandbox Code Playgroud)

我想使用Apache Nifi自动将CSV中的数据存储到此Cassandra表中,但我找不到示例或方案来正确执行.我曾尝试使用"PutCassandraQL"处理器,但我在没有任何明确的例子的情况下苦苦挣扎.所以,任何帮助如何执行Cassandra放置查询与Apache Nifi将数据插入表中将不胜感激!

mat*_*tyb 5

TL; DR我有一个NiFi 1.0模板,可以在GistNiFi Wiki上完成.

NiFi鼓励非常模块化的设计,所以让我们把它分解成更小的任务,我将描述一个可能的流程并解释每个处理器在你的用例方面的用途:

在此输入图像描述

  1. 读入CSV文件.这可以使用GetFile完成,或者最好使用ListFile - > FetchFile.在我的示例中,我使用脚本处理器来在线创建流文件,其中包含上面的示例数据.这使我的模板可以移植供其他人使用.

  2. 解析文件名以获取工作站和传感器字段.这使用NiFi表达式语言在下划线(对于工作站)和传感器的下划线(减去CSV扩展名)之前获取文件名的部分.

  3. 将单个CSV流文件拆分为每行一个流文件.这样做我们可以稍后创建单独的CQL INSERT语句.

  4. 从每一行中提取列值.我使用了ExtractText和一个正则表达式,如果你有非常复杂的逻辑,你可能想要检查脚本处理器,如ExecuteScript.

  5. 改变时间戳.IIRC,CQL不接受时间戳文字上的微秒.您可以尝试解析微秒(最好在ExecuteScript处理器中完成),也可以只重新格式化时间戳.请注意,"重新格式化",因为无法解析微秒,导致在我的示例中截断所有小数秒.

  6. 构建CQL INSERT语句.此时数据(无论如何在我的模板中)都是流文件属性,原始内容可以用CQL INSERT语句替换(这是PutCassandraQL期望它的方式).您可以将数据保存在属性中(使用UpdateAttribute正确命名它们,请参阅PutCassandraQL文档)并使用预准备语句,但恕我直言,编写显式CQL语句更简单.在撰写本文时,PutCassandraQL并未缓存PreparedStatements,因此现在以这种方式执行操作实际上效率较低.

  7. 使用PutCassandraQL执行CQL语句.

我没有详细说明我的属性名称等等,但是当流程到达ReplaceText时,我有以下属性:

  • station.name:包含从文件名解析的站的名称
  • sensor.name:包含从文件名解析的传感器的名称
  • tps:包含更新的时间戳值
  • columns.2:包含(大概)传感器读数的值

ReplaceText将内容设置为以下内容(使用表达式语言填写值):

insert into sensor_data (station_id, sensor_id, tps, val) values ('${station.name}', '${sensor.name}', '${tps}', ${column.2})
Run Code Online (Sandbox Code Playgroud)

希望有所帮助,如果您有任何问题或疑问,请告诉我.干杯!