use*_*887 5 apache-kafka debouncing apache-kafka-streams
我计划建立一个 MySQL 到 Kafka 的流程,最终目标是安排一个过程来根据更改的数据重新计算 mongoDB 文档。
这可能涉及直接修补 mongoDB 文档,或运行重新创建整个文档的进程。
我的问题是,如果对 MySQL 数据库的一组更改都与一个 mongoDB 文档相关,那么我不想为每个更改实时重新运行重新计算过程,我想等待更改“解决”,以便我只根据需要运行重新计算过程。
有没有办法“消除”卡夫卡流?例如,Kafka 消费者是否有一个明确定义的模式,我可以用它来实现我想要的逻辑?
目前还没有简单的方法来消除事件。
简而言之,问题在于卡夫卡并不根据“挂钟时间”行事。处理通常由传入事件(以及其中包含的数据)触发,而不是由任意触发器(如系统时间)触发。
我将介绍为什么 Suppressed 和 SessionWindows 不起作用、KIP-242 中建议的解决方案以及未经测试的解决方法。
Suppressed有untilTimeLimit()功能,但是不适合去抖。
如果同一时间有另一个记录到达,它将替换缓冲区中的第一个记录,但不会重新启动计时器。
我认为使用SessionWindows.ofInactivityGapAndGrace()可能会起作用。
首先,我对输入 KStream 进行分组、窗口化、聚合和抑制:
val windowedData: KTable<Windowed<Key>, Value> =
inputTopicKStream
.groupByKey()
.windowedBy(
SessionWindows.ofInactivityGapAndGrace(
WINDOW_INACTIVITY_DURATION,
WINDOW_INACTIVITY_DURATION,
)
)
.aggregate(...)
.suppress(
Suppressed.untilWindowCloses(
Suppressed.BufferConfig.unbounded()
)
)
Run Code Online (Sandbox Code Playgroud)
然后我聚合了窗口,这样我就可以得到最终状态
windowedData
.groupBy(...)
.reduce(
/* adder */
{ a, b -> a + b },
/* subtractor */
{ a, a -> a - a },
)
Run Code Online (Sandbox Code Playgroud)
然而问题是,如果没有额外的记录出现,它就不会SessionWindows关闭。Kafka不会独立关闭窗口——它需要额外的记录来实现窗口可以关闭,并且可以转发新记录。suppress()
Confluence 的博客https://www.confluence.io/de-de/blog/kafka-streams-take-on-watermarks-and-triggers/中指出了这一点
[I]如果您停止获取新记录,挂钟时间将继续前进,但流时间将冻结。挂钟时间会提前,因为计算机中的小石英表一直在滴答作响,但流时间只会在您获得新记录时才会提前。由于没有新记录,流时间被冻结。
KIP-424提出了一项改进,可以Suppress充当防抖器,但几年来没有任何进展。
Andrey Bratus 在 JIRA 票证中为 KIP-424、KAFKA-7748提供了一个简单的解决方法。我尝试过,但它没有编译 - 我认为自从发布解决方法以来 Kafka API 已经发展了。我已经更新了代码,但还没有测试。
import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
/**
* THIS PROCESSOR IS UNTESTED
* <br>
* This processor mirrors the source, but waits for an inactivity gap before forwarding records.
* <br>
* The suppression is key based. Newer values will replace previous values, and reset the inactivity
* gap.
*/
public class SuppressProcessor<K, V> implements Processor<K, V, K, V> {
private final String storeName;
private final Duration debounceCheckInterval;
private final long suppressTimeoutMillis;
private TimestampedKeyValueStore<K, V> stateStore;
private ProcessorContext<K, V> context;
/**
* @param storeName The name of the {@link TimestampedKeyValueStore} which will hold
* records while they are being debounced.
* @param suppressTimeout The duration of inactivity before records will be forwarded.
* @param debounceCheckInterval How regularly all records will be checked to see if they are
* eligible to be forwarded. The interval should be shorter than
* {@code suppressTimeout}.
*/
public SuppressProcessor(
String storeName,
Duration suppressTimeout,
Duration debounceCheckInterval
) {
this.storeName = storeName;
this.suppressTimeoutMillis = suppressTimeout.toMillis();
this.debounceCheckInterval = debounceCheckInterval;
}
@Override
public void init(ProcessorContext<K, V> context) {
this.context = context;
stateStore = context.getStateStore(storeName);
context.schedule(debounceCheckInterval, PunctuationType.WALL_CLOCK_TIME, this::punctuate);
}
@Override
public void process(Record<K, V> record) {
final var key = record.key();
final var value = record.value();
final var storedRecord = stateStore.get(key);
final var isNewRecord = storedRecord == null;
final var timestamp = isNewRecord ? System.currentTimeMillis() : storedRecord.timestamp();
stateStore.put(key, ValueAndTimestamp.make(value, timestamp));
}
private void punctuate(long timestamp) {
try (var iterator = stateStore.all()) {
while (iterator.hasNext()) {
KeyValue<K, ValueAndTimestamp<V>> storedRecord = iterator.next();
if (timestamp - storedRecord.value.timestamp() > suppressTimeoutMillis) {
final var record = new Record<>(
storedRecord.key,
storedRecord.value.value(),
storedRecord.value.timestamp()
);
context.forward(record);
stateStore.delete(storedRecord.key);
}
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
如果您使用的是 Kafka Streams 应用程序,您可以尝试使用suppress
它旨在WindowedKStream“KTable阻止”更新,对于速率限制或窗口结束时的通知非常有用。
https://www.confluence.de/blog/kafka-streams-take-on-watermarks-and-triggers/有一个非常有用的解释
| 归档时间: |
|
| 查看次数: |
1872 次 |
| 最近记录: |