vip*_*rud 2 java apache-kafka apache-kafka-streams confluent-platform ksqldb
I have data arriving in Kafka in the following format:
{"WHS":[{"Character Set":"UTF-8","action":"finished","Update-Date-Time":"2020-04-11 09:00:02:25","Number":0,"Abbr":"","Name":"","Name2":"","Country-Code":"","Addr-1":"","Addr-2":"","Addr-3":"","Addr-4":"","City":"","State":""}]}
I want it transformed into this:
{"Character Set":"UTF-8","action":"finished","Update-Date-Time":"2020-04-11 09:00:02:25","Number":0,"Abbr":"","Name":"","Name2":"","Country-Code":"","Addr-1":"","Addr-2":"","Addr-3":"","Addr-4":"","City":"","State":""}
I tried flattening it with KSQL, but KSQL doesn't support arrays yet.
I then tried flattening it with a KStream, using the following code:
builder.stream(inputTopic).flatMapValues(Object -> Arrays.asList()).to(outputTopic);
But it doesn't produce any output. Any help would be greatly appreciated.
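As for the Kafka Streams attempt: `Arrays.asList()` called with no arguments returns an empty list, so `flatMapValues` maps every input record to zero output records, and nothing ever reaches the output topic. The mapper has to return one value per element of the `WHS` array instead. Below is a minimal, JDK-only sketch of such a mapper. It assumes the record values are JSON strings, that they are well-formed, and that `"WHS"` holds the array to unwrap; the class name `WhsUnwrapper` and the method `unwrap` are hypothetical. In a real application you would more likely parse the value with a JSON library such as Jackson.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper: splits the "WHS" array of a JSON record into one JSON
 * string per top-level array element, using only the JDK.
 * Assumes well-formed JSON and that the first "WHS" key is followed by an array.
 */
final class WhsUnwrapper {

    private WhsUnwrapper() {}

    static List<String> unwrap(String json) {
        List<String> out = new ArrayList<>();
        int key = json.indexOf("\"WHS\"");
        if (key < 0) {
            return out; // no WHS field: emit nothing for this record
        }
        int start = json.indexOf('[', key);
        if (start < 0) {
            return out;
        }
        int depth = 0;            // nesting depth of {} / [] inside the WHS array
        int elementStart = -1;    // index where the current top-level element began
        boolean inString = false; // brackets inside string values must be ignored
        for (int i = start + 1; i < json.length(); i++) {
            char c = json.charAt(i);
            if (inString) {
                if (c == '\\') {
                    i++; // skip the escaped character, e.g. \"
                } else if (c == '"') {
                    inString = false;
                }
                continue;
            }
            if (c == '"') {
                inString = true;
            } else if (c == '{' || c == '[') {
                if (depth == 0) {
                    elementStart = i;
                }
                depth++;
            } else if (c == '}' || c == ']') {
                if (depth == 0) {
                    break; // the closing ']' of the WHS array itself
                }
                depth--;
                if (depth == 0) {
                    out.add(json.substring(elementStart, i + 1));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String in = "{\"WHS\":[{\"Character Set\":\"UTF-8\",\"action\":\"finished\",\"Number\":0},"
                + "{\"action\":\"started\",\"Number\":1}]}";
        for (String record : unwrap(in)) {
            System.out.println(record); // one line per WHS element
        }
    }
}
```

Assuming String serdes are configured for the input and output topics, the helper could then be wired in as `builder.<String, String>stream(inputTopic).flatMapValues(WhsUnwrapper::unwrap).to(outputTopic);`. The explicit type arguments matter: a raw `builder.stream(inputTopic)` yields `KStream<Object, Object>`, which the method reference would not fit.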
KSQL / ksqlDB does support arrays. Here's how to do what you're asking:
-- Declare the stream
CREATE STREAM TEST1
  (WHS ARRAY<STRUCT<"action" VARCHAR
                  , "Update-Date-Time" VARCHAR
                  , "Number" VARCHAR
                  , "Abbr" VARCHAR
                  , "Name" VARCHAR
                  , "Name2" VARCHAR
                  , "Country-Code" VARCHAR
                  , "Addr-1" VARCHAR
                  , "Addr-2" VARCHAR
                  , "Addr-4" VARCHAR
                  , "City" VARCHAR
                  , "State" VARCHAR>>)
  WITH (KAFKA_TOPIC='test1'
       ,VALUE_FORMAT='JSON');
-- Note: this declaration omits the "Character Set" and "Addr-3" fields of the
-- sample record and declares "Number" as VARCHAR (so it is emitted as "0").
-- Add the missing fields to the STRUCT, or declare "Number" as INT, if the
-- output must match the target record exactly.

-- Set querying from beginning of the topic
SET 'auto.offset.reset' = 'earliest';

-- Query the array
ksql> SELECT WHS FROM TEST1 EMIT CHANGES LIMIT 1;
+------------------------------------------------------------------------------------------------------------------+
|WHS                                                                                                               |
+------------------------------------------------------------------------------------------------------------------+
|[{ACTION=finished, Update-Date-Time=2020-04-11 09:00:02:25, NUMBER=0, ABBR=, NAME=, NAME2=, Country-Code=, Addr-1=, Addr-2=, Addr-4=, City=, STATE=}] |
Limit Reached
Query terminated
ksql>

-- Flatten the array
ksql> SELECT EXPLODE(WHS) FROM TEST1 EMIT CHANGES LIMIT 1;
+------------------------------------------------------------------------------------------------------------------+
|KSQL_COL_0                                                                                                        |
+------------------------------------------------------------------------------------------------------------------+
|{ACTION=finished, Update-Date-Time=2020-04-11 09:00:02:25, NUMBER=0, ABBR=, NAME=, NAME2=, Country-Code=, Addr-1=, Addr-2=, Addr-4=, City=, STATE=} |
Limit Reached
Query terminated
ksql>

You can write this to another stream (topic):
ksql> CREATE STREAM TEST1_EXPLODE WITH (KAFKA_TOPIC='NEW_TEST1') AS SELECT EXPLODE(WHS) FROM TEST1 EMIT CHANGES;

 Message
-------------------------------------------
 Created query with ID CSAS_TEST1_EXPLODE_155
-------------------------------------------
ksql> PRINT NEW_TEST1;
…
Value format: JSON or KAFKA_STRING
rowtime: 4/27/20 8:28:46 AM UTC, key: <null>, value: {"KSQL_COL_0":{"ACTION":"finished","Update-Date-Time":"2020-04-11 09:00:02:25","NUMBER":"0","ABBR":"","NAME":"","NAME2":"","Country-Code":"","Addr-1":"","Addr-2":"","Addr-4":"","City":"","STATE":""}}

If you want to flatten the resulting struct, you can do that too:
CREATE STREAM TEST1_FLATTENED AS SELECT EXPLODE(WHS)->"action" AS "action" ,
       EXPLODE(WHS)->"Update-Date-Time" AS "Update-Date-Time" ,
       EXPLODE(WHS)->"Number" AS "Number" ,
       EXPLODE(WHS)->"Abbr" AS "Abbr" ,
       EXPLODE(WHS)->"Name" AS "Name" ,
       EXPLODE(WHS)->"Name2" AS "Name2" ,
       EXPLODE(WHS)->"Country-Code" AS "Country-Code" ,
       EXPLODE(WHS)->"Addr-1" AS "Addr-1" ,
       EXPLODE(WHS)->"Addr-2" AS "Addr-2" ,
       EXPLODE(WHS)->"Addr-4" AS "Addr-4" ,
       EXPLODE(WHS)->"City" AS "City" ,
       EXPLODE(WHS)->"State" AS "State"
  FROM TEST1 EMIT CHANGES;

ksql> PRINT TEST1_FLATTENED;
…
Value format: JSON or KAFKA_STRING
rowtime: 4/27/20 8:28:46 AM UTC, key: <null>, value: {"action":"finished","Update-Date-Time":"2020-04-11 09:00:02:25","Number":"0","Abbr":"","Name":"","Name2":"","Country-Code":"","Addr-1":"","Addr-2":"","Addr-4":"","City":"","State":""}
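The flattened query calls `EXPLODE(WHS)->"field"` once per output column. Because every call walks the same `WHS` array, output row i takes each selected field from array element i, so each array element becomes one flat record. The Java sketch below is a hypothetical in-memory model of that projection, not ksqlDB's implementation; `ExplodeFlattenModel` and `explodeAndProject` are illustrative names, and each array element is represented as a `Map` from field name to value.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of
 *   SELECT EXPLODE(WHS)->"action" AS "action", EXPLODE(WHS)->"City" AS "City", ...
 * All EXPLODE calls iterate the same array, so row i is built from element i.
 */
final class ExplodeFlattenModel {

    private ExplodeFlattenModel() {}

    static List<Map<String, Object>> explodeAndProject(
            List<Map<String, Object>> whs, List<String> fields) {
        List<Map<String, Object>> rows = new ArrayList<>();
        for (Map<String, Object> element : whs) {           // one output row per element
            Map<String, Object> row = new LinkedHashMap<>(); // preserves column order
            for (String field : fields) {
                row.put(field, element.get(field));          // absent field -> null
            }
            rows.add(row);
        }
        return rows;
    }

    public static void main(String[] args) {
        Map<String, Object> element = new LinkedHashMap<>();
        element.put("action", "finished");
        element.put("Update-Date-Time", "2020-04-11 09:00:02:25");
        element.put("City", "");
        List<Map<String, Object>> rows = explodeAndProject(
                List.of(element), List.of("action", "Update-Date-Time", "City"));
        System.out.println(rows);
        // prints [{action=finished, Update-Date-Time=2020-04-11 09:00:02:25, City=}]
    }
}
```

The model only covers the case used here, where every `EXPLODE` receives the same array; it makes no claim about how ksqlDB combines table functions over arrays of different lengths.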