Thi*_*oth 5 apache-kafka apache-kafka-streams ksql
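I am evaluating Kafka. In our use case we would have to create a new topic containing the "time elapsed" from one event to the next, essentially because a sensor reports "on" or "off" to Kafka. So, given the timestamp, sensor name, and state, I want to create a new topic with the duration of each "on" and "off" state.
My data looks like this: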
{ 2019:02:15 00:00:00, sensor1, off}
{ 2019:02:15 00:00:30, sensor1, on}
to get the result
{ 2019:02:15 00:30:00, sensor1, off, 30sec }.
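Essentially, the states of multiple sensors have to be combined to determine the combined state of a machine. The final factory will have hundreds, if not thousands, of sensors.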
This is pretty easy in Kafka Streams, so I would opt for 2.
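
First, you have to model your input data properly. Your example uses local time, which makes it impossible to reliably calculate the duration between two timestamps. Use something like epoch time instead.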
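
To illustrate why local timestamps are a problem, here is a minimal sketch that parses timestamps in the format from the question and converts them to `Instant` before computing a duration. The class name `EpochTimeExample`, the helper `toInstant`, and the zone `Europe/Berlin` are assumptions made for this example only; the question does not say where the sensors are located.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

class EpochTimeExample {
    // Matches the sample data in the question, e.g. "2019:02:15 00:00:30"
    static final DateTimeFormatter FORMAT =
        DateTimeFormatter.ofPattern("uuuu:MM:dd HH:mm:ss");

    /** Interprets a local timestamp in the given zone and converts it to an Instant. */
    static Instant toInstant(String localTimestamp, ZoneId zone) {
        return LocalDateTime.parse(localTimestamp, FORMAT).atZone(zone).toInstant();
    }

    public static void main(String[] args) {
        // Assumption: the sensors report wall-clock time in this zone
        ZoneId zone = ZoneId.of("Europe/Berlin");

        // The sample from the question: 30 seconds apart
        Instant off = toInstant("2019:02:15 00:00:00", zone);
        Instant on = toInstant("2019:02:15 00:00:30", zone);
        System.out.println(Duration.between(off, on).getSeconds()); // 30

        // Across the end of daylight saving time the wall clock is misleading:
        // the local difference is 2 hours, but 3 hours actually elapsed.
        LocalDateTime before = LocalDateTime.parse("2019:10:27 01:30:00", FORMAT);
        LocalDateTime after = LocalDateTime.parse("2019:10:27 03:30:00", FORMAT);
        System.out.println(Duration.between(before, after).toHours()); // 2
        System.out.println(
            Duration.between(
                before.atZone(zone).toInstant(), after.atZone(zone).toInstant())
                .toHours()); // 3
    }
}
```

If the producers can emit epoch milliseconds (or ISO-8601 timestamps with an offset) directly, this conversion step and the zone assumption disappear.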

Start with a source data model like

    interface SensorState {
      String getId();

      Instant getTime();

      State getState();

      enum State {
        OFF,
        ON
      }
    }

and a target

    interface SensorStateWithDuration {
      SensorState getEvent();

      Duration getDuration();
    }

Now that you have defined your input and output streams (but see "Data Types and Serialization"), you just need to transform the values ("Applying processors and transformers") by simply defining a ValueTransformer.
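It has to do two things:

1. Check the state store for historical data for the sensor and update it with the new data if necessary.
2. When historical data is available, calculate the difference between the timestamps and emit the data together with the calculated duration.
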
    import java.time.Duration;
    import java.util.Optional;
    import org.apache.kafka.streams.kstream.ValueTransformer;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;

    class DurationProcessor implements ValueTransformer<SensorState, SensorStateWithDuration> {
      /** Name of the state store; must match the store registered in the topology. */
      static final String SENSOR_STATES = "SensorStates";

      KeyValueStore<String, SensorState> store;

      @Override
      @SuppressWarnings("unchecked")
      public void init(ProcessorContext context) {
        this.store = (KeyValueStore<String, SensorState>) context.getStateStore(SENSOR_STATES);
      }

      @Override
      public SensorStateWithDuration transform(SensorState sensorState) {
        // Nothing to do
        if (sensorState == null) {
          return null;
        }

        // Check for the previous state, update if necessary
        var oldState = checkAndUpdateSensorState(sensorState);

        // When we have historical data, return duration so far. Otherwise return null
        return oldState.map(state -> addDuration(state, sensorState)).orElse(null);
      }

      @Override
      public void close() {}

      /**
       * Checks the state store for historical state based on sensor ID and updates it, if necessary.
       *
       * @param sensorState The new sensor state
       * @return The old sensor state
       */
      Optional<SensorState> checkAndUpdateSensorState(SensorState sensorState) {
        // The Sensor ID is our index
        var index = sensorState.getId();

        // Get the historical state (might be null)
        var oldState = store.get(index);
        if (needToUpdate(oldState, sensorState)) {
          // Update the state store to the new state
          store.put(index, sensorState);
        }
        return Optional.ofNullable(oldState);
      }

      /**
       * Check if we need to update the state in the state store.
       *
       * <p>Either we have no historical data, or the state has changed.
       *
       * @param oldState The old sensor state
       * @param sensorState The new sensor state
       * @return Flag whether we need to update
       */
      boolean needToUpdate(SensorState oldState, SensorState sensorState) {
        return oldState == null || oldState.getState() != sensorState.getState();
      }

      /**
       * Wrap the old state with a duration how long it lasted.
       *
       * @param oldState The state of the sensor so far
       * @param sensorState The new state of the sensor
       * @return Wrapped old state with duration
       */
      SensorStateWithDuration addDuration(SensorState oldState, SensorState sensorState) {
        var duration = Duration.between(oldState.getTime(), sensorState.getTime());
        // builder() is not part of the interface shown above; it is assumed to be
        // supplied by the concrete implementation of SensorStateWithDuration.
        return SensorStateWithDuration.builder().setEvent(oldState).setDuration(duration).build();
      }
    }

Put everything together in a simple topology ("Connecting Processors and State Stores"):
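
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.state.Stores;

    // storeSerde, inputSerde and resultSerde are assumed to be defined elsewhere
    // (see "Data Types and Serialization").
    var builder = new StreamsBuilder();

    // Our state store
    var storeBuilder =
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("SensorStates"),
            Serdes.String(),
            storeSerde);

    // Register the store builder
    builder.addStateStore(storeBuilder);

    // Read sensor states, attach durations, and write the results
    builder.stream("input-topic", Consumed.with(Serdes.String(), inputSerde))
        .transformValues(DurationProcessor::new, DurationProcessor.SENSOR_STATES)
        .to("result-topic", Produced.with(Serdes.String(), resultSerde));

    var topology = builder.build();

The complete application is at github.com/melsicon/kafka-sensors.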
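
To try the transformer logic without a Kafka cluster, the following sketch reproduces the check-update-emit steps in plain Java. It is a simplification under stated assumptions, not the code from the linked repository: a `HashMap` stands in for the "SensorStates" state store, and `DurationSketch`, `Reading`, and `Result` are hypothetical names introduced here in place of `DurationProcessor`, `SensorState`, and `SensorStateWithDuration`.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class DurationSketch {
    enum State { OFF, ON }

    /** Simplified stand-in for SensorState (hypothetical). */
    static final class Reading {
        final String id;
        final Instant time;
        final State state;

        Reading(String id, Instant time, State state) {
            this.id = id;
            this.time = time;
            this.state = state;
        }
    }

    /** Simplified stand-in for SensorStateWithDuration (hypothetical). */
    static final class Result {
        final Reading event;
        final Duration duration;

        Result(Reading event, Duration duration) {
            this.event = event;
            this.duration = duration;
        }
    }

    // Assumption: an in-memory map plays the role of the Kafka Streams state store
    private final Map<String, Reading> store = new HashMap<>();

    /**
     * Stores the reading if the sensor is new or its state changed, then returns
     * the previously stored reading together with how long it has lasted so far.
     */
    Optional<Result> transform(Reading reading) {
        Reading old = store.get(reading.id);
        if (old == null || old.state != reading.state) {
            store.put(reading.id, reading);
        }
        return Optional.ofNullable(old)
            .map(o -> new Result(o, Duration.between(o.time, reading.time)));
    }

    public static void main(String[] args) {
        DurationSketch sketch = new DurationSketch();
        Instant start = Instant.parse("2019-02-15T00:00:00Z");

        // First event for a sensor: no history yet, so nothing is emitted
        Optional<Result> first = sketch.transform(new Reading("sensor1", start, State.OFF));
        System.out.println(first.isPresent()); // false

        // State change 30 seconds later: the old OFF state is emitted with its duration
        sketch.transform(new Reading("sensor1", start.plusSeconds(30), State.ON))
            .ifPresent(r ->
                System.out.println(r.event.state + " lasted " + r.duration.getSeconds() + "s"));
        // prints "OFF lasted 30s"
    }
}
```

As in the transformer above, a repeated event with an unchanged state does not overwrite the stored reading, so it yields the duration of the current state so far.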