如何从 ksqldb 表中删除值或插入逻辑删除值?

Kub*_*bus 3 ksqldb

如何通过 Rest api 或至少作为 ksqldb-cli 中的语句来标记 ksql 表中的行以进行删除?

CREATE TABLE movies (
      title VARCHAR PRIMARY KEY,
      id INT,
      release_year INT
    ) WITH (
      KAFKA_TOPIC='movies',
      PARTITIONS=1,
      VALUE_FORMAT = 'JSON'
    );

INSERT INTO MOVIES (ID, TITLE, RELEASE_YEAR) VALUES (48, 'Aliens', 1986);
Run Code Online (Sandbox Code Playgroud)

由于明显的原因,这不起作用,但是 ksqldb 中不存在 DELETE 语句:

INSERT INTO MOVIES (ID, TITLE, RELEASE_YEAR) VALUES (48, null, null);
Run Code Online (Sandbox Code Playgroud)

有没有办法创建推荐的逻辑删除空值,或者我是否需要将其直接写入基础主题?

Rob*_*att 10

有一种方法可以做到这一点,这只是一种解决方法。技巧是使用KAFKA值格式将逻辑删除写入基础主题。

这是一个使用原始 DDL 的示例。

-- Insert a second row of data
INSERT INTO MOVIES (ID, TITLE, RELEASE_YEAR) VALUES (42, 'Life of Brian', 1986);

-- Query table
ksql> SET 'auto.offset.reset' = 'earliest';

ksql> select * from movies emit changes limit 2;
+--------------------------------+--------------------------------+--------------------------------+
|TITLE                           |ID                              |RELEASE_YEAR                    |
+--------------------------------+--------------------------------+--------------------------------+
|Life of Brian                   |42                              |1986                            |
|Aliens                          |48                              |1986                            |
Limit Reached
Query terminated
Run Code Online (Sandbox Code Playgroud)

现在声明一个新流,它将使用相同的密钥写入相同的 Kafka 主题:

CREATE STREAM MOVIES_DELETED (title VARCHAR KEY, DUMMY VARCHAR) 
  WITH (KAFKA_TOPIC='movies', 
       VALUE_FORMAT='KAFKA');
Run Code Online (Sandbox Code Playgroud)

插入墓碑消息:

INSERT INTO MOVIES_DELETED (TITLE,DUMMY) VALUES ('Aliens',CAST(NULL AS VARCHAR));
Run Code Online (Sandbox Code Playgroud)

再次查询表:

ksql> select * from movies emit changes limit 2;
+--------------------------------+--------------------------------+--------------------------------+
|TITLE                           |ID                              |RELEASE_YEAR                    |
+--------------------------------+--------------------------------+--------------------------------+
|Life of Brian                   |42                              |1986                            |
Run Code Online (Sandbox Code Playgroud)

检查基本主题

ksql> print movies;
Key format: KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/02/22 11:01:05.966 Z, key: Aliens, value: {"ID":48,"RELEASE_YEAR":1986}, partition: 0
rowtime: 2021/02/22 11:02:00.194 Z, key: Life of Brian, value: {"ID":42,"RELEASE_YEAR":1986}, partition: 0
rowtime: 2021/02/22 11:04:52.569 Z, key: Aliens, value: <null>, partition: 0
Run Code Online (Sandbox Code Playgroud)