小编Zam*_*rif的帖子

汇编4.1.0 - > KSQL:STREAM-TABLE join - >表数据null

步骤1:运行生产者以创建样本数据

./bin/kafka-avro-console-producer \
         --broker-list localhost:9092 --topic stream-test-topic \
         --property schema.registry.url=http://localhost:8081 \
         --property value.schema='{"type":"record","name":"dealRecord","fields":[{"name":"DEAL_ID","type":"string"},{"name":"DEAL_EXPENSE_CODE","type":"string"},{"name":"DEAL_BRANCH","type":"string"}]}'
Run Code Online (Sandbox Code Playgroud)

样本数据 :

{"DEAL_ID":"deal002", "DEAL_EXPENSE_CODE":"EXP002", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal003", "DEAL_EXPENSE_CODE":"EXP003", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal004", "DEAL_EXPENSE_CODE":"EXP004", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal005", "DEAL_EXPENSE_CODE":"EXP005", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal006", "DEAL_EXPENSE_CODE":"EXP006", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal007", "DEAL_EXPENSE_CODE":"EXP001", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal008", "DEAL_EXPENSE_CODE":"EXP002", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal009", "DEAL_EXPENSE_CODE":"EXP003", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal010", "DEAL_EXPENSE_CODE":"EXP004", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal011", "DEAL_EXPENSE_CODE":"EXP005", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal012", "DEAL_EXPENSE_CODE":"EXP006", "DEAL_BRANCH":"AMSTERDAM"}
Run Code Online (Sandbox Code Playgroud)

第2步:打开另一个终端并运行使用者来测试数据.

./bin/kafka-avro-console-consumer --topic stream-test-topic \
         --bootstrap-server localhost:9092 \
         --property schema.registry.url=http://localhost:8081 \
         --from-beginning
Run Code Online (Sandbox Code Playgroud)

步骤3:打开另一个终端并运行生产者.

./bin/kafka-avro-console-producer \
         --broker-list localhost:9092 --topic expense-test-topic \
--property "parse.key=true" \
--property "key.separator=:" \
--property schema.registry.url=http://localhost:8081 \
--property key.schema='"string"' \ …
Run Code Online (Sandbox Code Playgroud)

apache-kafka ksql

5
推荐指数
1
解决办法
986
查看次数

KSQL左连接不起作用

我是stackoverflow的新手,所以让我知道如果我在这里发布这个问题我有什么不对.

我已经尝试找到答案,但无法在网站上找到KSQL JOIN相关问题,所以我发布了这个.我已经尝试了不同的方法来运行此查询,但我一直得到空指针异常,所以在此处发布.

我有两个kafka avro主题交易和费用,但数据有很多空白,以清楚我已创建以下主题和表与修剪数据.DEAL_STREAMEXPENSE_TABLE

ksql> describe EXPENSE_TABLE;
Run Code Online (Sandbox Code Playgroud)

结果:

Field      | Type
ROWTIME    | BIGINT (system)
ROWKEY     | VARCHAR(STRING) (system)
KSQL_COL_0 | VARCHAR(STRING)
KSQL_COL_1 | VARCHAR(STRING)
KSQL_COL_2 | VARCHAR(STRING)
Run Code Online (Sandbox Code Playgroud)

ksql> describe deal_stream;
Run Code Online (Sandbox Code Playgroud)

结果:

Field      | Type
ROWTIME    | BIGINT (system)
ROWKEY     | VARCHAR(STRING) (system)
KSQL_COL_0 | VARCHAR(STRING)
KSQL_COL_1 | VARCHAR(STRING)
KSQL_COL_2 | VARCHAR(STRING)
Run Code Online (Sandbox Code Playgroud)

当我执行以下Query时,它给我空指针异常.我尝试了以下查询.

1:

ksql> CREATE STREAM deal_expense_new AS SELECT td.KSQL_COL_0 , te.KSQL_COL_1 FROM deal_stream td LEFT JOIN expense_table te ON td.KSQL_COL_0 = te.KSQL_COL_0;
Run Code Online (Sandbox Code Playgroud)

2: …

apache-kafka confluent ksql

0
推荐指数
1
解决办法
307
查看次数

标签 统计

apache-kafka ×2

ksql ×2

confluent ×1