我知道,在 KSQL 中我们可以将偏移量设置为最早或最晚但是我们可以获取特定时间段的数据,即我需要从 2020 年 5 月 6 日起将数据插入到主题中?
我的 JSON 看起来像:
{
"Obj1": {
"a": "abc",
"b": "def",
"c": "ghi"
},
"ArrayObj": [
{
"key1": "1",
"Key2": "2",
"Key3": "3",
},
{
"key1": "4",
"Key2": "5",
"Key3": "6",
},
{
"key1": "7",
"Key2": "8",
"Key3": "9",
}
]
}
Run Code Online (Sandbox Code Playgroud)
我已经编写了 KSQL 流将其转换为 AVRO 并保存到一个主题,以便我可以将其推送到 JDBC Sink 连接器
CREATE STREAM Example1(ArrayObj ARRAY<STRUCT<key1 VARCHAR, Key2 VARCHAR>>,Obj1 STRUCT<a VARCHAR>)WITH(kafka_topic='sample_topic', value_format='JSON');
CREATE STREAM Example_Avro WITH(VALUE_FORMAT='avro') AS SELECT e.ArrayObj[0] FROM Example1 e;
Run Code Online (Sandbox Code Playgroud)
在 Example_Avro 中,我只能获取数组中的第一个对象。
当我在 KSQL 中点击 select * from Example_Avro 时,如何获得如下所示的数据? …