ClickHouse JSON 解析异常:无法解析输入:之前需要 ','

Gec*_*cKo 3 apache-kafka clickhouse

我正在尝试将 JSON 数据从 Kafka 添加到 ClickHouse。这是简化的 JSON:

{
  ...
   "sendAddress":{
      "sendCommChannelTypeId":4,
      "sendCommChannelTypeCode":"SMS",
      "sendAddress":"789345345945"},
   ...
}
Run Code Online (Sandbox Code Playgroud)

以下是在 ClickHouse 中创建表的步骤,使用 Kafka Engine 创建另一个表并创建 MATERIALIZED VIEW 来连接这两个表,并将 CH 与 Kafka 连接。

创建第一个表

CREATE TABLE tab 
(
    ...

    sendAddress Tuple (sendCommChannelTypeId Int32, sendCommChannelTypeCode String, sendAddress String),
     ...

)Engine = MergeTree()
PARTITION BY applicationId
ORDER BY (applicationId);
Run Code Online (Sandbox Code Playgroud)

使用 Kafka 引擎设置创建第二个表:

CREATE TABLE tab_kfk
(
    ...
    sendAddress Tuple (sendCommChannelTypeId Int32, sendCommChannelTypeCode String, sendAddress String),
    ...
)ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
       kafka_topic_list = 'topk2',
       kafka_group_name = 'group1',
       kafka_format = 'JSONEachRow',
       kafka_row_delimiter = '\n';
Run Code Online (Sandbox Code Playgroud)

创建物化视图

CREATE MATERIALIZED VIEW tab_mv TO tab AS
SELECT ... sendAddress, ...
FROM tab_kfk;
Run Code Online (Sandbox Code Playgroud)

然后我尝试从第一个表 - 选项卡中选择所有或特定项目,但什么也没得到。日志如下例外

好的。只需在 sendAddress 中的大括号前添加“[]”,如下所示:

"authkey":"some_value",
   "sendAddress":[{
      "sendCommChannelTypeId":4,
      "sendCommChannelTypeCode":"SMS",
      "sendAddress":"789345345945"
   }]
Run Code Online (Sandbox Code Playgroud)

我仍然得到一个错误,但略有不同: 我应该怎么做才能解决这个问题,谢谢!

vla*_*mir 7

有 3 种方法可以修复它:

  1. 在插入 Kafka 主题之前不使用嵌套对象并扁平化消息。例如这样的方式:
{
    ..
    "authkey":"key",
    "sendAddress_CommChannelTypeId":4,
    "sendAddress_CommChannelTypeCode":"SMS",
    "sendAddress":"789345345945",
    ..
}
Run Code Online (Sandbox Code Playgroud)
  1. 使用需要更改 JSON 消息架构和表架构的嵌套数据结构:
{
    ..
    "authkey":"key",
    "sendAddress.sendCommChannelTypeId":[4],
    "sendAddress.sendCommChannelTypeCode":["SMS"],
    "sendAddress.sendAddress":["789345345945"],
    ..
}
Run Code Online (Sandbox Code Playgroud)
CREATE TABLE tab_kfk
(
    applicationId Int32,
    ..
    sendAddress Nested(
        sendCommChannelTypeId Int32,
        sendCommChannelTypeCode String,
        sendAddress String),
    ..
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
       kafka_topic_list = 'topk2',
       kafka_group_name = 'group1',
       kafka_format = 'JSONEachRow',
       kafka_row_delimiter = '\n',
       input_format_import_nested_json = 1 /* <--- */
Run Code Online (Sandbox Code Playgroud)

考虑设置input_format_import_nested_json

  1. 将输入 JSON 消息解释为字符串并手动解析它(请参阅github 问题 #16969):
CREATE TABLE tab_kfk
(
    message String
)
ENGINE = Kafka
SETTINGS 
    ..
    kafka_format = 'JSONAsString', /* <--- */
    ..

CREATE MATERIALIZED VIEW tab_mv TO tab 
AS
SELECT 
    ..
    JSONExtractString(message, 'authkey') AS authkey,
    JSONExtract(message, 'sendAddress', 'Tuple(Int32,String,String)') AS sendAddress,
    ..
FROM tab_kfk;
Run Code Online (Sandbox Code Playgroud)