I have a workflow in which I receive JSON files as responses from a REST API. In a single session I get around 100k files, about 15 GB in total. I have to save each file to the file system, which is what I'm doing. At the end of the process I have to wait until all of the files are present before I can send a success message.
Once a file is saved to the FS, I call Notify + Wait. But at that point I no longer need the 15 GB of data sitting in the flow files, so to free up some space I thought of using ReplaceText or ModifyBytes to clear the content, so that Notify + Wait runs smoothly. Altogether this process waits for about 3 hours.
But in both cases (ReplaceText or ModifyBytes) the step takes far too long.
Can you suggest the fastest way to clear a flow file's content? I don't need any of the attributes either. So could I just drop the old flow file and generate a tiny (KB-sized) one in its place?
What I want is something like GenerateFlowFile, but in the middle of the flow: for each existing flow file, drop the old one and generate a blank flow file that goes on to Notify and Wait.
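To make the idea concrete, here is a rough sketch of what I have in mind (a hypothetical custom processor; the name ClearContent is mine, not an existing NiFi processor). It keeps each flow file but overwrites its content with zero bytes via session.write, which is effectively the "blank flow file in the middle" described above; attributes are left untouched here.

import java.util.Collections;
import java.util.Set;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;

// Hypothetical processor, sketch only: empty out the flow file content in place.
public class ClearContent extends AbstractProcessor {

    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("Flow files whose content has been cleared")
            .build();

    @Override
    public Set<Relationship> getRelationships() {
        return Collections.singleton(REL_SUCCESS);
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        // Overwrite the content claim with zero bytes; attributes stay as they are.
        flowFile = session.write(flowFile, out -> { /* write nothing */ });
        session.transfer(flowFile, REL_SUCCESS);
    }
}

The intent is the same as ReplaceText/ModifyBytes (end up with an empty content claim), just expressed directly in code.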
Thanks
My use case:
Some processing somewhere else adds files to a directory (_use_it) -> my flow is called via REST -> now I want my process to read all of the files from that directory (_use_it).
I want to read every file from this directory each time, not just the changed/added ones. I can't start/stop the processor; this flow has to run as a background process.
I think what I'm looking for is a ListFile processor that runs once, stops, and then forgets its previous state the next time it runs. "Some twisted logic" :)
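For illustration only, the stateless listing behaviour I'm after looks roughly like this plain-Java sketch (not a NiFi processor; it simply lists every regular file in _use_it on each call, with no memory of previous runs):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ListAllFiles {

    // List every regular file in the directory, keeping no state between calls.
    static List<Path> listAll(Path dir) throws IOException {
        try (Stream<Path> paths = Files.list(dir)) {
            return paths.filter(Files::isRegularFile).collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        // "_use_it" is the directory from the use case above.
        for (Path p : listAll(Paths.get("_use_it"))) {
            System.out.println(p);
        }
    }
}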
Thanks
I'm getting CSV files from a third party. The schema of these files is dynamic; the only thing I can be sure of is that the number of columns, their order, and their names are not constant from one load to the next.
To use this kind of data in my system I'm thinking of using MongoDB as a staging area; I think it would make a good landing zone.
I read about the ConvertRecord processor, which is great for CSV-to-JSON conversion, but it needs a schema and I don't have one. I just want each row to become one document, with the header names as keys and the row fields as values.
How should I go about this? Also, the file is around 25-30 GB in size, so I don't want to drag down the system's performance.
I thought about doing it with my own processor (written in Java), and I was able to get what I needed, but it takes far too much time and doesn't seem like the best option.
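Roughly, that hand-rolled Java approach looks like the sketch below (heavily simplified; it does no real CSV parsing, ignores quoting/escaping, and only shows the "one document per row, keyed by the header" shape I want to land in MongoDB):

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CsvRowsToDocuments {

    // First line is the header; every following line becomes one "document"
    // keyed by the header names. Quoting/escaping is not handled, for brevity.
    static List<Map<String, String>> toDocuments(List<String> lines, String separator) {
        String[] header = lines.get(0).split(separator, -1);
        List<Map<String, String>> documents = new ArrayList<>();
        for (int i = 1; i < lines.size(); i++) {
            String[] fields = lines.get(i).split(separator, -1);
            Map<String, String> doc = new LinkedHashMap<>();
            for (int c = 0; c < header.length; c++) {
                doc.put(header[c], c < fields.length ? fields[c] : null);
            }
            documents.add(doc);
        }
        return documents;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("name,age,city", "alice,30,Pune", "bob,,Mumbai");
        toDocuments(lines, ",").forEach(System.out::println);
        // {name=alice, age=30, city=Pune}
        // {name=bob, age=, city=Mumbai}
    }
}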
Let me know, can this be achieved with existing processors?
Thanks, Rakesh
Updated: 2018/09/05
<?xml version="1.0" encoding="UTF-8" standalone="yes"?><template encoding-version="1.2"><description></description><groupId>a2bd0551-0165-1000-7c6a-a32ca4db047c</groupId><name>csv_to_json_no_schema_v1</name><snippet><connections><id>91bc4a66-704c-3a2f-0000-000000000000</id><parentGroupId>defb04c4-c15c-3a07-0000-000000000000</parentGroupId><backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold><backPressureObjectThreshold>10000</backPressureObjectThreshold><destination><groupId>defb04c4-c15c-3a07-0000-000000000000</groupId><id>bb6c25ae-f2b6-386a-0000-000000000000</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>success</selectedRelationships><source><groupId>defb04c4-c15c-3a07-0000-000000000000</groupId><id>eb6cd54a-e1f1-3871-0000-000000000000</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>ad804e3c-f233-3556-0000-000000000000</id><parentGroupId>defb04c4-c15c-3a07-0000-000000000000</parentGroupId><backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold><backPressureObjectThreshold>10000</backPressureObjectThreshold><destination><groupId>defb04c4-c15c-3a07-0000-000000000000</groupId><id>64b15a56-8a5f-3297-0000-000000000000</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>invalid</selectedRelationships><source><groupId>defb04c4-c15c-3a07-0000-000000000000</groupId><id>bb6c25ae-f2b6-386a-0000-000000000000</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>c30bd123-c436-36ce-0000-000000000000</id><parentGroupId>defb04c4-c15c-3a07-0000-000000000000</parentGroupId><backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold><backPressureObjectThreshold>10000</backPressureObjectThreshold><destination><groupId>defb04c4-c15c-3a07-0000-000000000000</groupId><id>8a0e37da-acd2-3d72-0000-000000000000</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>valid</selectedRelationships><source><groupId>defb04c4-c15c-3a07-0000-000000000000</groupId><id>bb6c25ae-f2b6-386a-0000-000000000000</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>247d2139-26b7-31fe-0000-000000000000</id><parentGroupId>defb04c4-c15c-3a07-0000-000000000000</parentGroupId><backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold><backPressureObjectThreshold>10000</backPressureObjectThreshold><destination><groupId>defb04c4-c15c-3a07-0000-000000000000</groupId><id>1297bea9-b30f-3f45-0000-000000000000</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>failure</selectedRelationships><source><groupId>defb04c4-c15c-3a07-0000-000000000000</groupId><id>8a0e37da-acd2-3d72-0000-000000000000</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>45e5403f-99f7-3ddf-0000-000000000000</id><parentGroupId>defb04c4-c15c-3a07-0000-000000000000</parentGroupId><backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold><backPressureObjectThreshold>10000</backPressureObjectThreshold><destination><groupId>defb04c4-c15c-3a07-0000-000000000000</groupId><id>9f8f32f7-130c-35bd-0000-000000000000</id><type>PROCESSOR</type></destination><flowFileExpiration>0 
sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>success</selectedRelationships><source><groupId>defb04c4-c15c-3a07-0000-000000000000</groupId><id>8a0e37da-acd2-3d72-0000-000000000000</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><controllerServices><id>88b0195a-34b2-34f0-0000-000000000000</id><parentGroupId>defb04c4-c15c-3a07-0000-000000000000</parentGroupId><bundle><artifact>nifi-record-serialization-services-nar</artifact><group>org.apache.nifi</group><version>1.6.0</version></bundle><comments></comments><descriptors><entry><key>Schema Write Strategy</key><value><name>Schema Write Strategy</name></value></entry><entry><key>schema-access-strategy</key><value><name>schema-access-strategy</name></value></entry><entry><key>schema-registry</key><value><identifiesControllerService>org.apache.nifi.schemaregistry.services.SchemaRegistry</identifiesControllerService><name>schema-registry</name></value></entry><entry><key>schema-name</key><value><name>schema-name</name></value></entry><entry><key>schema-version</key><value><name>schema-version</name></value></entry><entry><key>schema-branch</key><value><name>schema-branch</name></value></entry><entry><key>schema-text</key><value><name>schema-text</name></value></entry><entry><key>Date Format</key><value><name>Date Format</name></value></entry><entry><key>Time Format</key><value><name>Time Format</name></value></entry><entry><key>Timestamp Format</key><value><name>Timestamp Format</name></value></entry><entry><key>Pretty Print JSON</key><value><name>Pretty Print JSON</name></value></entry><entry><key>suppress-nulls</key><value><name>suppress-nulls</name></value></entry></descriptors><name>JsonRecordSetWriter</name><persistsState>false</persistsState><properties><entry><key>Schema Write Strategy</key><value>no-schema</value></entry><entry><key>schema-access-strategy</key></entry><entry><key>schema-registry</key></entry><entry><key>schema-name</key></entry><entry><key>schema-version</key></entry><entry><key>schema-branch</key></entry><entry><key>schema-text</key></entry><entry><key>Date Format</key></entry><entry><key>Time Format</key></entry><entry><key>Timestamp Format</key></entry><entry><key>Pretty Print JSON</key></entry><entry><key>suppress-nulls</key></entry></properties><state>ENABLED</state><type>org.apache.nifi.json.JsonRecordSetWriter</type></controllerServices><controllerServices><id>c3e80a29-498b-36d4-0000-000000000000</id><parentGroupId>defb04c4-c15c-3a07-0000-000000000000</parentGroupId><bundle><artifact>nifi-record-serialization-services-nar</artifact><group>org.apache.nifi</group><version>1.6.0</version></bundle><comments></comments><descriptors><entry><key>schema-access-strategy</key><value><name>schema-access-strategy</name></value></entry><entry><key>schema-registry</key><value><identifiesControllerService>org.apache.nifi.schemaregistry.services.SchemaRegistry</identifiesControllerService><name>schema-registry</name></value></entry><entry><key>schema-name</key><value><name>schema-name</name></value></entry><entry><key>schema-version</key><value><name>schema-version</name></value></entry><entry><key>schema-branch</key><value><name>schema-branch</name></value></entry><entry><key>schema-text</key><value><name>schema-text</name></value></entry><entry><key>csv-reader-csv-parser</key><value><name>csv-reader-csv-parser</name></value></entry><entry><key>Date Format</key><value><name>Date Format</name></value></entry><entry><key>Time Format</key><value><name>Time 
Format</name></value></entry><entry><key>Timestamp Format</key><value><name>Timestamp Format</name></value></entry><entry><key>CSV Format</key><value><name>CSV Format</name></value></entry><entry><key>Value Separator</key><value><name>Value Separator</name></value></entry><entry><key>Skip Header Line</key><value><name>Skip Header Line</name></value></entry><entry><key>ignore-csv-header</key><value><name>ignore-csv-header</name></value></entry><entry><key>Quote Character</key><value><name>Quote Character</name></value></entry><entry><key>Escape Character</key><value><name>Escape Character</name></value></entry><entry><key>Comment Marker</key><value><name>Comment Marker</name></value></entry><entry><key>Null String</key><value><name>Null String</name></value></entry><entry><key>Trim Fields</key><value><name>Trim Fields</name></value></entry><entry><key>csvutils-character-set</key><value><name>csvutils-character-set</name></value></entry></descriptors><name>CSVReader</name><persistsState>false</persistsState><properties><entry><key>schema-access-strategy</key></entry><entry><key>schema-registry</key></entry><entry><key>schema-name</key></entry><entry><key>schema-version</key></entry><entry><key>schema-branch</key></entry><entry><key>schema-text</key></entry><entry><key>csv-reader-csv-parser</key></entry><entry><key>Date Format</key></entry><entry><key>Time Format</key></entry><entry><key>Timestamp Format</key></entry><entry><key>CSV Format</key></entry><entry><key>Value …
I have the following JSON that I want to convert to CSV.
The elements of the JSON are constant; if a value doesn't exist it will be null, but the key is still present.
I want to convert it into a CSV with 9 columns.
My flow looks like this =>
InvokeHTTP -> EvaluateJSONPath -> InferAvroSchema -> UpdateAttribute -> ConvertRecord
The last processor is failing.
a,b,c,d,e,sub-a,sub-b,sub-c,sub-d
[
  {
    "a": "a-value",
    "b": "b-value",
    "c": "c-value",
    "d": "d-value",
    "e": 123,
    "sub_value": {
      "sub-a": "sub-a-value",
      "sub-b": null,
      "sub-c": "sub-c-value",
      "sub-d": "sub-d-value"
    }
  },
  {
    "a": "a2-value",
    "b": "b2-value",
    "c": "c2-value",
    "d": "d2-value",
    "e": 123,
    "sub_value": {
      "sub-a": "sub-2a-value",
      "sub-b": "sub-2a-value",
      "sub-c": "sub-2c-value",
      "sub-d": "sub-2d-value"
    }
  }
]
What if I don't want to use an attribute in the JsonReader? (I might have too many values, and it could become a maintenance nightmare.) Is there another way?
Can I still solve my use case by using an Avro schema for both reading and writing?
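To make the target shape concrete, the flattening I'm after looks roughly like the sketch below. It is plain Java using Jackson outside of NiFi, only to illustrate the mapping from the nested JSON above to the 9-column CSV; it does not quote field values and the class name is made up.

import java.util.ArrayList;
import java.util.List;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonToFlatCsv {

    static final String[] TOP_COLUMNS = {"a", "b", "c", "d", "e"};
    static final String[] SUB_COLUMNS = {"sub-a", "sub-b", "sub-c", "sub-d"};

    // Missing keys and JSON nulls both become empty CSV fields.
    static String field(JsonNode node) {
        return (node.isMissingNode() || node.isNull()) ? "" : node.asText();
    }

    static List<String> toCsvLines(String json) throws Exception {
        JsonNode root = new ObjectMapper().readTree(json);
        List<String> lines = new ArrayList<>();
        lines.add("a,b,c,d,e,sub-a,sub-b,sub-c,sub-d");
        for (JsonNode row : root) {
            List<String> fields = new ArrayList<>();
            for (String col : TOP_COLUMNS) {
                fields.add(field(row.path(col)));
            }
            // Pull sub-a..sub-d out of the nested "sub_value" object.
            JsonNode sub = row.path("sub_value");
            for (String col : SUB_COLUMNS) {
                fields.add(field(sub.path(col)));
            }
            lines.add(String.join(",", fields));
        }
        return lines;
    }

    public static void main(String[] args) throws Exception {
        String json = "[{\"a\":\"a-value\",\"e\":123,"
                + "\"sub_value\":{\"sub-a\":\"sub-a-value\",\"sub-b\":null}}]";
        toCsvLines(json).forEach(System.out::println);
        // a,b,c,d,e,sub-a,sub-b,sub-c,sub-d
        // a-value,,,,123,sub-a-value,,,
    }
}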
Input JSON =>
{"store_number":"33152","store_name":"33152 WALMART JARDINES DEL COUNTRY 2374","store_display_name":"WALMART JARDINES …Run Code Online (Sandbox Code Playgroud)