PiP*_*aby | tags: apache-kafka, ksqldb
I created a source topic, subscriber, whose input messages look like this:
{
  "ip_router": "",
  "ip_lan": "",
  "isdn": "2046573688",
  "end_datetime": "",
  "shop_code": "1000405100",
  "reg_type_id": "5131615",
  "contract_id": "",
  "update_datetime": "20170801171355",
  "project": "",
  "telecom_service_id": "2",
  "local_speed": "",
  "password": "",
  "price_plan": "",
  "vip": "",
  "local_price_plan": "",
  "sub_id": "1083168000",
  "sta_datetime": "20090511152847",
  "update_number_1": "1",
  "act_status": "000",
  "network_class": "",
  "limit_usage": "",
  "num_reset_zone": "",
  "deposit": "",
  "create_user": "TUDV_POPBGG",
  "num_of_computer": "",
  "cust_id": "10922428129",
  "status": "2",
  "active_datetime": "20090511152102",
  "ip_view": "",
  "channel_type_id": "",
  "ip_wan": "",
  "imsi": "452049760887694",
  "infrastructure_type": "",
  "product_code": "HPN03",
  "expire_datetime": "",
  "speed": "",
  "private_ip": "",
  "update_user": "MIGRATE",
  "ip_static": "",
  "vlan": "",
  "sub_type": "",
  "create_datetime": "20090511152102",
  "is_info_completed": "1",
  "pay_type": "2",
  "up_link": "",
  "promotion_code": "",
  "technology": "",
  "offer_id": "400001035",
  "dev_staff_id": "",
  "account_id": "",
  "deploy_accept_date": "",
  "serial": "8984049767000887694",
  "group_id": "",
  "ip_gateway": "",
  "first_connect": "",
  "org_product_code": "MIGRATE",
  "start_money": "100000",
  "keep_alive": "",
  "account": ""
}
Then I created a stream and a table on top of it:
CREATE STREAM str_subscriber_json (sub_id VARCHAR,contract_id VARCHAR,cust_id VARCHAR,account_id VARCHAR,telecom_service_id VARCHAR,isdn VARCHAR,imsi VARCHAR,serial VARCHAR,status VARCHAR,product_code VARCHAR,offer_id VARCHAR,act_status VARCHAR,sta_datetime BIGINT,active_datetime BIGINT,sub_type VARCHAR,end_datetime BIGINT,expire_datetime BIGINT,shop_code VARCHAR,dev_staff_id VARCHAR,promotion_code VARCHAR,vip VARCHAR,account VARCHAR,create_datetime BIGINT,create_user VARCHAR,update_datetime BIGINT,update_user VARCHAR,deposit VARCHAR,limit_usage VARCHAR,password VARCHAR,org_product_code VARCHAR,num_reset_zone VARCHAR,start_money VARCHAR,is_info_completed VARCHAR,channel_type_id VARCHAR,first_connect VARCHAR,speed VARCHAR,keep_alive VARCHAR,price_plan VARCHAR,local_price_plan VARCHAR,project VARCHAR,local_speed VARCHAR,technology VARCHAR,network_class VARCHAR,infrastructure_type VARCHAR,deploy_accept_date BIGINT,group_id VARCHAR,ip_static VARCHAR,ip_view VARCHAR,ip_lan VARCHAR,ip_wan VARCHAR,ip_gateway VARCHAR,ip_router VARCHAR,num_of_computer VARCHAR,vlan VARCHAR,up_link VARCHAR,private_ip VARCHAR,pay_type VARCHAR,update_number_1 VARCHAR,reg_type_id VARCHAR,type VARCHAR,update_number VARCHAR) WITH (KAFKA_TOPIC='subscriber', VALUE_FORMAT='JSON');

CREATE TABLE tbl_subscriber_json (sub_id VARCHAR,contract_id VARCHAR,cust_id VARCHAR,account_id VARCHAR,telecom_service_id VARCHAR,isdn VARCHAR,imsi VARCHAR,serial VARCHAR,status VARCHAR,product_code VARCHAR,offer_id VARCHAR,act_status VARCHAR,sta_datetime BIGINT,active_datetime BIGINT,sub_type VARCHAR,end_datetime BIGINT,expire_datetime BIGINT,shop_code VARCHAR,dev_staff_id VARCHAR,promotion_code VARCHAR,vip VARCHAR,account VARCHAR,create_datetime BIGINT,create_user VARCHAR,update_datetime BIGINT,update_user VARCHAR,deposit VARCHAR,limit_usage VARCHAR,password VARCHAR,org_product_code VARCHAR,num_reset_zone VARCHAR,start_money VARCHAR,is_info_completed VARCHAR,channel_type_id VARCHAR,first_connect VARCHAR,speed VARCHAR,keep_alive VARCHAR,price_plan VARCHAR,local_price_plan VARCHAR,project VARCHAR,local_speed VARCHAR,technology VARCHAR,network_class VARCHAR,infrastructure_type VARCHAR,deploy_accept_date BIGINT,group_id VARCHAR,ip_static VARCHAR,ip_view VARCHAR,ip_lan VARCHAR,ip_wan VARCHAR,ip_gateway VARCHAR,ip_router VARCHAR,num_of_computer VARCHAR,vlan VARCHAR,up_link VARCHAR,private_ip VARCHAR,pay_type VARCHAR,update_number_1 VARCHAR,reg_type_id VARCHAR,type VARCHAR,update_number VARCHAR) WITH (KAFKA_TOPIC='subscriber', VALUE_FORMAT='JSON', KEY = 'sub_id');
I tried testing it in ksql:
SELECT * FROM str_subscriber_json;
(Results are printed when I put new JSON into the subscriber topic.)
SELECT * FROM tbl_subscriber_json;
(Nothing is shown when I put new JSON into the subscriber topic.)
Can you help me figure out what is wrong here?

Thanks a lot.
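Your messages need to be keyed. The semantics of a TABLE don't make sense if your messages have no key, since without a key there is no way to show the value for that key.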
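To make the difference concrete, here is a minimal Python model (not KSQL code) of stream versus table semantics. It assumes a table keeps only the latest value per non-null key and skips records that have no key, which matches the empty tbl_subscriber_json result; `Record`, `stream_rows`, and `materialize_table` are hypothetical names used only for illustration.

```python
from typing import Dict, List, NamedTuple, Optional


class Record(NamedTuple):
    """A simplified Kafka record: an optional key plus a JSON-like value."""
    key: Optional[str]
    value: dict


def stream_rows(records: List[Record]) -> List[dict]:
    """A stream simply emits every record, in order, key or no key."""
    return [rec.value for rec in records]


def materialize_table(records: List[Record]) -> Dict[str, dict]:
    """Fold records into a table: the latest value wins for each key.

    Assumption for this sketch: a record without a key cannot be assigned
    to any row, so it is skipped.
    """
    table: Dict[str, dict] = {}
    for rec in records:
        if rec.key is None:
            continue  # no key, so there is no row to upsert
        table[rec.key] = rec.value  # upsert: later records overwrite earlier ones
    return table


if __name__ == "__main__":
    msg = {"sub_id": "1083168000", "isdn": "2046573688"}
    unkeyed = [Record(None, msg)]
    keyed = [Record("1", msg)]
    print(len(stream_rows(unkeyed)), len(materialize_table(unkeyed)))  # 1 0
    print(materialize_table(keyed))  # {'1': {'sub_id': '1083168000', 'isdn': '2046573688'}}
```

The unkeyed message shows up once in the stream but never in the table, which is the behaviour described in the question.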
I reproduced your example, using kafkacat to produce messages with and without a key.

Without a key:
$ echo '{ "ip_router": "", "ip_lan": "", "isdn": "2046573688", "end_datetime": "", "shop_code": "1000405100", "reg_type_id": "5131615", "contract_id": "", "update_datetime": "20170801171355", "project": "", "telecom_service_id": "2", "local_speed": "", "password": "", "price_plan": "", "vip": "", "local_price_plan": "", "sub_id": "1083168000", "sta_datetime": "20090511152847", "update_number_1": "1", "act_status": "000", "network_class": "", "limit_usage": "", "num_reset_zone": "", "deposit": "", "create_user": "TUDV_POPBGG", "num_of_computer": "", "cust_id": "10922428129", "status": "2", "active_datetime": "20090511152102", "ip_view": "", "channel_type_id": "", "ip_wan": "", "imsi": "452049760887694", "infrastructure_type": "", "product_code": "HPN03", "expire_datetime": "", "speed": "", "private_ip": "", "update_user": "MIGRATE", "ip_static": "", "vlan": "", "sub_type": "", "create_datetime": "20090511152102", "is_info_completed": "1", "pay_type": "2", "up_link": "", "promotion_code": "", "technology": "", "offer_id": "400001035", "dev_staff_id": "", "account_id": "", "deploy_accept_date": "", "serial": "8984049767000887694", "group_id": "", "ip_gateway": "", "first_connect": "", "org_product_code": "MIGRATE", "start_money": "100000", "keep_alive": "", "account": "" }' \
| kafkacat -b localhost:9092 -P -t subscriber
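Note the null in the second column: this is the key (the first column is the message timestamp; the rest are the declared fields from the message).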
ksql> select * from str_subscriber_json;
1528368689380 | null | 1083168000 | | 10922428129 | | 2 | 2046573688 | 452049760887694 | 8984049767000887694 | 2 | HPN03 | 400001035 | 000 | 20090511152847 | 20090511152102 | | 0 | 0 | 1000405100 | | | | | 20090511152102 | TUDV_POPBGG | 20170801171355 | MIGRATE | | | | MIGRATE | | 100000 | 1 | | | | | | | | | | | | 0 | | | | | | | | | | | | 2 | 1 | 5131615 | null | null
ksql> SELECT * FROM tbl_subscriber_json;
(no output)
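The column layout of the CLI rows above (timestamp, then key, then the declared fields) can be made explicit with a small parser. This is a hedged Python sketch, assuming the rows are pipe-delimited exactly as printed and that no field value itself contains a pipe; `CliRow` and `parse_cli_row` are hypothetical names, not part of KSQL.

```python
from datetime import datetime, timedelta, timezone
from typing import List, NamedTuple, Optional


class CliRow(NamedTuple):
    rowtime: datetime       # first column: message timestamp (epoch milliseconds)
    rowkey: Optional[str]   # second column: message key ("null" becomes None)
    fields: List[str]       # remaining columns: the declared schema fields


EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def parse_cli_row(line: str) -> CliRow:
    """Split one pipe-delimited KSQL CLI row into timestamp, key and fields.

    Assumes no field value contains a '|' character.
    """
    cols = [c.strip() for c in line.split("|")]
    # Integer milliseconds added to the epoch avoid float rounding issues.
    rowtime = EPOCH + timedelta(milliseconds=int(cols[0]))
    rowkey = None if cols[1] == "null" else cols[1]
    return CliRow(rowtime, rowkey, cols[2:])


if __name__ == "__main__":
    row = parse_cli_row("1528368689380 | null | 1083168000 | | 10922428129")
    print(row.rowtime.isoformat())  # 2018-06-07T10:51:29.380000+00:00
    print(row.rowkey)               # None
```

A null `rowkey` is exactly the case a table cannot store.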
With a key, here set arbitrarily to 1, using kafkacat's -K flag to specify : as the key/value separator:
$ echo '1:{ "ip_router": "", "ip_lan": "", "isdn": "2046573688", "end_datetime": "", "shop_code": "1000405100", "reg_type_id": "5131615", "contract_id": "", "update_datetime": "20170801171355", "project": "", "telecom_service_id": "2", "local_speed": "", "password": "", "price_plan": "", "vip": "", "local_price_plan": "", "sub_id": "1083168000", "sta_datetime": "20090511152847", "update_number_1": "1", "act_status": "000", "network_class": "", "limit_usage": "", "num_reset_zone": "", "deposit": "", "create_user": "TUDV_POPBGG", "num_of_computer": "", "cust_id": "10922428129", "status": "2", "active_datetime": "20090511152102", "ip_view": "", "channel_type_id": "", "ip_wan": "", "imsi": "452049760887694", "infrastructure_type": "", "product_code": "HPN03", "expire_datetime": "", "speed": "", "private_ip": "", "update_user": "MIGRATE", "ip_static": "", "vlan": "", "sub_type": "", "create_datetime": "20090511152102", "is_info_completed": "1", "pay_type": "2", "up_link": "", "promotion_code": "", "technology": "", "offer_id": "400001035", "dev_staff_id": "", "account_id": "", "deploy_accept_date": "", "serial": "8984049767000887694", "group_id": "", "ip_gateway": "", "first_connect": "", "org_product_code": "MIGRATE", "start_money": "100000", "keep_alive": "", "account": "" }' \
| kafkacat -b localhost:9092 -P -t subscriber -K:
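Because the JSON value itself is full of colons, the -K: split only works if it happens at the first colon on the line. The following Python sketch models that split, assuming kafkacat uses the first occurrence of the delimiter; `split_key_value` is a hypothetical helper, and treating a line with no delimiter as value-only is an assumption of this sketch, not documented kafkacat behaviour.

```python
import json
from typing import Optional, Tuple


def split_key_value(line: str, delim: str = ":") -> Tuple[Optional[str], str]:
    """Split a producer input line into (key, value) at the FIRST delimiter.

    Assumption: a line without the delimiter is treated as value-only.
    """
    key, sep, value = line.partition(delim)
    if not sep:
        return None, line
    return key, value


if __name__ == "__main__":
    key, value = split_key_value('1:{ "sub_id": "1083168000", "isdn": "2046573688" }')
    print(key)                         # 1
    print(json.loads(value)["sub_id"])  # 1083168000
```

One consequence under this first-colon assumption: with -K: every line needs a key prefix, otherwise the first colon inside the JSON itself would be taken as the separator.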
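Note the 1 in the second column: this is the key (the first column is the message timestamp; the rest are the declared fields from the message).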
1528368781916 | 1 | 1083168000 | | 10922428129 | | 2 | 2046573688 | 452049760887694 | 8984049767000887694 | 2 | HPN03 | 400001035 | 000 | 20090511152847 | 20090511152102 | | 0 | 0 | 1000405100 | | | | | 20090511152102 | TUDV_POPBGG | 20170801171355 | MIGRATE | | | | MIGRATE | | 100000 | 1 | | | | | | | | | | | | 0 | | | | | | | | | | | | 2 | 1 | 5131615 | null | null
The table query (SELECT * FROM tbl_subscriber_json;) now returns the row as well:
1528368781916 | 1 | 1083168000 | | 10922428129 | | 2 | 2046573688 | 452049760887694 | 8984049767000887694 | 2 | HPN03 | 400001035 | 000 | 20090511152847 | 20090511152102 | | 0 | 0 | 1000405100 | | | | | 20090511152102 | TUDV_POPBGG | 20170801171355 | MIGRATE | | | | MIGRATE | | 100000 | 1 | | | | | | | | | | | | 0 | | | | | | | | | | | | 2 | 1 | 5131615 | null | null
You can also use KSQL to repartition a topic. Taking your source subscriber topic as an example, here's how to repartition it with KSQL so that the key is set:
ksql> CREATE STREAM SUBSCRIBER_KEYED AS SELECT * FROM str_subscriber_json PARTITION BY sub_id;
Message
----------------------------
Stream created and running
----------------------------
ksql>
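Conceptually, PARTITION BY sub_id re-keys each record with the value of its sub_id column and copies the value through unchanged (because of SELECT *). A minimal Python sketch of just that re-keying step; `Record` and `partition_by` are hypothetical names, and the sketch does not model how Kafka then picks a partition from a hash of the new key.

```python
from typing import Dict, List, NamedTuple, Optional


class Record(NamedTuple):
    key: Optional[str]
    value: Dict[str, str]


def partition_by(records: List[Record], column: str) -> List[Record]:
    """Re-key every record with the value of `column`, like PARTITION BY <column>.

    The value is passed through unchanged; a missing column yields a None key.
    """
    return [Record(rec.value.get(column), rec.value) for rec in records]


if __name__ == "__main__":
    source = [Record(None, {"sub_id": "1083168000", "isdn": "2046573688"})]
    keyed = partition_by(source, "sub_id")
    print(keyed[0].key)  # 1083168000
```

This matches the output shown later for subscriber_table, where the key column and sub_id both read 1083168000.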
This populates a Kafka topic (SUBSCRIBER_KEYED), on which you can then define a table:
CREATE TABLE subscriber_table (sub_id VARCHAR,contract_id VARCHAR,cust_id VARCHAR,account_id VARCHAR,telecom_service_id VARCHAR,isdn VARCHAR,imsi VARCHAR,serial VARCHAR,status VARCHAR,product_code VARCHAR,offer_id VARCHAR,act_status VARCHAR,sta_datetime BIGINT,active_datetime BIGINT,sub_type VARCHAR,end_datetime BIGINT,expire_datetime BIGINT,shop_code VARCHAR,dev_staff_id VARCHAR,promotion_code VARCHAR,vip VARCHAR,account VARCHAR,create_datetime BIGINT,create_user VARCHAR,update_datetime BIGINT,update_user VARCHAR,deposit VARCHAR,limit_usage VARCHAR,password VARCHAR,org_product_code VARCHAR,num_reset_zone VARCHAR,start_money VARCHAR,is_info_completed VARCHAR,channel_type_id VARCHAR,first_connect VARCHAR,speed VARCHAR,keep_alive VARCHAR,price_plan VARCHAR,local_price_plan VARCHAR,project VARCHAR,local_speed VARCHAR,technology VARCHAR,network_class VARCHAR,infrastructure_type VARCHAR,deploy_accept_date BIGINT,group_id VARCHAR,ip_static VARCHAR,ip_view VARCHAR,ip_lan VARCHAR,ip_wan VARCHAR,ip_gateway VARCHAR,ip_router VARCHAR,num_of_computer VARCHAR,vlan VARCHAR,up_link VARCHAR,private_ip VARCHAR,pay_type VARCHAR,update_number_1 VARCHAR,reg_type_id VARCHAR,type VARCHAR,update_number VARCHAR) \
WITH (KAFKA_TOPIC='SUBSCRIBER_KEYED', VALUE_FORMAT='JSON', KEY = 'sub_id' );
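The KEY = 'sub_id' property declares that the message key carries the same value as the sub_id column. The sketch below expresses that expectation as a check over records; it is an illustration of the invariant, not a description of what KSQL itself validates, and `key_mismatches` is a hypothetical helper. On the original subscriber topic every record fails the check (the key is null), while records re-keyed by PARTITION BY sub_id pass.

```python
from typing import Dict, List, NamedTuple, Optional


class Record(NamedTuple):
    key: Optional[str]
    value: Dict[str, str]


def key_mismatches(records: List[Record], key_column: str) -> List[Record]:
    """Return records whose message key differs from the declared key column."""
    return [rec for rec in records if rec.key != rec.value.get(key_column)]


if __name__ == "__main__":
    msg = {"sub_id": "1083168000"}
    print(len(key_mismatches([Record(None, msg)], "sub_id")))          # 1
    print(len(key_mismatches([Record("1083168000", msg)], "sub_id")))  # 0
```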
Now when you send a message to subscriber, the table works even though the message is not keyed:
ksql> select * from subscriber_table;
1528369407576 | 1083168000 | 1083168000 | | 10922428129 | | 2 | 2046573688 | 452049760887694 | 8984049767000887694 | 2 | HPN03 | 400001035 | 000 | 20090511152847 | 20090511152102 | | 0 | 0 | 1000405100 | | | | | 20090511152102 | TUDV_POPBGG | 20170801171355 | MIGRATE | | | | MIGRATE | | 100000 | 1 | | | | | | | | | | | | 0 | | | | | | | | | | | | 2 | 1 | 5131615 | null | null