Kafka: time difference between the last two records, with KSQL or something else?

Thi*_*oth 5 apache-kafka apache-kafka-streams ksql

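I am currently evaluating Kafka. In our use case, I have to create a new topic containing the "elapsed time" from one event to the next, essentially because a sensor reports "on" or "off" to Kafka. So, given a timestamp, a sensor name, and a state, the goal is to create a new topic with the durations of the "on" and "off" states.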

  1. Is this doable in KSQL?
  2. Or should I really leave it to a consumer or stream processor to solve this?

My data looks like this:

{ 2019:02:15 00:00:00, sensor1, off}
{ 2019:02:15 00:00:30, sensor1, on} 

To get the result:

{ 2019:02:15 00:30:00, sensor1, off, 30sec }. 

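Essentially, the states of multiple sensors have to be combined to determine the combined state of a machine. The final factory will have hundreds, if not thousands, of sensors.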

eik*_*eik 2

This is pretty easy in Kafka Streams, so I would opt for 2.


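First, you have to model your input data properly. Your example uses local time without a zone offset, which makes it impossible to reliably calculate the duration between two timestamps. Use something like epoch time.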
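
As a minimal sketch of why epoch time helps: once both events carry an absolute timestamp, the duration is a plain subtraction. The class name `EpochDuration` and the helper `elapsed` are hypothetical, and the two sample events from the question are assumed to be in UTC purely for illustration.

```java
import java.time.Duration;
import java.time.Instant;

class EpochDuration {
  /** Duration between two timestamps given as milliseconds since the epoch. */
  static Duration elapsed(long fromEpochMillis, long toEpochMillis) {
    return Duration.between(
        Instant.ofEpochMilli(fromEpochMillis), Instant.ofEpochMilli(toEpochMillis));
  }

  public static void main(String[] args) {
    // The two sample events from the question, assumed to be UTC for this sketch
    long off = Instant.parse("2019-02-15T00:00:00Z").toEpochMilli();
    long on = Instant.parse("2019-02-15T00:00:30Z").toEpochMilli();

    System.out.println(elapsed(off, on)); // prints PT30S
  }
}
```

A local timestamp, by contrast, can repeat or skip an hour around daylight-saving changes, so the same subtraction is not guaranteed to give the real elapsed time.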


Start with a source data model like

interface SensorState {
  String getId();
  Instant getTime();
  State getState();
  enum State {
    OFF,
    ON
  }
}

and a target

interface SensorStateWithDuration {
  SensorState getEvent();
  Duration getDuration();
}

Now that you have defined your input and output streams (but see "Data Types and Serialization"), you only need to transform the values ("Applying processors and transformers") by simply defining a ValueTransformer.


It has to do two things:

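  1. Check the state store for historical data for the sensor and update it with the new data if necessary
  2. When historical data is available, calculate the difference between the timestamps and emit the data together with the calculated duration
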
import java.time.Duration;
import java.util.Optional;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

class DurationProcessor implements ValueTransformer<SensorState, SensorStateWithDuration> {
  // Name of the state store; must match the name used when building the topology
  static final String SENSOR_STATES = "SensorStates";

  KeyValueStore<String, SensorState> store;

  @Override
  @SuppressWarnings("unchecked")
  public void init(ProcessorContext context) {
    this.store = (KeyValueStore<String, SensorState>) context.getStateStore(SENSOR_STATES);
  }

  @Override
  public SensorStateWithDuration transform(SensorState sensorState) {
    // Nothing to do
    if (sensorState == null) {
      return null;
    }

    // Check for the previous state, update if necessary
    var oldState = checkAndUpdateSensorState(sensorState);

    // When we have historical data, return duration so far. Otherwise return null
    return oldState.map(state -> addDuration(state, sensorState)).orElse(null);
  }

  @Override
  public void close() {}

  /**
   * Checks the state store for historical state based on sensor ID and updates it, if necessary.
   *
   * @param sensorState The new sensor state
   * @return The old sensor state
   */
  Optional<SensorState> checkAndUpdateSensorState(SensorState sensorState) {
    // The Sensor ID is our index
    var index = sensorState.getId();

    // Get the historical state (might be null)
    var oldState = store.get(index);
    if (needToUpdate(oldState, sensorState)) {
      // Update the state store to the new state
      store.put(index, sensorState);
    }
    return Optional.ofNullable(oldState);
  }

  /**
   * Check if we need to update the state in the state store.
   *
   * <p>Either we have no historical data, or the state has changed.
   *
   * @param oldState The old sensor state
   * @param sensorState The new sensor state
   * @return Flag whether we need to update
   */
  boolean needToUpdate(SensorState oldState, SensorState sensorState) {
    return oldState == null || oldState.getState() != sensorState.getState();
  }

  /**
   * Wrap the old state with a duration how long it lasted.
   *
   * @param oldState The state of the sensor so far
   * @param sensorState The new state of the sensor
   * @return Wrapped old state with duration
   */
  SensorStateWithDuration addDuration(SensorState oldState, SensorState sensorState) {
    var duration = Duration.between(oldState.getTime(), sensorState.getTime());
    // builder() is assumed to be provided by the SensorStateWithDuration implementation;
    // it is not part of the interface shown above
    return SensorStateWithDuration.builder().setEvent(oldState).setDuration(duration).build();
  }
}
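
To try out the two steps of the transformer without a running Kafka cluster, here is a minimal sketch in plain Java. A `HashMap` stands in for the state store, and the names `DurationLogicSketch`, `Reading`, and `ReadingWithDuration` are hypothetical simplifications of the data model above, not part of the answer's application. It assumes Java 16 or later for records.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class DurationLogicSketch {
  enum State { OFF, ON }

  // Simplified stand-ins for SensorState and SensorStateWithDuration
  record Reading(String id, Instant time, State state) {}

  record ReadingWithDuration(Reading event, Duration duration) {}

  // Stand-in for the Kafka Streams state store, keyed by sensor ID
  private final Map<String, Reading> store = new HashMap<>();

  Optional<ReadingWithDuration> transform(Reading reading) {
    // Step 1: look up the stored state and replace it if there is none or the state changed
    Reading old = store.get(reading.id());
    if (old == null || old.state() != reading.state()) {
      store.put(reading.id(), reading);
    }

    // Step 2: with history available, emit the stored state plus the time elapsed since it began
    return Optional.ofNullable(old)
        .map(o -> new ReadingWithDuration(o, Duration.between(o.time(), reading.time())));
  }

  public static void main(String[] args) {
    var sketch = new DurationLogicSketch();
    var t0 = Instant.parse("2019-02-15T00:00:00Z");

    // First event for the sensor: no history yet, so nothing is emitted
    System.out.println(sketch.transform(new Reading("sensor1", t0, State.OFF)));

    // State change 30 seconds later: the OFF state is emitted with its duration
    System.out.println(sketch.transform(new Reading("sensor1", t0.plusSeconds(30), State.ON)));
  }
}
```

Note that, as in the transformer above, a repeated reading with an unchanged state still produces an output (the duration so far) but leaves the stored state untouched.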

Put everything together in a simple topology ("Connecting Processors and State Stores"):

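import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.Stores;

// storeSerde, inputSerde and resultSerde are the application's own Serdes
// (see "Data Types and Serialization"); they are not defined in this snippet
var builder = new StreamsBuilder();

// Our state store
var storeBuilder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("SensorStates"),
        Serdes.String(),
        storeSerde);

// Register the store builder
builder.addStateStore(storeBuilder);

builder.stream("input-topic", Consumed.with(Serdes.String(), inputSerde))
    .transformValues(DurationProcessor::new, DurationProcessor.SENSOR_STATES)
    .to("result-topic", Produced.with(Serdes.String(), resultSerde));

var topology = builder.build();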

The complete application is available at github.com/melsicon/kafka-sensors
