Jad*_*dda 5 apache-kafka-streams
我正在尝试使用KTable来消费来自Kafka主题的事件.但是,它什么也没有回报.当我使用KStream时,它返回并打印对象.这真的很奇怪.制片人和消费者可以在这里找到
//Not working
KTable<String, Customer> customerKTable = streamsBuilder.table("customer", Consumed.with(Serdes.String(), customerSerde),Materialized.<String, Customer, KeyValueStore<Bytes, byte[]>>as(customerStateStore.name()));
customerKTable.foreach(((key, value) -> System.out.println("Customer from Topic: " + value)));
//KStream working
KStream<String, Customer> customerKStream= streamsBuilder.stream("customer", Consumed.with(Serdes.String(), customerSerde));
customerKStream.foreach(((key, value) -> System.out.println("Customer from Topic: " + value)))
Run Code Online (Sandbox Code Playgroud)
经过大量研究,我发现了我的语法问题。我使用的语法是有效的,基于 Confluence/Kafka 文档,但它不起作用。将向 Kafka 团队提出错误。现在,正在运行的新语法是
KTable<String, Customer> customerKTable = streamsBuilder.table("customer",Materialized.<String, Customer, KeyValueStore<Bytes, byte[]>>as(customerStateStore.name())
.withKeySerde(Serdes.String())
.withValueSerde(customerSerde));
Run Code Online (Sandbox Code Playgroud)
我应该包括withKeySerde()和withValueSerde()以使 KTable 工作。但这是 Confluence/Kafka 文档中没有提到的
| 归档时间: |
|
| 查看次数: |
334 次 |
| 最近记录: |