Convert a JSON array to a JSON object using KStream or KSQL

vip*_*rud 2 java apache-kafka apache-kafka-streams confluent-platform ksqldb

I have data arriving in Kafka in the following format.

{"WHS":[{"Character Set":"UTF-8","action":"finished","Update-Date-Time":"2020-04-11 09:00:02:25","Number":0,"Abbr":"","Name":"","Name2":"","Country-Code":"","Addr-1":"","Addr-2":"","Addr-3":"","Addr-4":"","City":"","State":""}]}


I want it transformed into this:

{"Character Set":"UTF-8","action":"finished","Update-Date-Time":"2020-04-11 09:00:02:25","Number":0,"Abbr":"","Name":"","Name2":"","Country-Code":"","Addr-1":"","Addr-2":"","Addr-3":"","Addr-4":"","City":"","State":""}

I tried to flatten it with KSQL, but KSQL does not support arrays yet.

I tried to flatten it with KStream using the following code:

builder.stream(inputTopic).flatMapValues(Object -> Arrays.asList()).to(outputTopic);

But it does not produce any output. Any help with this would be greatly appreciated.
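The empty output can be explained without Kafka: `Arrays.asList()` called with no arguments returns an empty list, and `flatMapValues` emits one record per element of the collection the mapper returns, so every input record is dropped. The sketch below simulates that behaviour in plain Java; it is not the Kafka Streams API. It assumes the record value has already been deserialized from JSON into a `Map` (a real topology would need a JSON serde, for example one built on Jackson, which is not shown), and the names `FlatMapValuesSketch`, `flatMapValues`, `sampleRecord` and `extractWhs` are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class FlatMapValuesSketch {

    // Simulates KStream#flatMapValues with java.util.stream (NOT the Kafka API):
    // every element of the list returned by the mapper becomes one output
    // record, so a mapper that returns an empty list yields no records.
    static <V, R> List<R> flatMapValues(List<V> input, Function<V, List<R>> mapper) {
        return input.stream()
                .flatMap(v -> mapper.apply(v).stream())
                .collect(Collectors.toList());
    }

    // Hypothetical record, assumed to be already deserialized from JSON into a Map.
    static Map<String, Object> sampleRecord() {
        Map<String, Object> first = Map.of("action", "finished", "Number", 0);
        Map<String, Object> second = Map.of("action", "started", "Number", 1);
        return Map.of("WHS", List.of(first, second));
    }

    // Returns the elements of the "WHS" array, or an empty list if it is missing.
    @SuppressWarnings("unchecked")
    static List<Object> extractWhs(Map<String, Object> value) {
        Object whs = value.get("WHS");
        return whs instanceof List ? (List<Object>) whs : List.of();
    }

    public static void main(String[] args) {
        List<Map<String, Object>> input = List.of(sampleRecord());

        // The mapper from the question: Arrays.asList() with no arguments is empty.
        List<Object> broken = flatMapValues(input, v -> Arrays.asList());
        System.out.println(broken.size()); // prints 0

        // Returning the array elements gives one output record per element.
        List<Object> fixed = flatMapValues(input, FlatMapValuesSketch::extractWhs);
        System.out.println(fixed.size()); // prints 2
    }
}
```

In a real topology the same idea applies: the mapper has to return the elements of the deserialized `WHS` array rather than an empty list; the exact code depends on the serde in use.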

Rob*_*att 5

KSQL / ksqlDB does support arrays. Here is how to do what you are asking:

-- Declare the stream
CREATE STREAM TEST1
    (WHS ARRAY<STRUCT<"action"           VARCHAR
                    , "Update-Date-Time" VARCHAR
                    , "Number"           VARCHAR
                    , "Abbr"             VARCHAR
                    , "Name"             VARCHAR
                    , "Name2"            VARCHAR
                    , "Country-Code"     VARCHAR
                    , "Addr-1"           VARCHAR
                    , "Addr-2"           VARCHAR
                    , "Addr-4"           VARCHAR
                    , "City"             VARCHAR
                    , "State"            VARCHAR>>)
    WITH (KAFKA_TOPIC ='test1'
         ,VALUE_FORMAT='JSON');

-- Set querying from beginning of the topic
SET 'auto.offset.reset' = 'earliest';

-- Query the array
ksql> SELECT WHS FROM TEST1 EMIT CHANGES LIMIT 1;
+------------------------------------------------------------------------------------------------------------------------------------------------------+
|WHS                                                                                                                                                   |
+------------------------------------------------------------------------------------------------------------------------------------------------------+
|[{ACTION=finished, Update-Date-Time=2020-04-11 09:00:02:25, NUMBER=0, ABBR=, NAME=, NAME2=, Country-Code=, Addr-1=, Addr-2=, Addr-4=, City=, STATE=}] |
Limit Reached
Query terminated
ksql>

-- Flatten the array
ksql> SELECT EXPLODE(WHS) FROM TEST1 EMIT CHANGES LIMIT 1;
+------------------------------------------------------------------------------------------------------------------------------------------------------+
|KSQL_COL_0                                                                                                                                            |
+------------------------------------------------------------------------------------------------------------------------------------------------------+
|{ACTION=finished, Update-Date-Time=2020-04-11 09:00:02:25, NUMBER=0, ABBR=, NAME=, NAME2=, Country-Code=, Addr-1=, Addr-2=, Addr-4=, City=, STATE=}   |
Limit Reached
Query terminated
ksql>
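Conceptually, `EXPLODE` is a table function: for each input row it emits one output row per element of the array, so a message whose `WHS` array held two structs would become two rows, and an empty array would produce none. The plain-Java model below illustrates only those semantics; it is not how ksqlDB implements `EXPLODE`, and `ExplodeSketch`, `explode` and the sample rows are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ExplodeSketch {

    // Models EXPLODE(column) in plain Java (not ksqlDB code): each element of
    // the array column becomes its own output row. A row whose array holds N
    // structs yields N rows; an empty or missing array yields none.
    static List<Map<String, String>> explode(List<Map<String, List<Map<String, String>>>> rows,
                                             String column) {
        List<Map<String, String>> out = new ArrayList<>();
        for (Map<String, List<Map<String, String>>> row : rows) {
            List<Map<String, String>> array = row.getOrDefault(column, List.of());
            out.addAll(array); // preserves element order within each row
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical rows: the first carries two WHS structs, the second an empty array.
        Map<String, List<Map<String, String>>> row1 = Map.of("WHS", List.of(
                Map.of("action", "finished"),
                Map.of("action", "started")));
        Map<String, List<Map<String, String>>> row2 = Map.of("WHS", List.of());

        List<Map<String, String>> exploded = explode(List.of(row1, row2), "WHS");
        System.out.println(exploded.size()); // prints 2
        System.out.println(exploded.get(0)); // prints {action=finished}
    }
}
```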

You can write the result to another stream (topic):

ksql> CREATE STREAM TEST1_EXPLODE WITH (KAFKA_TOPIC='NEW_TEST1') AS SELECT EXPLODE(WHS) FROM TEST1 EMIT CHANGES;

 Message
-------------------------------------------
 Created query with ID CSAS_TEST1_EXPLODE_155
-------------------------------------------
ksql> PRINT NEW_TEST1;
…
Value format: JSON or KAFKA_STRING
rowtime: 4/27/20 8:28:46 AM UTC, key: <null>, value: {"KSQL_COL_0":{"ACTION":"finished","Update-Date-Time":"2020-04-11 09:00:02:25","NUMBER":"0","ABBR":"","NAME":"","NAME2":"","Country-Code":"","Addr-1":"","Addr-2":"","Addr-4":"","City":"","STATE":""}}

If you want to flatten the resulting struct as well, you can do that too:

CREATE STREAM TEST1_FLATTENED AS
    SELECT  EXPLODE(WHS)->"action"           AS "action"           ,
            EXPLODE(WHS)->"Update-Date-Time" AS "Update-Date-Time" ,
            EXPLODE(WHS)->"Number"           AS "Number"           ,
            EXPLODE(WHS)->"Abbr"             AS "Abbr"             ,
            EXPLODE(WHS)->"Name"             AS "Name"             ,
            EXPLODE(WHS)->"Name2"            AS "Name2"            ,
            EXPLODE(WHS)->"Country-Code"     AS "Country-Code"     ,
            EXPLODE(WHS)->"Addr-1"           AS "Addr-1"           ,
            EXPLODE(WHS)->"Addr-2"           AS "Addr-2"           ,
            EXPLODE(WHS)->"Addr-4"           AS "Addr-4"           ,
            EXPLODE(WHS)->"City"             AS "City"             ,
            EXPLODE(WHS)->"State"            AS "State"
    FROM TEST1 EMIT CHANGES;
ksql> PRINT TEST1_FLATTENED;
…
Value format: JSON or KAFKA_STRING
rowtime: 4/27/20 8:28:46 AM UTC, key: <null>, value: {"action":"finished","Update-Date-Time":"2020-04-11 09:00:02:25","Number":"0","Abbr":"","Name":"","Name2":"","Country-Code":"","Addr-1":"","Addr-2":"","Addr-4":"","City":"","State":""}
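Two details of this output follow from the `TEST1` schema rather than from `EXPLODE`: fields not declared in the `STRUCT` (here `"Character Set"` and `"Addr-3"`) do not appear, and because `"Number"` is declared `VARCHAR`, its value comes out as the string `"0"`. The plain-Java sketch below models that projection for a single exploded element; it is only an illustration, `FlattenSketch`, `flatten` and `DECLARED` are hypothetical names, and mapping absent fields to `null` is an assumption of the sketch.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FlattenSketch {

    // Field names declared in the TEST1 STRUCT, in declaration order.
    // "Character Set" and "Addr-3" are not declared, so they never appear.
    static final List<String> DECLARED = List.of(
            "action", "Update-Date-Time", "Number", "Abbr", "Name", "Name2",
            "Country-Code", "Addr-1", "Addr-2", "Addr-4", "City", "State");

    // Models SELECT EXPLODE(WHS)->"f" AS "f", ... for one exploded element:
    // each declared field is copied to the top level as a string (all fields
    // are VARCHAR, so 0 becomes "0"); absent fields map to null (assumption).
    static Map<String, String> flatten(Map<String, Object> element) {
        Map<String, String> out = new LinkedHashMap<>();
        for (String field : DECLARED) {
            Object value = element.get(field);
            out.put(field, value == null ? null : String.valueOf(value));
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical element mirroring part of the question's input.
        Map<String, Object> element = new LinkedHashMap<>();
        element.put("Character Set", "UTF-8");
        element.put("action", "finished");
        element.put("Number", 0);
        element.put("Addr-3", "");

        Map<String, String> flat = flatten(element);
        System.out.println(flat.containsKey("Character Set")); // prints false
        System.out.println(flat.get("Number"));                // prints 0
    }
}
```

To keep `"Character Set"` and `"Addr-3"` in the output, they would also need to be declared in the `STRUCT` of `TEST1`.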