反弹卡夫卡事件

use*_*887 5 apache-kafka debouncing apache-kafka-streams

我计划建立一个 MySQL 到 Kafka 的流程,最终目标是安排一个过程来根据更改的数据重新计算 mongoDB 文档。

这可能涉及直接修补 mongoDB 文档,或运行重新创建整个文档的进程。

我的问题是,如果对 MySQL 数据库的一组更改都与一个 mongoDB 文档相关,那么我不想为每个更改实时重新运行重新计算过程,我想等待更改“解决”,以便我只根据需要运行重新计算过程。

有没有办法“消除”卡夫卡流?例如,Kafka 消费者是否有一个明确定义的模式,我可以用它来实现我想要的逻辑?

aSe*_*emy 7

目前还没有简单的方法来消除事件。

简而言之,问题在于卡夫卡并不根据“挂钟时间”行事。处理通常由传入事件(以及其中包含的数据)触发,而不是由任意触发器(如系统时间)触发。

我将介绍为什么 Suppressed 和 SessionWindows 不起作用、KIP-242 中建议的解决方案以及未经测试的解决方法。

压抑

SuppresseduntilTimeLimit()功能,但是不适合去抖。

如果同一时间有另一个记录到达,它将替换缓冲区中的第一个记录,但不会重新启动计时器。

会话窗口

我认为使用SessionWindows.ofInactivityGapAndGrace()可能会起作用。

首先,我对输入 KStream 进行分组、窗口化、聚合和抑制:

  val windowedData: KTable<Windowed<Key>, Value> = 
    inputTopicKStream
      .groupByKey()
      .windowedBy(
        SessionWindows.ofInactivityGapAndGrace(
          WINDOW_INACTIVITY_DURATION,
          WINDOW_INACTIVITY_DURATION,
        )
      )
      .aggregate(...)
      .suppress(
        Suppressed.untilWindowCloses(
          Suppressed.BufferConfig.unbounded()
        )
      )
Run Code Online (Sandbox Code Playgroud)

然后我聚合了窗口,这样我就可以得到最终状态

  windowedData
      .groupBy(...)
      .reduce(
        /* adder */
        { a, b -> a + b },
        /* subtractor */
        { a, a -> a - a },
      )
Run Code Online (Sandbox Code Playgroud)

然而问题是,如果没有额外的记录出现,它就不会SessionWindows关闭。Kafka不会独立关闭窗口——它需要额外的记录来实现窗口可以关闭,并且可以转发新记录。suppress()

Confluence 的博客https://www.confluence.io/de-de/blog/kafka-streams-take-on-watermarks-and-triggers/中指出了这一点

[I]如果您停止获取新记录,挂钟时间将继续前进,但流时间将冻结。挂钟时间会提前,因为计算机中的小石英表一直在滴答作响,但流时间只会在您获得新记录时才会提前。由于没有新记录,流时间被冻结

KIP-424

KIP-424提出了一项改进,可以Suppress充当防抖器,但几年来没有任何进展。

解决方法

Andrey Bratus 在 JIRA 票证中为 KIP-424、KAFKA-7748提供了一个简单的解决方法。我尝试过,但它没有编译 - 我认为自从发布解决方法以来 Kafka API 已经发展了。我已经更新了代码,但还没有测试。

import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

/**
 * THIS PROCESSOR IS UNTESTED
 * <br>
 * This processor mirrors the source, but waits for an inactivity gap before forwarding records.
 * <br>
 * The suppression is key based. Newer values will replace previous values, and reset the inactivity
 * gap.
 */
public class SuppressProcessor<K, V> implements Processor<K, V, K, V> {

  private final String storeName;
  private final Duration debounceCheckInterval;
  private final long suppressTimeoutMillis;

  private TimestampedKeyValueStore<K, V> stateStore;
  private ProcessorContext<K, V> context;

  /**
   * @param storeName             The name of the {@link TimestampedKeyValueStore} which will hold
   *                              records while they are being debounced.
   * @param suppressTimeout       The duration of inactivity before records will be forwarded.
   * @param debounceCheckInterval How regularly all records will be checked to see if they are
   *                              eligible to be forwarded. The interval should be shorter than
   *                              {@code suppressTimeout}.
   */
  public SuppressProcessor(
      String storeName,
      Duration suppressTimeout,
      Duration debounceCheckInterval
  ) {
    this.storeName = storeName;
    this.suppressTimeoutMillis = suppressTimeout.toMillis();
    this.debounceCheckInterval = debounceCheckInterval;
  }

  @Override
  public void init(ProcessorContext<K, V> context) {
    this.context = context;

    stateStore = context.getStateStore(storeName);

    context.schedule(debounceCheckInterval, PunctuationType.WALL_CLOCK_TIME, this::punctuate);
  }

  @Override
  public void process(Record<K, V> record) {

    final var key = record.key();
    final var value = record.value();

    final var storedRecord = stateStore.get(key);

    final var isNewRecord = storedRecord == null;

    final var timestamp = isNewRecord ? System.currentTimeMillis() : storedRecord.timestamp();

    stateStore.put(key, ValueAndTimestamp.make(value, timestamp));
  }

  private void punctuate(long timestamp) {
    try (var iterator = stateStore.all()) {
      while (iterator.hasNext()) {
        KeyValue<K, ValueAndTimestamp<V>> storedRecord = iterator.next();
        if (timestamp - storedRecord.value.timestamp() > suppressTimeoutMillis) {

          final var record = new Record<>(
              storedRecord.key,
              storedRecord.value.value(),
              storedRecord.value.timestamp()
          );

          context.forward(record);
          stateStore.delete(storedRecord.key);
        }
      }
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

  • @tstuber 我将这个示例重新编写到了 Kotlin 中,并且我已经成功地使用了它,尽管是在我的业余爱好项目中。https://github.com/adamko-dev/kafkatorio/blob/82818210753be59daf2f72d7520e51465f160b81/modules/events-processor-core/src/main/kotlin/dev/adamko/kafkatorio/processor/core/DebounceProcessor.kt (2认同)

mao*_*aow 1

如果您使用的是 Kafka Streams 应用程序,您可以尝试使用suppress

它旨在WindowedKStreamKTable阻止”更新,对于速率限制或窗口结束时的通知非常有用。

https://www.confluence.de/blog/kafka-streams-take-on-watermarks-and-triggers/有一个非常有用的解释