Best way to join two (or more) Kafka topics in KSQL so that changes from all topics are emitted?

Phi*_*her 6 apache-kafka debezium ksqldb

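We have a "microservices" platform, and we use Debezium to capture change data from the databases on that platform, which works well.

Now we would like to be able to join these topics easily and stream the results into a new topic that multiple services can consume.

Disclaimer: this assumes ksqlDB and CLI v0.11 (much of this probably won't work in older versions).

Example of two tables from two database instances flowing into Kafka topics: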

-- source identity microservice (postgres)
CREATE TABLE public.user_entity (
    id varchar(36) NOT NULL,
    first_name varchar(255) NULL,
    PRIMARY KEY (id)
);
-- ksql stream 
CREATE STREAM stream_user_entity WITH (KAFKA_TOPIC='cdc.identity.public.user_entity', value_format='avro');

-- source organization microservice (postgres)
CREATE TABLE public.user_info (
    id varchar(36) NOT NULL,
    user_entity_id varchar(36) NOT NULL,
    business_unit varchar(255) NOT NULL,
    cost_center varchar(255) NOT NULL,
    PRIMARY KEY (id)
);
-- ksql stream 
CREATE STREAM stream_user_info WITH (KAFKA_TOPIC='cdc.application.public.user_info', value_format='avro');

Option 1: Streams

CREATE STREAM stream_user_info_by_user_entity_id
AS SELECT * FROM stream_user_info
PARTITION BY user_entity_id
EMIT CHANGES;

SELECT 
    user_entity_id,
    first_name,
    business_unit,
    cost_center
FROM stream_user_entity ue
LEFT JOIN stream_user_info_by_user_entity_id ui WITHIN 365 DAYS ON ue.id = ui.user_entity_id 
EMIT CHANGES;

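Note the WITHIN 365 DAYS. Conceptually, these tables can go a very long time without changing, so the window would technically have to be infinitely large. That looks fishy and suggests this is not a good approach.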
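To make the window concern concrete, here is a minimal sketch in plain Python (not ksqlDB) of the matching rule behind a windowed stream-stream join. The function name `windowed_matches` and the sample records are hypothetical, and the sketch covers only the "same key, timestamps within the window" rule, not the null-padded rows a LEFT JOIN would also produce.

```python
from datetime import datetime, timedelta

def windowed_matches(left, right, within):
    """Pair records that share a key and whose timestamps differ by at most `within`.

    Each record is a (key, timestamp, value) tuple. This is a simplified
    illustration of the window rule, not ksqlDB's implementation.
    """
    pairs = []
    for lkey, lts, lval in left:
        for rkey, rts, rval in right:
            if lkey == rkey and abs(lts - rts) <= within:
                pairs.append((lval, rval))
    return pairs

# Hypothetical data: the user row is written once, user_info changes twice.
left = [("u1", datetime(2020, 1, 1), "Ann")]
right = [
    ("u1", datetime(2020, 6, 1), "sales"),  # 152 days later: inside the window
    ("u1", datetime(2021, 6, 1), "ops"),    # 517 days later: outside the window
]
matches = windowed_matches(left, right, timedelta(days=365))
```

In this sketch the second user_info change finds no partner because the user row is more than 365 days old, which is the failure mode any finite window has for slowly changing data.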

Option 2: Table

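-- Key the table on user_entity_id; the value columns are inferred from Schema Registry.
-- Assumes the topic backing stream_user_info_by_user_entity_id has exactly this name.
CREATE TABLE ktable_user_info_by_user_entity_id (
    user_entity_id VARCHAR PRIMARY KEY
)
WITH (KAFKA_TOPIC='stream_user_info_by_user_entity_id', value_format='avro');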

SELECT 
    user_entity_id,
    first_name,
    business_unit,
    cost_center
FROM stream_user_entity ue
LEFT JOIN ktable_user_info_by_user_entity_id ui ON ue.id = ui.user_entity_id 
EMIT CHANGES;

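We no longer need the WITHIN 365 DAYS window, so this feels more correct. However, a change is only emitted when a message arrives on the stream, not when one arrives on the table.

In this example: user updates first_name -> change emitted; user updates business_unit -> no change emitted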
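The asymmetry can be illustrated with a minimal sketch in plain Python (not ksqlDB). The function name `stream_table_left_join` and the sample events are hypothetical; the sketch assumes events are processed strictly in the order given.

```python
def stream_table_left_join(events):
    """Simulate a stream-table LEFT JOIN over an ordered list of events.

    Each event is (source, key, value), where source is "stream" or "table".
    Table events only update the lookup state; only stream events emit a row.
    """
    table_state = {}
    emitted = []
    for source, key, value in events:
        if source == "table":
            table_state[key] = value  # state changes silently, nothing is emitted
        else:
            emitted.append((key, value, table_state.get(key)))
    return emitted

# Hypothetical sequence: user_info (table side) and user_entity (stream side) updates.
events = [
    ("table", "u1", {"business_unit": "sales"}),
    ("stream", "u1", {"first_name": "Ann"}),
    ("table", "u1", {"business_unit": "ops"}),   # no output for this change
    ("stream", "u1", {"first_name": "Anna"}),
]
results = stream_table_left_join(events)
```

Four input events produce only two output rows here. The business_unit change to "ops" becomes visible only when the next first_name update arrives on the stream side.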

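Maybe there is a way to create a merged stream partitioned by user_entity_id and join it to child tables that hold the current state, which leads me to...

Option 3: Merged stream and tables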

-- "master" change stream with merged stream output
CREATE STREAM stream_user_changes (user_entity_id VARCHAR) 
WITH (KAFKA_TOPIC='stream_user_changes', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='avro');
INSERT INTO stream_user_changes SELECT id as user_entity_id FROM stream_user_entity;
INSERT INTO stream_user_changes SELECT user_entity_id FROM stream_user_info;

CREATE STREAM stream_user_entity_by_id
AS SELECT * FROM stream_user_entity
PARTITION BY id
EMIT CHANGES;

CREATE TABLE ktable_user_entity_by_id (
    id VARCHAR PRIMARY KEY,
    first_name VARCHAR
) with (KAFKA_TOPIC='stream_user_entity_by_id', value_format='avro');

SELECT 
    uec.user_entity_id,
    ue.first_name,
    ui.business_unit,
    ui.cost_center
FROM stream_user_changes uec
LEFT JOIN ktable_user_entity_by_id ue ON uec.user_entity_id = ue.id
LEFT JOIN ktable_user_info_by_user_entity_id ui ON uec.user_entity_id = ui.user_entity_id 
EMIT CHANGES;

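This one looks best, but it has a lot of moving parts: each table needs 2 streams, 1 insert query, and 1 ktable. Another potential problem is a hidden race condition, where the stream emits a change before the table has been updated behind the scenes.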
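The intent of this option can be sketched in plain Python (not ksqlDB). The function name `merged_changes_join` and the sample records are hypothetical. The sketch also assumes the table state is updated before the matching key event is joined, which is exactly the ordering the race-condition concern is about, so it shows the desired behavior rather than proving ksqlDB guarantees it.

```python
def merged_changes_join(events):
    """Simulate a merged key stream joined to two current-state tables.

    Each event is (topic, record) with topic "user_entity" or "user_info".
    Every change on either topic emits one enriched row for the affected user.
    Assumption: the table update is applied before the join for that event.
    """
    entity_by_id = {}
    info_by_user = {}
    emitted = []
    for topic, record in events:
        if topic == "user_entity":
            key = record["id"]
            entity_by_id[key] = record
        elif topic == "user_info":
            key = record["user_entity_id"]
            info_by_user[key] = record
        else:
            raise ValueError(f"unknown topic: {topic}")
        entity = entity_by_id.get(key, {})
        info = info_by_user.get(key, {})
        emitted.append({
            "user_entity_id": key,
            "first_name": entity.get("first_name"),
            "business_unit": info.get("business_unit"),
            "cost_center": info.get("cost_center"),
        })
    return emitted

# Hypothetical change sequence across both topics.
events = [
    ("user_entity", {"id": "u1", "first_name": "Ann"}),
    ("user_info", {"id": "i1", "user_entity_id": "u1",
                   "business_unit": "sales", "cost_center": "cc1"}),
    ("user_info", {"id": "i1", "user_entity_id": "u1",
                   "business_unit": "ops", "cost_center": "cc1"}),
]
rows = merged_changes_join(events)
```

Unlike the plain stream-table join, every one of the three changes yields an output row, including the business_unit update that originates on the user_info side.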

Option 4: More merged tables and streams

CREATE STREAM stream_user_entity_changes_enriched
AS SELECT 
    ue.id AS user_entity_id,
    ue.first_name,
    ui.business_unit,
    ui.cost_center
FROM stream_user_entity_by_id ue
LEFT JOIN ktable_user_info_by_user_entity_id ui ON ue.id = ui.user_entity_id
EMIT CHANGES;

CREATE STREAM stream_user_info_changes_enriched
AS SELECT 
    ui.user_entity_id,
    ue.first_name,
    ui.business_unit,
    ui.cost_center
FROM stream_user_info_by_user_entity_id ui
LEFT JOIN ktable_user_entity_by_id ue ON ui.user_entity_id = ue.id
EMIT CHANGES;


CREATE STREAM stream_user_changes_enriched (user_entity_id VARCHAR, first_name VARCHAR, business_unit VARCHAR, cost_center VARCHAR) 
WITH (KAFKA_TOPIC='stream_user_changes_enriched', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='avro');
INSERT INTO stream_user_changes_enriched SELECT * FROM stream_user_entity_changes_enriched;
INSERT INTO stream_user_changes_enriched SELECT * FROM stream_user_info_changes_enriched;

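This is conceptually the same as the previous option, but the "merge" happens after the joins. Conceivably, this eliminates any potential race condition, because we are mainly selecting from streams rather than tables.

The downside is that the complexity is even worse than Option 3, and writing and keeping track of all these streams for any join over more than two tables would be mind-numbing...

Question: which approach is best for this use case, and/or are we trying to do something ksql should not be used for? Would we be better off moving this to a traditional RDBMS or a Spark alternative?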

Phi*_*her 4

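I'll try to answer my own question, and will only accept it if it gets upvoted.

The answer is: Option 3

Here are the reasons why this would be best for this use case, though this may well be subjective:

  • Streams partitioned by primary and foreign keys are common and simple.
  • Tables based on those streams are common and simple.
  • Tables used in this way do not introduce a race condition.

All of the options have merit, for example if you don't care about emitting every change, or if the data behaves like a stream (logs or events) rather than a slowly changing dimension (SQL tables).

As for the "race condition", the word "table" tricks you into thinking you are actually processing and persisting data. In fact, these are not physical tables; they behave more like subqueries over the streams. Note: aggregate tables that actually produce topics may be an exception (I'd suggest that is a separate topic, but I would love to see comments).

Finally (the syntax may contain some minor errors):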

---------------------------------------------------------
-- shared objects (likely to be used by multiple queries)
---------------------------------------------------------

-- shared streams wrapping topics
CREATE STREAM stream_user_entity WITH (KAFKA_TOPIC='cdc.identity.public.user_entity', value_format='avro');
CREATE STREAM stream_user_info WITH (KAFKA_TOPIC='cdc.application.public.user_info', value_format='avro');

-- shared keyed streams (i like to think of them as "indexes")
CREATE STREAM stream_user_entity_by_id AS 
SELECT * FROM stream_user_entity PARTITION BY id
EMIT CHANGES;
CREATE STREAM stream_user_info_by_user_entity_id AS 
SELECT * FROM stream_user_info PARTITION BY user_entity_id
EMIT CHANGES;

-- shared keyed tables (inferring columns with schema registry)
CREATE TABLE ktable_user_entity_by_id (id VARCHAR PRIMARY KEY) 
WITH (KAFKA_TOPIC='stream_user_entity_by_id', value_format='avro');
CREATE TABLE ktable_user_info_by_user_entity_id (user_entity_id VARCHAR PRIMARY KEY) 
WITH (KAFKA_TOPIC='stream_user_info_by_user_entity_id', value_format='avro');


---------------------------------------------------------
-- query objects (specific to the produced data)
---------------------------------------------------------
-- "master" change stream (include all tables in join)
CREATE STREAM stream_user_changes (user_entity_id VARCHAR) 
WITH (KAFKA_TOPIC='stream_user_changes', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='avro');
INSERT INTO stream_user_changes SELECT id as user_entity_id FROM stream_user_entity;
INSERT INTO stream_user_changes SELECT user_entity_id FROM stream_user_info;

-- pretty simple looking query
SELECT 
    uec.user_entity_id,
    ue.first_name,
    ui.business_unit,
    ui.cost_center
FROM stream_user_changes uec
LEFT JOIN ktable_user_entity_by_id ue ON uec.user_entity_id = ue.id
LEFT JOIN ktable_user_info_by_user_entity_id ui ON uec.user_entity_id = ui.user_entity_id 
EMIT CHANGES;

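The "shared" objects are basically the streaming schema (the temptation is to create them for all of our topics, but that is another question), and the second part is like the query schema. It ends up being a practical, clean, and repeatable pattern.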