使用 ClickHouse 使用来自 Kafka 的嵌套 JSON 消息

Ram*_*lat 8 json apache-kafka clickhouse

如果 JSON 消息是平面 JSON 文档,Clickhouse 绝对可以从 Kafka 读取 JSON 消息。

我们kafka_format = 'JSONEachRow'在 Clickhouse 中用它来表示这一点。

这是我们目前使用的方式:

CREATE TABLE topic1_kafka
(
    ts Int64,
    event String,
    title String,
    msg String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1test.intra:9092,kafka2test.intra:9092,kafka3test.intra:9092',
kafka_topic_list = 'topic1', kafka_num_consumers = 1, kafka_group_name = 'ch1', 
kafka_format = 'JSONEachRow'
Run Code Online (Sandbox Code Playgroud)

只要生产者将平面 JSON 发送到topic1_kafka. 但并非所有生产者都会发送平面 JSON,大多数应用程序都会生成如下所示的嵌套 JSON 文档:

{
  "ts": 1598033988,
  "deviceId": "cf060111-dbe6-4aa8-a2d0-d5aa17f45663",
  "location": [39.920515, 32.853708],
  "stats": {
    "temp": 71.2,
    "total_memory": 32,
    "used_memory": 21.2
  }
}
Run Code Online (Sandbox Code Playgroud)

不幸的是,上面的 JSON 文档与 不兼容JSONEachRow,因此 ClickHouse 无法将 JSON 文档中的字段映射到表中的列。

有什么办法可以完成这个映射吗?

编辑:我们想要将嵌套的 json 映射到一个平面表,如下所示:

{
  "ts": 1598033988,
  "deviceId": "cf060111-dbe6-4aa8-a2d0-d5aa17f45663",
  "location": [39.920515, 32.853708],
  "stats": {
    "temp": 71.2,
    "total_memory": 32,
    "used_memory": 21.2
  }
}
Run Code Online (Sandbox Code Playgroud)

vla*_*mir 6

看起来一次性的方法是将“原始”数据作为字符串获取,然后在消费者物化视图中使用JSON 函数处理每一行。

\n
WITH \'{"ts": 1598033988, "deviceId": "cf060111-dbe6-4aa8-a2d0-d5aa17f45663", "location": [39.920515, 32.853708], "stats": { "temp": 71.2, "total_memory": 32, "used_memory": 21.2 }}\' AS raw\nSELECT \n  JSONExtractUInt(raw, \'ts\') AS ts,\n  JSONExtractString(raw, \'deviceId\') AS deviceId,\n  arrayMap(x -> toFloat32(x), JSONExtractArrayRaw(raw, \'location\')) AS location,\n  JSONExtract(raw, \'stats\', \'Tuple(temp Float64, total_memory Float64, used_memory Float64)\') AS stats,\n  stats.1 AS temp,\n  stats.2 AS total_memory,\n  stats.3 AS used_memory;\n\n/*\n\xe2\x94\x8c\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80ts\xe2\x94\x80\xe2\x94\xac\xe2\x94\x80deviceId\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\xac\xe2\x94\x80location\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\xac\xe2\x94\x80stats\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\xac\xe2\x94\x80temp\xe2\x94\x80\xe2\x94\xac\xe2\x94\x80total_memory\xe2\x94\x80\xe2\x94\xac\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80used_memory\xe2\x94\x80\xe2\x94\x90\n\xe2\x94\x82 1598033988 \xe2\x94\x82 cf060111-dbe6-4aa8-a2d0-d5aa17f45663 \xe2\x94\x82 [39.920513,32.853706] \xe2\x94\x82 (71.2,32,21.200000000000003) \xe2\x94\x82 71.2 \xe2\x94\x82           32 \xe2\x94\x82 21.200000000000003 \xe2\x94\x82\n\xe2\x94\x94\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\xb4\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\xb4\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\xb4\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\xb4\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\xb4\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\xb4\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x98\n*/\n
Run Code Online (Sandbox Code Playgroud)\n

备注:对于浮点数字,应使用Float64类型而不是Float32(请参阅相关CH Issue 13962)。

\n
\n

使用标准数据类型需要更改 JSON 的架构:

\n
    \n
  1. 统计数据表示为元组
  2. \n
\n
CREATE TABLE test_tuple_field\n(\n    ts Int64,\n    deviceId String,\n    location Array(Float32),\n    stats Tuple(Float32, Float32, Float32)\n) ENGINE = MergeTree()\nORDER BY ts;\n\n\nINSERT INTO test_tuple_field FORMAT JSONEachRow \n{ "ts": 1598033988, "deviceId": "cf060111-dbe6-4aa8-a2d0-d5aa17f45663", "location": [39.920515, 32.853708], "stats": [71.2, 32, 21.2]};\n
Run Code Online (Sandbox Code Playgroud)\n
    \n
  1. 将统计数据表示为嵌套结构
  2. \n
\n
CREATE TABLE test_nested_field\n(\n    ts Int64,\n    deviceId String,\n    location Array(Float32),\n    stats Nested (temp Float32, total_memory Float32, used_memory Float32)\n) ENGINE = MergeTree()\nORDER BY ts;\n\n\nSET input_format_import_nested_json=1;\nINSERT INTO test_nested_field FORMAT JSONEachRow \n{ "ts": 1598033988, "deviceId": "cf060111-dbe6-4aa8-a2d0-d5aa17f45663", "location": [39.920515, 32.853708], "stats": { "temp": [71.2], "total_memory": [32], "used_memory": [21.2] }};\n
Run Code Online (Sandbox Code Playgroud)\n
\n

请参阅相关答案ClickHouse JSON parse error: Cannot parse input: Expected \',\' before

\n


Ale*_*sky 2

我遇到了同样的问题,我能够使用JSONAsString格式解决它。在此格式中,单个 JSON 对象被解释为单个值。此格式只能解析具有单个 String 类型字段的表。

CREATE TABLE topic1_kafka (
  data String
) 
ENGINE = Kafka
SETTINGS 
    kafka_format = 'JSONAsString',
    ...


CREATE MATERIALIZED VIEW topic1_mv TO topic1 AS
SELECT 
    JSONExtractInt(data, 'ts') AS ts, 
    JSONExtractString(data, 'deviceId') AS deviceId, 
    JSONExtractFloat(data, 'location', 1) AS location_1,  -- arrays are indexed from 1
    JSONExtractFloat(data, 'location', 2) AS location_2,
    JSONExtractFloat(data, 'stats', 'temp') AS stats_temp, 
    JSONExtractFloat(data, 'stats', 'total_memory') AS stats_total_memory, 
    JSONExtractFloat(data, 'stats', 'memory') AS stats_used_memory 
FROM topic1_kafka
Run Code Online (Sandbox Code Playgroud)

也可以看看: