Swa*_*aru 3 persistence apache-kafka confluent-platform ksqldb
我们在 ubuntu 上使用融合平台。我们将简单的 JSON 数据通过 cURL 请求发送到名为“UE_Context”的 kafka 主题上的 kafka-rest 服务器。
使用以下命令为此主题创建名为“UE_CONTEXT_STREAM”的 kafka 流:
CREATE STREAM UE_Context_Stream (ue_key VARCHAR, ecgi VARCHAR) WITH (KAFKA_TOPIC='UE_Context', VALUE_FORMAT='JSON');
Run Code Online (Sandbox Code Playgroud)
使用以下命令为此主题创建名为“UE_CONTEXT_TABLE”的 kafka 表:
CREATE TABLE UE_Context_Table ( registertime BIGINT, ue_key VARCHAR, ecgi VARCHAR) WITH (KAFKA_TOPIC='UE_Context', KEY='ue_key', VALUE_FORMAT='JSON');
Run Code Online (Sandbox Code Playgroud)
我使用以下 cURL 命令在主题上抽取了两行数据:
curl -X POST -H "Accept: application/json" -H "Content-Type: application/vnd.kafka.json.v1+json" --data '{"records":[{"key": "0x1234", "value":{"ue_key": "0x1234", "ecgi" : "1234"}}]}' "http://localhost:8082/topics/UE_Context"
curl -X POST -H "Accept: application/json" -H "Content-Type: application/vnd.kafka.json.v1+json" --data '{"records":[{"key": "0x1234", "value":{"ue_key": "0x4321", "ecgi" : "4321"}}]}' "http://localhost:8082/topics/UE_Context"
Run Code Online (Sandbox Code Playgroud)
我有一个选择查询在表上等待,如下所示:
当 JSON 数据被泵入主题时,此查询会显示表信息。然后我们停止将 JSON 数据泵入主题并结束选择查询并结束选择查询。如果稍后执行选择,则不会显示先前填充的表信息。有没有办法保留这些数据?Kafka 连接器和使用数据库可能是一种选择。但是 kSQL 没有临时内存来存储表信息吗?
在稍后的时间点执行选择,不显示先前填充的表信息。
select 语句默认为主题的最新偏移量
如果要查看之前的数据,需要将消费者偏移量设置回开头
SET 'auto.offset.reset'='earliest';
Run Code Online (Sandbox Code Playgroud)
此外,如文档中所述(重点)
SELECT 语句本身是一个非持久性连续查询。SELECT 语句的结果不会保存在 Kafka 主题中,只会打印在 KSQL 控制台中。不要将 CREATE STREAM AS SELECT 创建的持久查询与来自 SELECT 语句的流式查询结果混淆。
| 归档时间: |
|
| 查看次数: |
1069 次 |
| 最近记录: |