Gec*_*cKo 3 apache-kafka clickhouse
I'm trying to insert JSON data from Kafka into ClickHouse. Here is the simplified JSON:
{
...
"sendAddress":{
"sendCommChannelTypeId":4,
"sendCommChannelTypeCode":"SMS",
"sendAddress":"789345345945"},
...
}
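Here are the steps: create a table in ClickHouse, create a second table with the Kafka engine, and create a MATERIALIZED VIEW that connects the two tables and hooks CH up to Kafka.
Create the first table: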
CREATE TABLE tab
(
...
sendAddress Tuple (sendCommChannelTypeId Int32, sendCommChannelTypeCode String, sendAddress String),
...
) ENGINE = MergeTree()
PARTITION BY applicationId
ORDER BY (applicationId);
Create the second table with the Kafka engine settings:
CREATE TABLE tab_kfk
(
...
sendAddress Tuple (sendCommChannelTypeId Int32, sendCommChannelTypeCode String, sendAddress String),
...
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topk2',
kafka_group_name = 'group1',
kafka_format = 'JSONEachRow',
kafka_row_delimiter = '\n';
Create the materialized view:
CREATE MATERIALIZED VIEW tab_mv TO tab AS
SELECT ... sendAddress, ...
FROM tab_kfk;
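Then I try to select all columns, or specific ones, from the first table, tab, but get nothing. The log is as follows:
OK. Just put the braces of sendAddress inside square brackets "[]", like this: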
"authkey":"some_value",
"sendAddress":[{
"sendCommChannelTypeId":4,
"sendCommChannelTypeCode":"SMS",
"sendAddress":"789345345945"
}]
There are 3 ways to fix it:
1. Don't use nested objects; flatten the message into a plain structure:
{
..
"authkey":"key",
"sendAddress_CommChannelTypeId":4,
"sendAddress_CommChannelTypeCode":"SMS",
"sendAddress":"789345345945",
..
}
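Fix 1 is a change on the producer side. Below is a minimal Python sketch of that reshaping, assuming messages are built as dicts before being serialized. `flatten_send_address` is a hypothetical helper, and the ClickHouse tables would then need matching flat columns (for example `sendAddress_CommChannelTypeId Int32`) instead of the Tuple column.

```python
import json


def flatten_send_address(message: dict) -> dict:
    """Return a copy of message with the nested sendAddress object replaced by
    flat top-level keys, matching the layout of fix 1 (hypothetical helper)."""
    nested = message["sendAddress"]  # raises KeyError if the field is missing
    flat = {k: v for k, v in message.items() if k != "sendAddress"}
    flat["sendAddress_CommChannelTypeId"] = nested["sendCommChannelTypeId"]
    flat["sendAddress_CommChannelTypeCode"] = nested["sendCommChannelTypeCode"]
    flat["sendAddress"] = nested["sendAddress"]  # the plain address string
    return flat


message = {
    "authkey": "key",
    "sendAddress": {
        "sendCommChannelTypeId": 4,
        "sendCommChannelTypeCode": "SMS",
        "sendAddress": "789345345945",
    },
}
# json.dumps without indent emits a single line, i.e. one row for JSONEachRow
print(json.dumps(flatten_send_address(message)))
```

The input dict is left untouched, so the same message object can still be logged or sent elsewhere in its original shape.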
2. Use a Nested column and send each nested field as an array under a dotted key:
{
..
"authkey":"key",
"sendAddress.sendCommChannelTypeId":[4],
"sendAddress.sendCommChannelTypeCode":["SMS"],
"sendAddress.sendAddress":["789345345945"],
..
}
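In the dotted-key layout of fix 2, each sub-column of the Nested structure is an array with one element per nested row. Below is a hedged Python sketch that produces this layout from the original message. `to_nested_arrays` is a hypothetical helper, and it assumes every nested row has the same keys as the first one.

```python
import json


def to_nested_arrays(message: dict, field: str = "sendAddress") -> dict:
    """Rewrite message[field] (one object or a list of objects) as dotted keys
    holding parallel arrays, one element per nested row (hypothetical helper)."""
    out = {k: v for k, v in message.items() if k != field}
    value = message[field]
    rows = value if isinstance(value, list) else [value]
    # Assumes every nested row has the same keys as the first one.
    keys = list(rows[0]) if rows else []
    for key in keys:
        out[f"{field}.{key}"] = [row[key] for row in rows]
    return out


message = {
    "authkey": "key",
    "sendAddress": {
        "sendCommChannelTypeId": 4,
        "sendCommChannelTypeCode": "SMS",
        "sendAddress": "789345345945",
    },
}
print(json.dumps(to_nested_arrays(message)))
```

A single object becomes one-element arrays, as in the JSON above. A list of objects becomes longer parallel arrays.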
Declare sendAddress as a Nested column in the Kafka table:
CREATE TABLE tab_kfk
(
applicationId Int32,
..
sendAddress Nested(
sendCommChannelTypeId Int32,
sendCommChannelTypeCode String,
sendAddress String),
..
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topk2',
kafka_group_name = 'group1',
kafka_format = 'JSONEachRow',
kafka_row_delimiter = '\n',
input_format_import_nested_json = 1 /* <--- */
Note the input_format_import_nested_json setting.
3. Consume each message as a raw string and parse it in the materialized view:
CREATE TABLE tab_kfk
(
message String
)
ENGINE = Kafka
SETTINGS
..
kafka_format = 'JSONAsString', /* <--- */
..
CREATE MATERIALIZED VIEW tab_mv TO tab
AS
SELECT
..
JSONExtractString(message, 'authkey') AS authkey,
JSONExtract(message, 'sendAddress', 'Tuple(Int32,String,String)') AS sendAddress,
..
FROM tab_kfk;
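Before pointing the materialized view at the topic, it may help to check offline that raw messages contain what the JSONExtract call expects. Below is a rough Python analogue, not ClickHouse's implementation. It assumes that the values of the sendAddress object line up by position with Tuple(Int32,String,String), and it raises on a mismatch so that bad messages are easy to spot. `extract_send_address` is a hypothetical name.

```python
import json

INT32_MIN, INT32_MAX = -(2**31), 2**31 - 1


def extract_send_address(raw: str) -> tuple:
    """Rough local check mirroring
    JSONExtract(message, 'sendAddress', 'Tuple(Int32,String,String)'):
    take the sendAddress values in key order and type-check each one."""
    obj = json.loads(raw)["sendAddress"]
    values = list(obj.values())  # json.loads preserves key order
    if len(values) != 3:
        raise ValueError(f"expected 3 fields, got {len(values)}")
    type_id, type_code, address = values
    # bool is a subclass of int in Python, so exclude it explicitly
    if not (isinstance(type_id, int) and not isinstance(type_id, bool)
            and INT32_MIN <= type_id <= INT32_MAX):
        raise ValueError(f"not an Int32: {type_id!r}")
    if not (isinstance(type_code, str) and isinstance(address, str)):
        raise ValueError("expected String values for fields 2 and 3")
    return (type_id, type_code, address)


raw = ('{"authkey":"key","sendAddress":{"sendCommChannelTypeId":4,'
       '"sendCommChannelTypeCode":"SMS","sendAddress":"789345345945"}}')
print(extract_send_address(raw))
```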