Spring Kafka - Event sourcing - Example of how to query some entity state using Kafka + the KafkaStreams API

cod*_*ent 5 event-sourcing apache-kafka spring-cloud-stream apache-kafka-streams spring-kafka

I am using Kafka to implement an architecture based on event sourcing.

Suppose I store the events in JSON format:

{"name": "ProductAdded", "productId": "1", "quantity": 3, "dateAdded": "2017-04-04"}

I would like to implement a query that returns the quantity of the product with productId = X at a specific date.
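Independently of Kafka, the query amounts to replaying the event log and summing the quantities of the matching events dated on or before the requested day. The following is only a minimal plain-Java sketch of that idea; the class `ProductStockReplay`, the `ProductEvent` type and the method `quantityAsOf` are hypothetical names introduced for illustration, and it assumes that quantities are simply additive.

```java
import java.time.LocalDate;
import java.util.List;

/** Plain-Java sketch (no Kafka involved): replay events to answer "quantity of product X as of a date". */
final class ProductStockReplay {

    /** Hypothetical event type mirroring the JSON fields shown above. */
    static final class ProductEvent {
        final String name;
        final String productId;
        final int quantity;
        final LocalDate dateAdded;

        ProductEvent(String name, String productId, int quantity, LocalDate dateAdded) {
            this.name = name;
            this.productId = productId;
            this.quantity = quantity;
            this.dateAdded = dateAdded;
        }
    }

    /** Sums the quantities of all events for productId dated on or before asOf (assumes additive quantities). */
    static int quantityAsOf(List<ProductEvent> events, String productId, LocalDate asOf) {
        return events.stream()
                .filter(e -> e.productId.equals(productId))
                .filter(e -> !e.dateAdded.isAfter(asOf))
                .mapToInt(e -> e.quantity)
                .sum();
    }

    public static void main(String[] args) {
        List<ProductEvent> log = List.of(
                new ProductEvent("ProductAdded", "1", 3, LocalDate.parse("2017-04-04")),
                new ProductEvent("ProductAdded", "2", 7, LocalDate.parse("2017-04-05")),
                new ProductEvent("ProductAdded", "1", 2, LocalDate.parse("2017-04-10")));
        // Only the first event for product "1" is dated on or before 2017-04-05.
        System.out.println(quantityAsOf(log, "1", LocalDate.parse("2017-04-05"))); // prints 3
    }
}
```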

Can you show an approximate implementation of this query using Spring Kafka KStreams?

Update: I have made some progress on this with Spring Kafka KStreams, but I am getting a deserialization error.

This is my Spring Cloud Stream Kafka producer:

public interface ProductProducer{

    final String OUTPUT = "productsOut";

    @Output(ProductProducer.OUTPUT)
    MessageChannel output();

}

Configuration:

spring:
  application:
    name: product-generator-service
  cloud:
    stream:
      kafka:
        binder:
          brokers:
          - kafka
          zk-nodes:
          - kafka
        bindings:
          productsOut:
            producer:
              sync: true
      bindings:
        productsOut: 
          destination: orders
          content-type: application/json

I send the message with the following code, which correctly serializes the Map into a JSON object:

Map<String, Object> event = new HashMap<>();
event.put("name", "ProductCreated");
event.put("productId", product.getId());
event.put("quantity", product.getQuantity());
event.put("dateAdded", new Date());
productProducer.output().send(MessageBuilder.withPayload(event).build(), 500);

MessageBuilder.withPayload(event).build() -> GenericMessage [payload={quantity=1, productId=1, name=ProductCreated, dateAdded="xxxxx"}, headers={id=fc531176-e3e9-61b8-40e3-08074fabee4d, timestamp=1499845483095}]

In the ProductService application, I can read this message with a Spring Cloud Stream listener:

@Component
public class ProductListener{

    @StreamListener(ProductConsumer.INPUT)
    public void handleProduct(Map<String, Object> event){

However, when I use a KStream, I get a deserialization error:

@Configuration
public class KStreamsConfig {

    private static final String STREAMING_TOPIC1 = "orders";

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public StreamsConfig kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "product-service-kstream");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        //props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.serdeFrom(jsonSerializer, jsonDeserializer).getClass().getName());
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new StreamsConfig(props);
    }

    @Bean
    public FactoryBean<KStreamBuilder> myKStreamBuilder(StreamsConfig streamsConfig) {
        return new KStreamBuilderFactoryBean(streamsConfig);
    }

    @Bean
    public KStream<?, ?> kStream(KStreamBuilder kStreamBuilder) {

        Serde<Integer> integerSerde = Serdes.Integer();
        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
        final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

        KStream<Integer, JsonNode> stream = kStreamBuilder.stream(null, integerSerde, jsonSerde, STREAMING_TOPIC1);
        stream.print();
        return stream;
    }

}

Exception:

org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ÿ': was expecting ('true', 'false' or 'null')
 at [Source: [B@288e4e9a; line: 1, column: 4]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ÿ': was expecting ('true', 'false' or 'null')
 at [Source: [B@288e4e9a; line: 1, column: 4]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3528)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2686)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:878)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:772)
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3834)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3783)
    at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2404)
    at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
    at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:30)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:46)
    at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:44)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:85)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:158)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:605)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

Update 2:

To find out what was actually reaching the KStream, I changed both the key and the value to String deserializers, and this is what gets printed:

KStream<Integer, String> stream = kStreamBuilder.stream(null, integerSerde, stringSerde, STREAMING_TOPIC1);

Printed value:

[KSTREAM-SOURCE-0000000000]: null , ?contentType

Why am I not getting the JSON string?

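Update 3: I solved the deserialization problem. The cause was that the message producer (Spring Cloud Stream) by default embeds some headers as part of the payload. I only had to disable this header embedding to start receiving the messages correctly in Kafka Streams: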

spring:
  application:
    name: product-service
  cloud:
    stream:
      kafka:
        binder:
          brokers:
          - kafka
          zk-nodes:
          - kafka
        bindings:
          productsOut:
            producer:
              sync: true
      bindings:
        productsIn:
          group: product-service 
          destination: orders
          consumer:
            max-attempts: 5
            header-mode: raw
        productsOut: 
          destination: orders
          content-type: application/json
          producer:
            header-mode: raw
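To illustrate why embedded headers break a JSON deserializer, here is a small plain-Java sketch that prepends a header block to a JSON payload. The exact byte layout used here (a leading 0xFF marker, a header count, then length-prefixed header name and value) is an assumption made for illustration and may not match every Spring Cloud Stream version; the class `EmbeddedHeaderDemo` and its methods are hypothetical. The point is only that the first byte is no longer `{`, which is consistent with the `Unrecognized token 'ÿ'` error and the `?contentType` output seen earlier.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Sketch: a payload with headers embedded in front of it no longer starts like a JSON document. */
final class EmbeddedHeaderDemo {

    /**
     * Builds header block + payload.
     * ASSUMED layout (illustrative only): 0xFF, header count (1 byte),
     * name length (1 byte), name, value length (4 bytes), value, then the payload.
     */
    static byte[] embed(String headerName, String headerValue, byte[] payload) {
        byte[] name = headerName.getBytes(StandardCharsets.UTF_8);
        byte[] value = headerValue.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(0xFF);                      // marker byte, rendered as 'ÿ' in Latin-1
        out.write(1);                         // a single header in this sketch
        out.write(name.length);
        out.write(name, 0, name.length);
        out.write(ByteBuffer.allocate(4).putInt(value.length).array(), 0, 4);
        out.write(value, 0, value.length);
        out.write(payload, 0, payload.length);
        return out.toByteArray();
    }

    /** Cheap check a JSON object deserializer effectively relies on: the first byte must be '{'. */
    static boolean looksLikeJsonObject(byte[] bytes) {
        return bytes.length > 0 && bytes[0] == '{';
    }

    public static void main(String[] args) {
        byte[] json = "{\"quantity\":1,\"productId\":1}".getBytes(StandardCharsets.UTF_8);
        byte[] embedded = embed("contentType", "application/json", json);
        System.out.println(looksLikeJsonObject(json));     // prints true
        System.out.println(looksLikeJsonObject(embedded)); // prints false
    }
}
```

With `header-mode: raw`, as in the configuration above, the record value is just the JSON bytes, so the deserializer sees `{` first.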

KStream definition:

KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde, jsonSerde, STREAMING_TOPIC1);

Output:

[KSTREAM-SOURCE-0000000000]: null , {"quantity":0,"productId":0,"name":"ProductCreated","dateAdded":1499930385450}

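Now that everything is set up correctly, how do I implement the interactive query I want, i.e. get the quantity of the product with productId = X at a specific date?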

cod*_*ent 5

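I managed to solve this by mixing Spring Cloud Stream (to produce the messages) with Spring Kafka to handle Kafka Streams and implement the interactive queries (important: see Update 3 in the question, which is what makes it possible to combine the two):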

Kafka Streams configuration:

@Configuration
public class KStreamsConfig {

    private static final String STREAMING_TOPIC1 = "orders";

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public StreamsConfig kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "product-service-streams");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        //props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.serdeFrom(jsonSerializer, jsonDeserializer).getClass().getName());
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new StreamsConfig(props);
    }

    @Bean
    public KStreamBuilderFactoryBean myKStreamBuilder(StreamsConfig streamsConfig) {
        return new KStreamBuilderFactoryBean(streamsConfig);
    }

    @Bean
    public KStream<?, ?> kStream(KStreamBuilder kStreamBuilder, KStreamBuilderFactoryBean kStreamBuilderFactoryBean) {

        Serde<Integer> integerSerde = Serdes.Integer();
        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
        final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

        KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde, jsonSerde, STREAMING_TOPIC1);

        stream.map( (key, value) -> {
            return new KeyValue<>(value.get("productId").asInt(), value.get("quantity").asInt());
        }).groupByKey().reduce( (v1, v2) -> v1 + v2, "ProductsStock");

        stream.print();
        return stream;
    }

}

Note how I materialize the KTable store ProductsStock, which is queried later in the service.
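As a rough mental model of what that store ends up holding, the `map(...).groupByKey().reduce((v1, v2) -> v1 + v2, ...)` chain behaves like a per-key running sum. The sketch below mimics this with a plain `HashMap`; it is not Kafka Streams code, and the class `ProductsStockSketch` and its methods are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of the ProductsStock store: productId -> running sum of quantities. */
final class ProductsStockSketch {

    private final Map<Integer, Integer> store = new HashMap<>();

    /** Mirrors reduce((v1, v2) -> v1 + v2): the first value is stored as-is, later values are added. */
    void onEvent(int productId, int quantity) {
        store.merge(productId, quantity, Integer::sum);
    }

    /** Mirrors a key lookup on the store: returns null when the key has never been seen. */
    Integer get(int productId) {
        return store.get(productId);
    }

    public static void main(String[] args) {
        ProductsStockSketch stock = new ProductsStockSketch();
        stock.onEvent(1, 3);
        stock.onEvent(2, 7);
        stock.onEvent(1, 2);
        System.out.println(stock.get(1));  // prints 5
        System.out.println(stock.get(42)); // prints null
    }
}
```

Because only the latest total per productId is kept, this model answers "current stock" rather than "stock at a given date"; the date-specific part of the question would presumably need the date in the key or a windowed store, which is not shown here.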

Product service:

@Autowired
private KStreamBuilderFactoryBean kStreamBuilderFactoryBean;

@Override
public Integer getProductStock(Integer id) {
    KafkaStreams streams = kStreamBuilderFactoryBean.getKafkaStreams();
    ReadOnlyKeyValueStore<Integer, Integer> keyValueStore =
            streams.store("ProductsStock", QueryableStoreTypes.keyValueStore());
    return keyValueStore.get(id);
}