Rag*_*ddy 3 apache-kafka confluent-platform ksqldb
我的 JSON 看起来像:
{
"Obj1": {
"a": "abc",
"b": "def",
"c": "ghi"
},
"ArrayObj": [
{
"key1": "1",
"Key2": "2",
"Key3": "3",
},
{
"key1": "4",
"Key2": "5",
"Key3": "6",
},
{
"key1": "7",
"Key2": "8",
"Key3": "9",
}
]
}
Run Code Online (Sandbox Code Playgroud)
我已经编写了 KSQL 流将其转换为 AVRO 并保存到一个主题,以便我可以将其推送到 JDBC Sink 连接器
CREATE STREAM Example1(ArrayObj ARRAY<STRUCT<key1 VARCHAR, Key2 VARCHAR>>,Obj1 STRUCT<a VARCHAR>)WITH(kafka_topic='sample_topic', value_format='JSON');
CREATE STREAM Example_Avro WITH(VALUE_FORMAT='avro') AS SELECT e.ArrayObj[0] FROM Example1 e;
Run Code Online (Sandbox Code Playgroud)
在 Example_Avro 中,我只能获取数组中的第一个对象。
当我在 KSQL 中点击 select * from Example_Avro 时,如何获得如下所示的数据?
a b key1 key2 key3
abc def 1 2 3
abc def 4 5 6
abc def 7 8 9
Run Code Online (Sandbox Code Playgroud)
测试数据(我删除了key3值后无效的尾随逗号):
ksql> PRINT test4;
Format:JSON
1/9/20 7:45:18 PM UTC , NULL , { "Obj1": { "a": "abc", "b": "def", "c": "ghi" }, "ArrayObj": [ { "key1": "1", "Key2": "2", "Key3": "3" }, { "key1": "4", "Key2": "5", "Key3": "6" }, { "key1": "7", "Key2": "8", "Key3": "9" } ] }
Run Code Online (Sandbox Code Playgroud)
询问:
ksql> PRINT test4;
Format:JSON
1/9/20 7:45:18 PM UTC , NULL , { "Obj1": { "a": "abc", "b": "def", "c": "ghi" }, "ArrayObj": [ { "key1": "1", "Key2": "2", "Key3": "3" }, { "key1": "4", "Key2": "5", "Key3": "6" }, { "key1": "7", "Key2": "8", "Key3": "9" } ] }
Run Code Online (Sandbox Code Playgroud)
结果:
+-------+-------+------+-------+-------+
|A |B |KEY1 |KEY2 |KEY3 |
+-------+-------+------+-------+-------+
|abc |def |1 |2 |3 |
|abc |def |4 |5 |6 |
|abc |def |7 |8 |9 |
Run Code Online (Sandbox Code Playgroud)
在 ksqlDB 0.6 上测试,其中EXPLODE添加了该功能。
| 归档时间: |
|
| 查看次数: |
1486 次 |
| 最近记录: |