I want to create a new Data Ingest feed in Kylo to import data from an external database into Hive. In step two of the feed wizard there is a Source Database Connection drop-down list, but my database is not listed. How do I add my database to this list?
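For context, that drop-down is populated from JDBC controller services available to the feed, so the usual fix is to define a DBCPConnectionPool controller service in NiFi and make it available to Kylo. A sketch of the properties such a service needs follows; the URL, driver class, and jar path are placeholders for illustration, not values from any particular environment:

```
# NiFi: Controller Settings > Controller Services > + > DBCPConnectionPool
Database Connection URL     : jdbc:mysql://dbhost:3306/mydb     (placeholder)
Database Driver Class Name  : com.mysql.jdbc.Driver             (placeholder)
Database Driver Location(s) : /opt/nifi/drivers/mysql-connector-java.jar
Database User               : myuser
Password                    : (set on the service; stored as sensitive)
```

Once the service is enabled, the feed wizard should list it as a selectable source connection.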
I am simply querying a Cassandra table with the QueryCassandra processor, but what I don't understand is how to pass my JSON output file as the input file to the ExecutePySpark processor; later I also need to pass my Spark output data on to Hive. Please help, thanks.
My QueryCassandra properties:
I have a JSON like this:
{
"campaign_key": 316,
"client_key": 127,
"cpn_mid_counter": "24",
"cpn_name": "Bopal",
"cpn_status": "Active",
"clt_name": "Bopal Ventures",
"clt_status": "Active"
}
Expected output
First JSON:
{
"campaign_key": 316,
"client_key": 127,
"cpn_mid_counter": "24",
"cpn_name": "Bopal",
"cpn_status": "Active"
}
Second JSON:
{
"clt_name": "Bopal Ventures",
"clt_status": "Active"
}
How can I achieve this with NiFi? Thanks.
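In NiFi itself this is usually done with two parallel JoltTransformJSON processors (each with a spec that keeps one group of fields) or a single ExecuteScript processor. Either way the partitioning logic is small; here is a plain-Python sketch of it, with the key list read off the example above:

```python
import json

# Keys that belong in the second ("client") document; everything else stays
# in the first ("campaign") document. Taken from the example JSON above.
CLIENT_KEYS = {"clt_name", "clt_status"}

def split_record(record):
    """Partition one flat JSON object into campaign and client documents."""
    client = {k: v for k, v in record.items() if k in CLIENT_KEYS}
    campaign = {k: v for k, v in record.items() if k not in CLIENT_KEYS}
    return campaign, client

src = json.loads("""{
    "campaign_key": 316,
    "client_key": 127,
    "cpn_mid_counter": "24",
    "cpn_name": "Bopal",
    "cpn_status": "Active",
    "clt_name": "Bopal Ventures",
    "clt_status": "Active"
}""")
campaign, client = split_record(src)
```

In an ExecuteScript processor the same split would read the flowfile content, build the two dicts, and route each to its own relationship.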
I have 3 flowfiles coming from the same processor.
FF1 -> {a:1,b:2,c:'name'}
FF2 -> {a:1,b:5,c:'fruit'}
FF3 -> {a:2,b:3,c:'abc'}
Using the MergeContent processor I am able to merge all the flowfiles, but my requirement is to merge flowfiles on a key.
Expected output if I join on key 'a':
FF1 -> [{a:1,b:2,c:'name'},{a:1,b:5,c:'fruit'}]
FF2 -> [{a:2,b:3,c:'abc'}]
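MergeContent can group on an attribute rather than merging everything: extract the key with EvaluateJsonPath (e.g. `$.a` into an attribute named `a`), then set MergeContent's "Correlation Attribute Name" to `a` so only flowfiles sharing that value are binned together. The grouping it performs is equivalent to this sketch, using the three records above:

```python
import json
from collections import defaultdict

# The three flowfiles from the question, parsed as records.
flowfiles = [
    {"a": 1, "b": 2, "c": "name"},
    {"a": 1, "b": 5, "c": "fruit"},
    {"a": 2, "b": 3, "c": "abc"},
]

# NiFi equivalent: EvaluateJsonPath ($.a -> attribute "a"), then MergeContent
# with Correlation Attribute Name = a. The grouping logic itself is just:
groups = defaultdict(list)
for record in flowfiles:
    groups[record["a"]].append(record)

# One merged JSON array per key value.
merged = {key: json.dumps(records) for key, records in groups.items()}
```

Note that MergeContent also needs sensible bin settings (e.g. Minimum Number of Entries / Max Bin Age) so a bin is released once its group is complete.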
I am receiving CSV files from a third party. The schema of these files is dynamic; the only thing I can be certain of is that the number of columns, their order, and their names are not constant from one load to the next.
To use this kind of data in my system I am considering MongoDB as a staging area; given the shifting schema, I think it would make a good transition zone.
I read about the ConvertRecord processor, which is well suited to CSV-to-JSON conversion, but I have no schema. I just want each row to become one document, with the header names as keys and the cell contents as values.
How should I go about this? Also, the files are around 25-30 GB, so I don't want to degrade the system's performance.
I thought of writing my own processor (in Java) and was able to get what I needed, but it takes too much time and does not seem like the optimal choice.
Let me know whether this is achievable with existing processors.
Thanks, Rakesh
Updated: 2018/09/05
<?xml version="1.0" encoding="UTF-8" standalone="yes"?><template encoding-version="1.2"><description></description><groupId>a2bd0551-0165-1000-7c6a-a32ca4db047c</groupId><name>csv_to_json_no_schema_v1</name><snippet><connections><id>91bc4a66-704c-3a2f-0000-000000000000</id><parentGroupId>defb04c4-c15c-3a07-0000-000000000000</parentGroupId><backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold><backPressureObjectThreshold>10000</backPressureObjectThreshold><destination><groupId>defb04c4-c15c-3a07-0000-000000000000</groupId><id>bb6c25ae-f2b6-386a-0000-000000000000</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>success</selectedRelationships><source><groupId>defb04c4-c15c-3a07-0000-000000000000</groupId><id>eb6cd54a-e1f1-3871-0000-000000000000</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>ad804e3c-f233-3556-0000-000000000000</id><parentGroupId>defb04c4-c15c-3a07-0000-000000000000</parentGroupId><backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold><backPressureObjectThreshold>10000</backPressureObjectThreshold><destination><groupId>defb04c4-c15c-3a07-0000-000000000000</groupId><id>64b15a56-8a5f-3297-0000-000000000000</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>invalid</selectedRelationships><source><groupId>defb04c4-c15c-3a07-0000-000000000000</groupId><id>bb6c25ae-f2b6-386a-0000-000000000000</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>c30bd123-c436-36ce-0000-000000000000</id><parentGroupId>defb04c4-c15c-3a07-0000-000000000000</parentGroupId><backPressureDataSizeThreshold>1 
GB</backPressureDataSizeThreshold><backPressureObjectThreshold>10000</backPressureObjectThreshold><destination><groupId>defb04c4-c15c-3a07-0000-000000000000</groupId><id>8a0e37da-acd2-3d72-0000-000000000000</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>valid</selectedRelationships><source><groupId>defb04c4-c15c-3a07-0000-000000000000</groupId><id>bb6c25ae-f2b6-386a-0000-000000000000</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>247d2139-26b7-31fe-0000-000000000000</id><parentGroupId>defb04c4-c15c-3a07-0000-000000000000</parentGroupId><backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold><backPressureObjectThreshold>10000</backPressureObjectThreshold><destination><groupId>defb04c4-c15c-3a07-0000-000000000000</groupId><id>1297bea9-b30f-3f45-0000-000000000000</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>failure</selectedRelationships><source><groupId>defb04c4-c15c-3a07-0000-000000000000</groupId><id>8a0e37da-acd2-3d72-0000-000000000000</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>45e5403f-99f7-3ddf-0000-000000000000</id><parentGroupId>defb04c4-c15c-3a07-0000-000000000000</parentGroupId><backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold><backPressureObjectThreshold>10000</backPressureObjectThreshold><destination><groupId>defb04c4-c15c-3a07-0000-000000000000</groupId><id>9f8f32f7-130c-35bd-0000-000000000000</id><type>PROCESSOR</type></destination><flowFileExpiration>0 
sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>success</selectedRelationships><source><groupId>defb04c4-c15c-3a07-0000-000000000000</groupId><id>8a0e37da-acd2-3d72-0000-000000000000</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><controllerServices><id>88b0195a-34b2-34f0-0000-000000000000</id><parentGroupId>defb04c4-c15c-3a07-0000-000000000000</parentGroupId><bundle><artifact>nifi-record-serialization-services-nar</artifact><group>org.apache.nifi</group><version>1.6.0</version></bundle><comments></comments><descriptors><entry><key>Schema Write Strategy</key><value><name>Schema Write Strategy</name></value></entry><entry><key>schema-access-strategy</key><value><name>schema-access-strategy</name></value></entry><entry><key>schema-registry</key><value><identifiesControllerService>org.apache.nifi.schemaregistry.services.SchemaRegistry</identifiesControllerService><name>schema-registry</name></value></entry><entry><key>schema-name</key><value><name>schema-name</name></value></entry><entry><key>schema-version</key><value><name>schema-version</name></value></entry><entry><key>schema-branch</key><value><name>schema-branch</name></value></entry><entry><key>schema-text</key><value><name>schema-text</name></value></entry><entry><key>Date Format</key><value><name>Date Format</name></value></entry><entry><key>Time Format</key><value><name>Time Format</name></value></entry><entry><key>Timestamp Format</key><value><name>Timestamp Format</name></value></entry><entry><key>Pretty Print JSON</key><value><name>Pretty Print JSON</name></value></entry><entry><key>suppress-nulls</key><value><name>suppress-nulls</name></value></entry></descriptors><name>JsonRecordSetWriter</name><persistsState>false</persistsState><properties><entry><key>Schema Write 
Strategy</key><value>no-schema</value></entry><entry><key>schema-access-strategy</key></entry><entry><key>schema-registry</key></entry><entry><key>schema-name</key></entry><entry><key>schema-version</key></entry><entry><key>schema-branch</key></entry><entry><key>schema-text</key></entry><entry><key>Date Format</key></entry><entry><key>Time Format</key></entry><entry><key>Timestamp Format</key></entry><entry><key>Pretty Print JSON</key></entry><entry><key>suppress-nulls</key></entry></properties><state>ENABLED</state><type>org.apache.nifi.json.JsonRecordSetWriter</type></controllerServices><controllerServices><id>c3e80a29-498b-36d4-0000-000000000000</id><parentGroupId>defb04c4-c15c-3a07-0000-000000000000</parentGroupId><bundle><artifact>nifi-record-serialization-services-nar</artifact><group>org.apache.nifi</group><version>1.6.0</version></bundle><comments></comments><descriptors><entry><key>schema-access-strategy</key><value><name>schema-access-strategy</name></value></entry><entry><key>schema-registry</key><value><identifiesControllerService>org.apache.nifi.schemaregistry.services.SchemaRegistry</identifiesControllerService><name>schema-registry</name></value></entry><entry><key>schema-name</key><value><name>schema-name</name></value></entry><entry><key>schema-version</key><value><name>schema-version</name></value></entry><entry><key>schema-branch</key><value><name>schema-branch</name></value></entry><entry><key>schema-text</key><value><name>schema-text</name></value></entry><entry><key>csv-reader-csv-parser</key><value><name>csv-reader-csv-parser</name></value></entry><entry><key>Date Format</key><value><name>Date Format</name></value></entry><entry><key>Time Format</key><value><name>Time Format</name></value></entry><entry><key>Timestamp Format</key><value><name>Timestamp Format</name></value></entry><entry><key>CSV Format</key><value><name>CSV Format</name></value></entry><entry><key>Value Separator</key><value><name>Value 
Separator</name></value></entry><entry><key>Skip Header Line</key><value><name>Skip Header Line</name></value></entry><entry><key>ignore-csv-header</key><value><name>ignore-csv-header</name></value></entry><entry><key>Quote Character</key><value><name>Quote Character</name></value></entry><entry><key>Escape Character</key><value><name>Escape Character</name></value></entry><entry><key>Comment Marker</key><value><name>Comment Marker</name></value></entry><entry><key>Null String</key><value><name>Null String</name></value></entry><entry><key>Trim Fields</key><value><name>Trim Fields</name></value></entry><entry><key>csvutils-character-set</key><value><name>csvutils-character-set</name></value></entry></descriptors><name>CSVReader</name><persistsState>false</persistsState><properties><entry><key>schema-access-strategy</key></entry><entry><key>schema-registry</key></entry><entry><key>schema-name</key></entry><entry><key>schema-version</key></entry><entry><key>schema-branch</key></entry><entry><key>schema-text</key></entry><entry><key>csv-reader-csv-parser</key></entry><entry><key>Date Format</key></entry><entry><key>Time Format</key></entry><entry><key>Timestamp Format</key></entry><entry><key>CSV Format</key></entry><entry><key>Value …
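ConvertRecord can do this without a hand-written schema: on the CSVReader set the schema access strategy to "Use String Fields From Header" (every column is read as a string named after its header), and on the JsonRecordSetWriter keep the Schema Write Strategy at no-schema, as the template above already does. Because it is record-oriented it streams row by row, so the 25-30 GB size should not require holding the file in memory. The row-to-document mapping it performs is equivalent to this sketch (the sample headers and values are made up for the demo):

```python
import csv
import io
import json

def csv_rows_to_json_docs(fp):
    """Stream a CSV file row by row; each row becomes one JSON document
    whose keys are the header names. Only the current row is held in
    memory, so file size is not a concern."""
    for row in csv.DictReader(fp):
        yield json.dumps(row)

# Stand-in for a real file handle; headers and values are illustrative.
sample = io.StringIO("id,name,qty\n1,apple,5\n2,pear,7\n")
docs = list(csv_rows_to_json_docs(sample))
```

The resulting one-document-per-row JSON is exactly the shape PutMongo (or a bulk mongoimport) expects for a schemaless staging collection.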
I have created a NiFi workflow with 3 processors; the details are attached. The problem is that when I run the workflow in NiFi it runs fine, but when I import the same template into Kylo and run it through a feed, it errors out. It looks like something is wrong with the Kylo template. Could you help me? Also, I cannot see any error in the NiFi log.
Thanks,
NiFi log:
2017-08-30 10:11:49,764 INFO [pool-8-thread-1] org.wali.MinimalLockingWriteAheadLog org.wali.MinimalLockingWriteAheadLog@65b65bdb checkpointed with 1 Records and 0 Swap Files in 34 milliseconds (Stop-the-world time = 16 milliseconds, Clear Edit Logs time = 15 millis), max Transaction ID 37082
2017-08-30 10:11:49,764 INFO [pool-8-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Successfully checkpointed FlowFile Repository with 1 records in 34 milliseconds
2017-08-30 10:13:47,109 INFO [Write-Ahead Local State Provider Maintenance] org.wali.MinimalLockingWriteAheadLog org.wali.MinimalLockingWriteAheadLog@2efe3a02 checkpointed with 2326 Records and 0 Swap Files in 18 milliseconds (Stop-the-world time = 1 …
I have set up the default 'data_ingest' template from the Kylo installation directory on the Kylo UI - http://localhost:8400
Using the template I created a feed to fetch data from a database (MySQL), specifying the database/table/columns in the feed details.
The database is selected from a drop-down list, and I have put all the configuration in /opt/kylo/kylo-services/application.properties:
spring.datasource.username=root
spring.datasource.password=cloudera
..
hive.metastore.datasource.username=root
hive.metastore.datasource.password=cloudera
Despite this, my job fails with the following error -
GetTableData[id=a3eb6450-1f53-3e1e-a523-01db02f0b625] Unable to execute SQL select from table due to StandardFlowFileRecord[uuid=27219d8a-9bef-40be-a413-63d4d8663ebe,claim=,offset=0,name=32633830193154,size=0]; routing to failure: org.apache.nifi.processor.exception.ProcessException: org.apache.commons.dbcp.SQLNestedException: Cannot create PoolableConnectionFactory (Could not connect: Access denied for user 'root'@'localhost' (using password: NO))
Is there any configuration I have missed that is causing this error?
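For reference, "(using password: NO)" means MySQL received an empty password, so the JDBC pool that GetTableData uses is likely not picking up the credentials from application.properties (the spring.datasource.* and hive.metastore.* keys configure Kylo's own databases, not the feed's source connection). Kylo injects sensitive controller-service properties through keys of the form `nifi.service.<service name>.<property>`; a sketch of what that might look like - the service name `mysql` is an assumption and must match the DBCPConnectionPool controller service your template references:

```
# Hypothetical entries in /opt/kylo/kylo-services/application.properties;
# "mysql" must match the name of the DBCP controller service in NiFi.
nifi.service.mysql.database_user=root
nifi.service.mysql.password=cloudera
```

After changing these, restart kylo-services and re-run the feed so the updated properties are pushed to the controller service.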