Kafka: comparing consecutive values for the same key

Moh*_*tha 3 apache-kafka apache-kafka-streams

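We are building an application that ingests data from sensors. The data is streamed into Kafka, and from there consumers publish it to different data stores. Each data point has several attributes that represent the state of the sensor.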

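In one of the consumers, we want to publish data to the data store only when the value has changed. For example, if a temperature sensor is polled every 10 seconds, we would receive data such as: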

----------------------------------------------------------------------
Key                Value
----------------------------------------------------------------------
Sensor1            {timestamp: "10-10-2019 10:20:30", temperature: 10}
Sensor1            {timestamp: "10-10-2019 10:20:40", temperature: 10}
Sensor1            {timestamp: "10-10-2019 10:20:50", temperature: 11}

In the case above, only the first and the third records should be published.
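A Kafka-free sketch of the rule being asked for, assuming "the value changed" means the temperature changed. The timestamp differs on every poll, so comparing whole records would never suppress anything. `Reading`, `ChangeFilter` and `publishOnChange` are hypothetical names for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical value type mirroring the JSON payload in the question.
final class Reading {
    final String timestamp;
    final int temperature;

    Reading(String timestamp, int temperature) {
        this.timestamp = timestamp;
        this.temperature = temperature;
    }
}

final class ChangeFilter {
    // Keeps only readings whose temperature differs from the previous reading.
    // The first reading is always kept because there is nothing to compare it
    // with. Timestamps are deliberately ignored in the comparison.
    static List<Reading> publishOnChange(List<Reading> readings) {
        List<Reading> published = new ArrayList<>();
        Reading previous = null;
        for (Reading current : readings) {
            if (previous == null || previous.temperature != current.temperature) {
                published.add(current);
            }
            previous = current;
        }
        return published;
    }

    public static void main(String[] args) {
        List<Reading> sensor1 = List.of(
            new Reading("10-10-2019 10:20:30", 10),
            new Reading("10-10-2019 10:20:40", 10),
            new Reading("10-10-2019 10:20:50", 11));
        for (Reading r : publishOnChange(sensor1)) {
            System.out.println(r.timestamp + " " + r.temperature);
        }
    }
}
```

With the three sample readings this keeps the 10:20:30 and 10:20:50 records, which is the behaviour described above. In a Kafka consumer the `previous` variable has to become per-key state, which is what a state store provides.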

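To do this, we need some way to compare the current value for a key with the previous value for the same key. I believe this should be possible with a KTable or a KStream, but I haven't been able to find an example.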

Any help would be great!

Bru*_*nna 5

Here is an example of how to solve this with KStream#transformValues().

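import java.util.Objects;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// Placeholders: YourValueType, yourValueTypeSerde (a Serde<YourValueType>),
// stateStoreName, INPUT_TOPIC and OUTPUT_TOPIC.
StreamsBuilder builder = new StreamsBuilder();
// Persistent store holding the last published value for each key.
StoreBuilder<KeyValueStore<String, YourValueType>> keyValueStoreBuilder =
    Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
                                Serdes.String(),
                                yourValueTypeSerde);
builder.addStateStore(keyValueStoreBuilder);
builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), yourValueTypeSerde))
    .transformValues(() -> new ValueTransformerWithKey<String, YourValueType, YourValueType>() {
        private KeyValueStore<String, YourValueType> state;

        @Override
        public void init(final ProcessorContext context) {
            state = (KeyValueStore<String, YourValueType>) context.getStateStore(stateStoreName);
        }

        @Override
        public YourValueType transform(final String key, final YourValueType value) {
            YourValueType prevValue = state.get(key);
            // Forward the first value for a key, or any value whose temperature changed.
            if (prevValue == null || !Objects.equals(prevValue.temperature(), value.temperature())) {
                state.put(key, value);
                return value;
            }
            // Temperature unchanged: return null so the record can be dropped below.
            return null;
        }

        @Override
        public void close() {}
    }, stateStoreName)
    // transformValues() cannot drop records by itself, so remove the null results.
    .filter((key, value) -> value != null)
    .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), yourValueTypeSerde));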

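You compare each record with the previous record stored for the same key in the state store. If there is no previous record, or the temperature is different, you store the current record in the state store and forward it downstream. If the temperature is equal, you discard the current record. Note that transformValues() always emits one output per input, so returning null only marks the record; it still has to be filtered out before it is written to the output topic.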
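Because the state store is keyed, each sensor's changes are tracked independently. The sketch below is a Kafka-free model of the transform-then-filter pipeline, useful for checking the logic with interleaved keys. A `HashMap` stands in for the `KeyValueStore`, and `DedupModel`, `transform` and `run` are hypothetical names, not Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Kafka-free model of transformValues(...) followed by filter(v != null).
final class DedupModel {
    // Stand-in for the KeyValueStore: last forwarded temperature per key.
    private final Map<String, Integer> store = new HashMap<>();

    // Mirrors transform(key, value): returns the value to forward,
    // or null when the temperature for this key has not changed.
    Integer transform(String key, Integer temperature) {
        Integer previous = store.get(key);
        if (previous == null || !Objects.equals(previous, temperature)) {
            store.put(key, temperature);
            return temperature;
        }
        return null;
    }

    // Runs every record through transform() and drops the null results,
    // as the filter() step does in the topology.
    List<Map.Entry<String, Integer>> run(List<Map.Entry<String, Integer>> input) {
        List<Map.Entry<String, Integer>> output = new ArrayList<>();
        for (Map.Entry<String, Integer> record : input) {
            Integer result = transform(record.getKey(), record.getValue());
            if (result != null) {
                output.add(Map.entry(record.getKey(), result));
            }
        }
        return output;
    }
}
```

For the input `Sensor1=10, Sensor2=10, Sensor1=10, Sensor2=12, Sensor1=11`, only the repeated `Sensor1=10` is dropped: the first reading of each sensor is always forwarded, and a change on one key does not affect the other.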