如何指定连接窗口的保留期限?

Ank*_*ana 3 apache-kafka apache-kafka-streams

我想加入两个流,并且将合并窗口设置为25小时,因为要合并的记录最多可以相隔24小时。

final Long JOIN_WINDOW = TimeUnit.HOURS.toMillis(25);

kstream.join(
  runsheetIdStream,
  (jt,r) -> { jt.setDate(r.getStart_date()); return jt; },
  JoinWindows.of(JOIN_WINDOW),
  Joined.with(Serdes.Long(),jobTransactionSerde,runsheetSerde))
Run Code Online (Sandbox Code Playgroud)

这将引发以下异常:

org.apache.kafka.streams.errors.TopologyException:无效的拓扑:联接窗口KSTREAM-JOINTHIS-0000000016-store的保留期必须不小于其窗口大小。

如何延长保留期限?

Jac*_*ski 5

join使用时,JoinWindows.of(JOIN_WINDOW)您隐式定义了基础状态存储的元数据。

JoinWindows.of的javadoc中:

指定如果相同键的记录的时间戳在timeDifference内,则该记录可以联接,即,辅助流中的记录的时间戳比主流中的记录的时间戳早或晚于max timeDifference。

所谓的保留期(也称为窗口维护持续时间)早于(直到Kafka Streams 2.1.0之前)指定使用直到

设置窗口维护持续时间(保留时间),以毫秒为单位。该保留时间是保证将窗口维持多长时间的下限。

由于默认情况下,保留期限为1天(目前无法找到参考),因此是例外情况。

从Kafka Streams 2.1.0开始,您应该使用Materialized API:

用于描述应如何实现StateStore。您可以通过一种接受供应商的方法来提供自定义StateStore后端,也可以通过仅提供商店名称来使用默认的RocksDB后端。

Materialized使您可以完全控制用于连接的基础状态存储,并提供withRetention(java.time.Durationtention)

配置窗口和会话存储的保留期。

请注意,保留期必须至少足够长,以包含窗口数据从窗口开始到窗口结束以及整个宽限期的整个生命周期。