从流文件内容中提取多行内容

pra*_*por 2 hortonworks-data-platform apache-nifi hortonworks-dataflow

我正在从 MySQL 表中导入数据(仅适用于选定的列)并将其放入 HDFS。完成后,我想在 Hive 中创建一个表。

为此,我有一个schema.sql包含整个表的 CREATE TABLE 语句的文件,我只想为我导入的列生成新的 CREATE TABLE 语句。

类似于我grep在下面的示例中所做的事情。

在此处输入图片说明

FetchFile一起使用ExtractText但无法使其工作。如果我将整体模式放入一个属性中,我如何使用 NiFi 处理器甚至表达式语言来实现这一点?

或者有没有更好的方法在导入的数据上创建表?

Shu*_*Shu 5

NiFi 可以根据流文件内容生成 Create table 语句[s]

1.使用ConvertAvroToORC处理器创建ORC表:

  • 如果您将 avro 数据转换为 ORC 格式然后存储到 HDFS,则 ConvertAvroToORC 处理器将hive.ddl属性添加到流文件。

  • PutHDFS 处理器absolute.hdfs.path向流文件添加属性。

  • 我们可以使用这个hive.ddlabsolute.hdfs.path属性并在 HDFS 目录之上动态创建 orc 表。

流动:

 Pull data from source(ExecuteSQL...etc)
  -> ConvertAvroToORC //add Hive DbName,TableName in HiveTableName property value--> 
  -> PutHDFS //store the orc file into HDFS location --> 
  -> ReplaceText //Replace the flowfile content with ${hive.ddl} Location '${absolute.hdfs.path}'--> 
  -> PutHiveQL //execute the create table statement
Run Code Online (Sandbox Code Playgroud)

请参阅链接以了解有关上述流程的更多详细信息。

2.使用 ExtractAvroMetaData 处理器创建 Avro 表:

  • 在 NiFi 中,一旦我们使用 QueryDatabaseTable 提取数据,ExecuteSQL 处理器的数据格式为AVRO

  • 我们可以基于avro 模式(.avsc 文件)创建Avro 表,通过使用ExtractAvroMetaData处理器,我们可以提取模式并保留为流文件属性,然后通过使用此模式我们可以动态创建 AvroTables。

流动:

ExecuteSQL (success)|-> PutHDFS //store data into HDFS
           (success)|-> ExtractAvroMetadata //configure Metadata Keys as avro.schema 
                     -> ReplaceText //replace flowfile content with avro.schema
                     -> PutHDFS //store the avsc file into schema directory
                     -> ReplaceText //create avro table on top of schema directory
                     -> PutHiveQL //execute the hive.ddl
Run Code Online (Sandbox Code Playgroud)

示例AVRO创建表语句:

CREATE TABLE as_avro
  ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
  STORED as INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
  OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
  TBLPROPERTIES (
    'avro.schema.url'='/path/to/the/schema/test_serializer.avsc');
Run Code Online (Sandbox Code Playgroud)

我们将通过在上述流程中使用 ReplaceText 处理器来更改模式 url路径

用另一种方式的ExecuteSQL处理器得到所有创建表的语句(或)列从(SYS.TABLES / INFORMATION_SCHEMA.COLUMNS ..等)的信息从源(如果源系统允许的话)和编写脚本来map the data types进入hive appropriate types,然后他们在你存储desired format在蜂房.

编辑:

grep对流文件内容运行命令,我们需要使用ExecuteStreamCommand处理器

电调配置:

在此处输入图片说明

然后将output stream关系提供给 ExtractText Processor

ET 配置:

添加新属性为

内容

(?s)(.*)
Run Code Online (Sandbox Code Playgroud)

在此处输入图片说明

然后content attribute添加到流文件中,您可以使用该属性并准备创建表语句。