Asked by dot*_*Leo. Tags: struct, join, apache-kafka, debezium, ksqldb
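I need to create a Kafka topic from a combination of nine other topics, all produced in AVRO format by the Debezium PostgreSQL source connector. As a first step, I am trying (so far without success) to combine fields from two of those topics.
So, first I create a ksqlDB table based on the "REQUEST" topic: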
ksql> CREATE TABLE TB_REQUEST (ID STRUCT<REQUEST_ID BIGINT> PRIMARY KEY)
WITH (KAFKA_TOPIC='REQUEST', FORMAT='AVRO');
Everything looks fine to me:
ksql> DESCRIBE TB_REQUEST;
Name : TB_REQUEST
Field | Type
-----------------------------------------------------------------------------------------------------------------------
ID | STRUCT<REQUEST_ID BIGINT> (primary key)
BEFORE | STRUCT<REQUEST_ID BIGINT, REQUESTER_ID INTEGER, STATUS_ID>
AFTER | STRUCT<REQUEST_ID BIGINT, REQUESTER_ID INTEGER, STATUS_ID>
SOURCE | STRUCT<VERSION VARCHAR(STRING), CONNECTOR VARCHAR(STRING), NAME VARCHAR(STRING), TS_MS BIGINT, SNAPSHOT VARCHAR(STRING), DB VARCHAR(STRING), SEQUENCE VARCHAR(STRING), SCHEMA VARCHAR(STRING), TABLE VARCHAR(STRING), TXID BIGINT, LSN BIGINT, XMIN BIGINT>
OP | VARCHAR(STRING)
TS_MS | BIGINT
TRANSACTION | STRUCT<ID VARCHAR(STRING), TOTAL_ORDER BIGINT, DATA_COLLECTION_ORDER BIGINT>
-----------------------------------------------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;
Then I create another table from the "EMPLOYEE" topic:
ksql> CREATE TABLE TB_EMPLOYEE (ID STRUCT<EMPLOYEE_ID INT> PRIMARY KEY)
WITH (KAFKA_TOPIC='EMPLOYEE', FORMAT='AVRO');
Again, everything seems fine:
ksql> DESCRIBE TB_EMPLOYEE;
Name : TB_EMPLOYEE
Field | Type
-----------------------------------------------------------------------------------------------------------------------
ID | STRUCT<EMPLOYEE_ID INTEGER> (primary key)
BEFORE | STRUCT<EMPLOYEE_ID INTEGER, NAME VARCHAR(STRING), HIRING_DATE DATE>
AFTER | STRUCT<EMPLOYEE_ID INTEGER, NAME VARCHAR(STRING), HIRING_DATE DATE>
SOURCE | STRUCT<VERSION VARCHAR(STRING), CONNECTOR VARCHAR(STRING), NAME VARCHAR(STRING), TS_MS BIGINT, SNAPSHOT VARCHAR(STRING), DB VARCHAR(STRING), SEQUENCE VARCHAR(STRING), SCHEMA VARCHAR(STRING), TABLE VARCHAR(STRING), TXID BIGINT, LSN BIGINT, XMIN BIGINT>
OP | VARCHAR(STRING)
TS_MS | BIGINT
TRANSACTION | STRUCT<ID VARCHAR(STRING), TOTAL_ORDER BIGINT, DATA_COLLECTION_ORDER BIGINT>
-----------------------------------------------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;
However, when I try to create my target table by joining the two previous tables on the employee ID:
ksql> CREATE TABLE REQUEST_EMPLOYEE AS
SELECT RQ.ID->REQUEST_ID, RQ.AFTER->REQUESTER_ID, RQ.AFTER->STATUS_ID, EM.ID->EMPLOYEE_ID, EM.AFTER->NAME AS REQUESTER
FROM TB_REQUEST RQ
JOIN TB_EMPLOYEE EM ON RQ.AFTER->REQUESTER_ID = EM.ID->EMPLOYEE_ID;
I get the following error:
Could not determine output schema for query due to error: Invalid join condition: table-table joins require to join on the primary key of the right input table. Got RQ.AFTER->REQUESTER_ID = EM.ID->EMPLOYEE_ID.
Statement: CREATE TABLE REQUEST_EMPLOYEE WITH (KAFKA_TOPIC='REQUEST_EMPLOYEE', PARTITIONS=1, REPLICAS=1) AS SELECT
RQ.ID->REQUEST_ID REQUEST_ID,
RQ.AFTER->REQUESTER_ID REQUESTER_ID,
RQ.AFTER->STATUS_ID STATUS_ID,
EM.ID->EMPLOYEE_ID EMPLOYEE_ID,
EM.AFTER->NAME REQUESTER
FROM TB_REQUEST RQ
INNER JOIN TB_EMPLOYEE EM ON ((RQ.AFTER->REQUESTER_ID = EM.ID->EMPLOYEE_ID))
EMIT CHANGES;
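Looking at the output of the "DESCRIBE TB_EMPLOYEE" command, it seems to me that "EM.ID->EMPLOYEE_ID" is the right choice. What am I missing?
Thanks in advance.
PS: the ksqlDB version is 0.21.0.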
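I think you need to use at least one row key in the join condition. In earlier versions of ksqlDB the only way to join two tables was on their row keys; in the current version, 0.21.0, foreign keys can be used as well. In a foreign-key join the left side of the condition can be a column of the left table, but the right side must be the primary key of the right table, which is exactly what the error message is complaining about.
Check the following example: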
CREATE TABLE orders_with_users AS
SELECT * FROM orders JOIN users ON orders.u_id = users.u_id EMIT CHANGES;
Here u_id is defined as the primary key of users, and is therefore its row key:
CREATE TABLE users (
u_id VARCHAR PRIMARY KEY,
name VARCHAR
) WITH (
kafka_topic = 'users',
partitions = 3,
value_format = 'json'
);
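The example does not define the orders table. A minimal, hypothetical definition that fits the join above might look like the following; the columns order_id and amount are illustrative assumptions, not part of the original example. The point is that orders has its own primary key, and u_id is an ordinary value column acting as the foreign key.

```sql
-- Hypothetical ORDERS table: order_id and amount are illustrative names.
-- The primary key is order_id; u_id is a plain value column (the foreign key)
-- that is matched against the primary key of USERS in the join.
CREATE TABLE orders (
  order_id VARCHAR PRIMARY KEY,
  u_id VARCHAR,
  amount DOUBLE
) WITH (
  kafka_topic = 'orders',
  partitions = 3,
  value_format = 'json'
);
```

With this definition, orders.u_id = users.u_id compares a value column of the left table with the primary key of the right table, which is the shape of a foreign-key table-table join.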
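The following statement is similar; it applies to older ksqlDB versions, where a table's key was always exposed as the implicit ROWKEY column (since 0.10 the key column takes the name declared in CREATE TABLE):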
CREATE TABLE orders_with_users AS
SELECT * FROM orders JOIN users ON orders.u_id = users.ROWKEY EMIT CHANGES;
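Another observation: ksqlDB treats the key of TB_EMPLOYEE as STRUCT<EMPLOYEE_ID INTEGER>, not as a plain integer. It therefore expects the join condition to compare the whole struct (with the same schema), not a single field of it.
You can then follow these steps to create the table with a primitive key: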
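-- Read the EMPLOYEE topic as a stream, keeping the original STRUCT key
CREATE STREAM STREAM_EMPLOYEE (ID STRUCT<EMPLOYEE_ID INT> KEY)
WITH (KAFKA_TOPIC='EMPLOYEE', FORMAT='AVRO');
-- Rekey: extract EMPLOYEE_ID from the struct and use it as the new INT key
CREATE STREAM STREAM_REKEY_EMPLOYEE
AS SELECT ID->EMPLOYEE_ID employee_id, * FROM STREAM_EMPLOYEE
PARTITION BY ID->EMPLOYEE_ID
EMIT CHANGES;
-- Table over the rekeyed topic; the key column needs an explicit type.
-- Drop the existing TB_EMPLOYEE first (or choose another name).
CREATE TABLE TB_EMPLOYEE (employee_id INT PRIMARY KEY)
WITH (KAFKA_TOPIC='STREAM_REKEY_EMPLOYEE', FORMAT='AVRO');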
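Before joining, it may be worth checking that the rekeying did what is intended. A small sketch: PRINT shows a few raw records of the rekeyed topic, and DESCRIBE should now report EMPLOYEE_ID as an INTEGER primary key instead of a STRUCT.

```sql
-- Show the first few records of the rekeyed topic; the key should now be a plain integer.
PRINT 'STREAM_REKEY_EMPLOYEE' FROM BEGINNING LIMIT 3;

-- The primary key column should be listed as EMPLOYEE_ID INTEGER, not STRUCT<...>.
DESCRIBE TB_EMPLOYEE;
```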
Then join on the employee_id field. In short, try to use a primitive type for your primary key.
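Putting it together, one possible end-to-end sketch (untested, assuming ksqlDB 0.21 and that TB_EMPLOYEE was recreated with an INT primary key as above) flattens REQUEST the same way, so that both sides of the join condition are plain columns. Using a struct dereference such as RQ.AFTER->REQUESTER_ID directly on the left side may also work; flattening simply avoids depending on it. The names STREAM_REQUEST, STREAM_REKEY_REQUEST and TB_REQUEST_FLAT are hypothetical.

```sql
-- Hypothetical names: STREAM_REQUEST, STREAM_REKEY_REQUEST, TB_REQUEST_FLAT.
-- Read REQUEST as a stream with its original STRUCT key.
CREATE STREAM STREAM_REQUEST (ID STRUCT<REQUEST_ID BIGINT> KEY)
  WITH (KAFKA_TOPIC='REQUEST', FORMAT='AVRO');

-- Rekey on REQUEST_ID and pull the needed fields out of the Debezium AFTER struct.
CREATE STREAM STREAM_REKEY_REQUEST AS
  SELECT ID->REQUEST_ID AS REQUEST_ID,
         AFTER->REQUESTER_ID AS REQUESTER_ID,
         AFTER->STATUS_ID AS STATUS_ID
  FROM STREAM_REQUEST
  PARTITION BY ID->REQUEST_ID
  EMIT CHANGES;

-- Table with a primitive BIGINT primary key over the rekeyed topic.
CREATE TABLE TB_REQUEST_FLAT (REQUEST_ID BIGINT PRIMARY KEY)
  WITH (KAFKA_TOPIC='STREAM_REKEY_REQUEST', FORMAT='AVRO');

-- Foreign-key join: REQUESTER_ID (value column, left) = EMPLOYEE_ID (primary key, right).
CREATE TABLE REQUEST_EMPLOYEE AS
  SELECT RQ.REQUEST_ID, RQ.REQUESTER_ID, RQ.STATUS_ID,
         EM.EMPLOYEE_ID, EM.AFTER->NAME AS REQUESTER
  FROM TB_REQUEST_FLAT RQ
  JOIN TB_EMPLOYEE EM ON RQ.REQUESTER_ID = EM.EMPLOYEE_ID
  EMIT CHANGES;
```

Debezium delete events carry a null AFTER struct, so the flattened columns are null for those records; handling deletes is outside the scope of this sketch.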