ksql,在表上选择不显示任何内容

PiP*_*aby 0 apache-kafka ksqldb

我创建了一个源主题订阅者,其输入消息如下:

\n\n
{\n  "ip_router": "",\n  "ip_lan": "",\n  "isdn": "2046573688",\n  "end_datetime": "",\n  "shop_code": "1000405100",\n  "reg_type_id": "5131615",\n  "contract_id": "",\n  "update_datetime": "20170801171355",\n  "project": "",\n  "telecom_service_id": "2",\n  "local_speed": "",\n  "password": "",\n  "price_plan": "",\n  "vip": "",\n  "local_price_plan": "",\n  "sub_id": "1083168000",\n  "sta_datetime": "20090511152847",\n  "update_number_1": "1",\n  "act_status": "000",\n  "network_class": "",\n  "limit_usage": "",\n  "num_reset_zone": "",\n  "deposit": "",\n  "create_user": "TUDV_POPBGG",\n  "num_of_computer": "",\n  "cust_id": "10922428129",\n  "status": "2",\n  "active_datetime": "20090511152102",\n  "ip_view": "",\n  "channel_type_id": "",\n  "ip_wan": "",\n  "imsi": "452049760887694",\n  "infrastructure_type": "",\n  "product_code": "HPN03",\n  "expire_datetime": "",\n  "speed": "",\n  "private_ip": "",\n  "update_user": "MIGRATE",\n  "ip_static": "",\n  "vlan": "",\n  "sub_type": "",\n  "create_datetime": "20090511152102",\n  "is_info_completed": "1",\n  "pay_type": "2",\n  "up_link": "",\n  "promotion_code": "",\n  "technology": "",\n  "offer_id": "400001035",\n  "dev_staff_id": "",\n  "account_id": "",\n  "deploy_accept_date": "",\n  "serial": "8984049767000887694",\n  "group_id": "",\n  "ip_gateway": "",\n  "first_connect": "",\n  "org_product_code": "MIGRATE",\n  "start_money": "100000",\n  "keep_alive": "",\n  "account": ""\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

然后我在上面创建了一个流和一个表:

\n\n
CREATE STREAM str_subscriber_json (sub_id VARCHAR,contract_id VARCHAR,cust_id VARCHAR,account_id VARCHAR,telecom_service_id VARCHAR,isdn VARCHAR,imsi VARCHAR,serial VARCHAR,status VARCHAR,product_code VARCHAR,offer_id VARCHAR,act_status VARCHAR,sta_datetime BIGINT,active_datetime BIGINT,sub_type VARCHAR,end_datetime BIGINT,expire_datetime BIGINT,shop_code VARCHAR,dev_staff_id VARCHAR,promotion_code VARCHAR,vip VARCHAR,account VARCHAR,create_datetime BIGINT,create_user VARCHAR,update_datetime BIGINT,update_user VARCHAR,deposit VARCHAR,limit_usage VARCHAR,password VARCHAR,org_product_code VARCHAR,num_reset_zone VARCHAR,start_money VARCHAR,is_info_completed VARCHAR,channel_type_id VARCHAR,first_connect VARCHAR,speed VARCHAR,keep_alive VARCHAR,price_plan VARCHAR,local_price_plan VARCHAR,project VARCHAR,local_speed VARCHAR,technology VARCHAR,network_class VARCHAR,infrastructure_type VARCHAR,deploy_accept_date BIGINT,group_id VARCHAR,ip_static VARCHAR,ip_view VARCHAR,ip_lan VARCHAR,ip_wan VARCHAR,ip_gateway VARCHAR,ip_router VARCHAR,num_of_computer VARCHAR,vlan VARCHAR,up_link VARCHAR,private_ip VARCHAR,pay_type VARCHAR,update_number_1 VARCHAR,reg_type_id VARCHAR,type VARCHAR,update_number VARCHAR) WITH (KAFKA_TOPIC=\'subscriber\', VALUE_FORMAT=\'JSON\');\n\nCREATE TABLE tbl_subscriber_json (sub_id VARCHAR,contract_id VARCHAR,cust_id VARCHAR,account_id VARCHAR,telecom_service_id VARCHAR,isdn VARCHAR,imsi VARCHAR,serial VARCHAR,status VARCHAR,product_code VARCHAR,offer_id VARCHAR,act_status VARCHAR,sta_datetime BIGINT,active_datetime BIGINT,sub_type VARCHAR,end_datetime BIGINT,expire_datetime BIGINT,shop_code VARCHAR,dev_staff_id VARCHAR,promotion_code VARCHAR,vip VARCHAR,account VARCHAR,create_datetime BIGINT,create_user VARCHAR,update_datetime BIGINT,update_user VARCHAR,deposit VARCHAR,limit_usage VARCHAR,password VARCHAR,org_product_code VARCHAR,num_reset_zone VARCHAR,start_money VARCHAR,is_info_completed VARCHAR,channel_type_id VARCHAR,first_connect VARCHAR,speed VARCHAR,keep_alive VARCHAR,price_plan VARCHAR,local_price_plan VARCHAR,project VARCHAR,local_speed VARCHAR,technology VARCHAR,network_class VARCHAR,infrastructure_type VARCHAR,deploy_accept_date BIGINT,group_id VARCHAR,ip_static VARCHAR,ip_view VARCHAR,ip_lan VARCHAR,ip_wan VARCHAR,ip_gateway VARCHAR,ip_router VARCHAR,num_of_computer VARCHAR,vlan VARCHAR,up_link VARCHAR,private_ip VARCHAR,pay_type VARCHAR,update_number_1 VARCHAR,reg_type_id VARCHAR,type VARCHAR,update_number VARCHAR) WITH (KAFKA_TOPIC=\'subscriber\', VALUE_FORMAT=\'JSON\', KEY = \'sub_id\' );\n
Run Code Online (Sandbox Code Playgroud)\n\n

我尝试使用ksql进行测试:

\n\n
SELECT * FROM str_subscriber_json; \n
Run Code Online (Sandbox Code Playgroud)\n\n

(当我将新的 json 放入订阅者主题时打印结果)

\n\n
SELECT * FROM tbl_subscriber_json; \n
Run Code Online (Sandbox Code Playgroud)\n\n

(当我将新的 json 放入订阅者主题时,没有显示任何内容)

\n\n

那么请帮我澄清一下这个案例有什么问题吗?

\n\n

非常感谢。

\n

Rob*_*att 5

概括

您的消息需要加密。如果您没有消息键,则 TABLE 的语义没有任何意义(因为如果没有键,则无法显示键的值)。

我复制了您的示例,用于kafkacat生成带密钥和不带密钥的消息。

测试 1 - 无钥匙

产生测试消息

$ echo '{ "ip_router": "", "ip_lan": "", "isdn": "2046573688", "end_datetime": "", "shop_code": "1000405100", "reg_type_id": "5131615", "contract_id": "", "update_datetime": "20170801171355", "project": "", "telecom_service_id": "2", "local_speed": "", "password": "", "price_plan": "", "vip": "", "local_price_plan": "", "sub_id": "1083168000", "sta_datetime": "20090511152847", "update_number_1": "1", "act_status": "000", "network_class": "", "limit_usage": "", "num_reset_zone": "", "deposit": "", "create_user": "TUDV_POPBGG", "num_of_computer": "", "cust_id": "10922428129", "status": "2", "active_datetime": "20090511152102", "ip_view": "", "channel_type_id": "", "ip_wan": "", "imsi": "452049760887694", "infrastructure_type": "", "product_code": "HPN03", "expire_datetime": "", "speed": "", "private_ip": "", "update_user": "MIGRATE", "ip_static": "", "vlan": "", "sub_type": "", "create_datetime": "20090511152102", "is_info_completed": "1", "pay_type": "2", "up_link": "", "promotion_code": "", "technology": "", "offer_id": "400001035", "dev_staff_id": "", "account_id": "", "deploy_accept_date": "", "serial": "8984049767000887694", "group_id": "", "ip_gateway": "", "first_connect": "", "org_product_code": "MIGRATE", "start_money": "100000", "keep_alive": "", "account": "" }' \
| kafkacat -b localhost:9092 -P -t subscriber
Run Code Online (Sandbox Code Playgroud)

流输出

注意null第二列中的 - 这是关键(第一列是消息的时间戳;其余列是消息中声明的字段)

ksql> select * from str_subscriber_json;
1528368689380 | null | 1083168000 |  | 10922428129 |  | 2 | 2046573688 | 452049760887694 | 8984049767000887694 | 2 | HPN03 | 400001035 | 000 | 20090511152847 | 20090511152102 |  | 0 | 0 | 1000405100 |  |  |  |  | 20090511152102 | TUDV_POPBGG | 20170801171355 | MIGRATE |  |  |  | MIGRATE |  | 100000 | 1 |  |  |  |  |  |  |  |  |  |  |  | 0 |  |  |  |  |  |  |  |  |  |  |  | 2 | 1 | 5131615 | null | null
Run Code Online (Sandbox Code Playgroud)

表输出

ksql> SELECT * FROM tbl_subscriber_json;
Run Code Online (Sandbox Code Playgroud)

(无输出)

测试 2 - 使用密钥集

这里的键任意设置为1,使用kafkacat's-K标志指定:为键/值分隔符。

$ echo '1:{ "ip_router": "", "ip_lan": "", "isdn": "2046573688", "end_datetime": "", "shop_code": "1000405100", "reg_type_id": "5131615", "contract_id": "", "update_datetime": "20170801171355", "project": "", "telecom_service_id": "2", "local_speed": "", "password": "", "price_plan": "", "vip": "", "local_price_plan": "", "sub_id": "1083168000", "sta_datetime": "20090511152847", "update_number_1": "1", "act_status": "000", "network_class": "", "limit_usage": "", "num_reset_zone": "", "deposit": "", "create_user": "TUDV_POPBGG", "num_of_computer": "", "cust_id": "10922428129", "status": "2", "active_datetime": "20090511152102", "ip_view": "", "channel_type_id": "", "ip_wan": "", "imsi": "452049760887694", "infrastructure_type": "", "product_code": "HPN03", "expire_datetime": "", "speed": "", "private_ip": "", "update_user": "MIGRATE", "ip_static": "", "vlan": "", "sub_type": "", "create_datetime": "20090511152102", "is_info_completed": "1", "pay_type": "2", "up_link": "", "promotion_code": "", "technology": "", "offer_id": "400001035", "dev_staff_id": "", "account_id": "", "deploy_accept_date": "", "serial": "8984049767000887694", "group_id": "", "ip_gateway": "", "first_connect": "", "org_product_code": "MIGRATE", "start_money": "100000", "keep_alive": "", "account": "" }' \
| kafkacat -b localhost:9092 -P -t subscriber -K:
Run Code Online (Sandbox Code Playgroud)

流输出

注意1第二列中的 - 这是关键(第一列是消息的时间戳;其余列是消息中声明的字段)

1528368781916 | 1 | 1083168000 |  | 10922428129 |  | 2 | 2046573688 | 452049760887694 | 8984049767000887694 | 2 | HPN03 | 400001035 | 000 | 20090511152847 | 20090511152102 |  | 0 | 0 | 1000405100 |  |  |  |  | 20090511152102 | TUDV_POPBGG | 20170801171355 | MIGRATE |  |  |  | MIGRATE |  | 100000 | 1 |  |  |  |  |  |  |  |  |  |  |  | 0 |  |  |  |  |  |  |  |  |  |  |  | 2 | 1 | 5131615 | null | null
Run Code Online (Sandbox Code Playgroud)

表输出

1528368781916 | 1 | 1083168000 |  | 10922428129 |  | 2 | 2046573688 | 452049760887694 | 8984049767000887694 | 2 | HPN03 | 400001035 | 000 | 20090511152847 | 20090511152102 |  | 0 | 0 | 1000405100 |  |  |  |  | 20090511152102 | TUDV_POPBGG | 20170801171355 | MIGRATE |  |  |  | MIGRATE |  | 100000 | 1 |  |  |  |  |  |  |  |  |  |  |  | 0 |  |  |  |  |  |  |  |  |  |  |  | 2 | 1 | 5131615 | null | null
Run Code Online (Sandbox Code Playgroud)

使用 KSQL 自动重新键入主题

您可以使用 KSQL 重新分区主题。例如,以您的源subscriber主题为例,以下是如何使用 KSQL 对其进行重新分区以设置密钥:

ksql> CREATE STREAM SUBSCRIBER_KEYED AS SELECT * FROM str_subscriber_json PARTITION BY sub_id;

 Message
----------------------------
 Stream created and running
----------------------------
ksql>
Run Code Online (Sandbox Code Playgroud)

这将填充一个 Kafka 主题 ( SUBSCRIBER_KEYED),然后您可以在其上定义一个表:

CREATE TABLE subscriber_table (sub_id VARCHAR,contract_id VARCHAR,cust_id VARCHAR,account_id VARCHAR,telecom_service_id VARCHAR,isdn VARCHAR,imsi VARCHAR,serial VARCHAR,status VARCHAR,product_code VARCHAR,offer_id VARCHAR,act_status VARCHAR,sta_datetime BIGINT,active_datetime BIGINT,sub_type VARCHAR,end_datetime BIGINT,expire_datetime BIGINT,shop_code VARCHAR,dev_staff_id VARCHAR,promotion_code VARCHAR,vip VARCHAR,account VARCHAR,create_datetime BIGINT,create_user VARCHAR,update_datetime BIGINT,update_user VARCHAR,deposit VARCHAR,limit_usage VARCHAR,password VARCHAR,org_product_code VARCHAR,num_reset_zone VARCHAR,start_money VARCHAR,is_info_completed VARCHAR,channel_type_id VARCHAR,first_connect VARCHAR,speed VARCHAR,keep_alive VARCHAR,price_plan VARCHAR,local_price_plan VARCHAR,project VARCHAR,local_speed VARCHAR,technology VARCHAR,network_class VARCHAR,infrastructure_type VARCHAR,deploy_accept_date BIGINT,group_id VARCHAR,ip_static VARCHAR,ip_view VARCHAR,ip_lan VARCHAR,ip_wan VARCHAR,ip_gateway VARCHAR,ip_router VARCHAR,num_of_computer VARCHAR,vlan VARCHAR,up_link VARCHAR,private_ip VARCHAR,pay_type VARCHAR,update_number_1 VARCHAR,reg_type_id VARCHAR,type VARCHAR,update_number VARCHAR) \
WITH (KAFKA_TOPIC='SUBSCRIBER_KEYED', VALUE_FORMAT='JSON', KEY = 'sub_id' );
Run Code Online (Sandbox Code Playgroud)

现在,当您向 发送消息时subscriber,即使未键入消息,该表也将起作用:

ksql> select * from subscriber_table;
1528369407576 | 1083168000 | 1083168000 |  | 10922428129 |  | 2 | 2046573688 | 452049760887694 | 8984049767000887694 | 2 | HPN03 | 400001035 | 000 | 20090511152847 | 20090511152102 |  | 0 | 0 | 1000405100 |  |  |  |  | 20090511152102 | TUDV_POPBGG | 20170801171355 | MIGRATE |  |  |  | MIGRATE |  | 100000 | 1 |  |  |  |  |  |  |  |  |  |  |  | 0 |  |  |  |  |  |  |  |  |  |  |  | 2 | 1 | 5131615 | null | null
Run Code Online (Sandbox Code Playgroud)