Kafka Stream Chained LeftJoin - 在新消息之后再次处理以前的旧消息

Fiz*_*izi 5 java left-join apache-kafka apache-kafka-streams

我有一个流是其他流的组合

final KTable<Long, CompositeInfo> compositeInfoTable = compositeImcTable
    .leftJoin(
        compositeFundTable, 
        (CompositeImc cimc, CompositeFund cf) -> {
            CompositeInfo newCandidate = new CompositeInfo();
            if (cimc != null) {
                newCandidate.imcName = cimc.imcName;
                newCandidate.imcID = cimc.imcID;                                                                    
                if (cf != null) {
                    newCandidate.investments = cf.investments;
                }
            }
            return newCandidate;
        })
    .leftJoin(
        compositeGeographyTable, 
        (CompositeInfo cinfo, CompositeGeography cg) -> {
            if (cg != null) {
                cinfo.regions = cg.regions;
            }
            return cinfo;
        })
    .leftJoin(
        compositeSectorTable, 
        (CompositeInfo cinfo, CompositeSector cs) -> {
            if (cs != null) {
                cinfo.sectors = cs.sectors;
            }
            return cinfo;
        })
    .leftJoin(
        compositeClusterTable, 
        (CompositeInfo cinfo, CustomCluster cc) -> {
            if (cc != null && cc.clusters != null) {
                cinfo.clusters = cc.clusters;
            }
            return cinfo;
        })
    .leftJoin(
        compositeAlphaClusterTable, 
        (CompositeInfo cinfo, CompositeAlphaCluster cac) -> {
            if (cac != null) {
                cinfo.alphaClusters = cac.alphaClusters;
            };
            return cinfo;
        },
        Materialized.<Long, CompositeInfo, KeyValueStore<Bytes, byte[]>>as(this.storeName)
            .withKeySerde(Serdes.Long())
            .withValueSerde(compositeInfoSerde));
Run Code Online (Sandbox Code Playgroud)

我的问题与 CompositeInfo 和 CustomCluster 之间的左连接有关。CustomCluster 如下所示

KTable<Long, CustomCluster> compositeClusterTable = builder
    .stream(
        SUB_TOPIC_COMPOSITE_CLUSTER,
        Consumed.with(Serdes.Long(), compositeClusterSerde))
    .filter((k, v) -> v.clusters != null)
    .groupByKey(Serialized.with(Serdes.Long(), compositeClusterSerde))
    .reduce((aggValue, newValue) -> newValue);
Run Code Online (Sandbox Code Playgroud)

自定义集群中的消息看起来像

CustomCluster [clusterId=null, clusterName=null, compositeId=280, operation=null, clusters=[Cluster [clusterId=6041, clusterName=MyName]]]
Run Code Online (Sandbox Code Playgroud)

所以我将这个对象中的 HashMap 集群分配给了 CompositeInfo 对象中的集群,这些集群连接到了 CompositeId 上。

我目睹的是 CustomCluster 消息针对给定的 CompositeId 传入并正确处理了 dis,但随后再次处理了包含前一个集群(我仍在调查此问题)的旧消息。通过挖掘问题发生在 kafka 内部 KTableKTableRightJoin

CustomCluster [clusterId=null, clusterName=null, compositeId=280, operation=null, clusters=[Cluster [clusterId=6041, clusterName=MyName]]]
Run Code Online (Sandbox Code Playgroud)

当 joine 第一次返回时,newValue 会正确更新。但是代码然后转到 sendOldValues 块,一旦加入者返回,newValue 就是更新增益,但这次使用旧的集群值。

所以这里是我的问题:

  1. 为什么在第二次使用 oldValue 调用 joiner 时 newValues 会更新
  2. 有没有办法关闭 sendOldValues
  3. 我的链式左连接是否与它有关。我知道以前版本的 kafka 有一个链接错误。但现在我在 1.0

更新: 我发现的另一件事。如果我向上移动连接链并删除其他连接,则 sendOldValues 保持为 False。因此,如果我有以下内容:

public void process(final K key, final Change<V1> change) {
    // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
    if (key == null) {
        return;
    }

    final R newValue;
    R oldValue = null;

    final V2 value2 = valueGetter.get(key);
    if (value2 == null) {
        return;
    }

    newValue = joiner.apply(change.newValue, value2);

    if (sendOldValues) {
        oldValue = joiner.apply(change.oldValue, value2);
    }

    context().forward(key, new Change<>(newValue, oldValue));
}
Run Code Online (Sandbox Code Playgroud)

这给了我正确的结果。但我认为,如果我在此之后放置更多的链式连接,它们可能会显示相同的错误行为。

我现在不确定任何事情,但我认为我的问题在于链接的 leftjoin和计算oldValue的行为。有没有其他人遇到过这个问题?

更新

经过大量挖掘后,我意识到 sendOldValues 是 kafka 内部的,而不是我遇到的问题的原因。我的问题是当 oldValue 的 ValueJoiner 返回时 newValue 会发生变化,我不知道它是否是由于对 Java 对象的引用传递

这是传入对象的样子

CustomCluster [clusterId=null, clusterName=null, compositeId=280, operation=null, clusters=[Cluster [clusterId=6041, clusterName=Sunil 2]]]
Run Code Online (Sandbox Code Playgroud)

集群是一个 HashSet<Cluster> clusters = new HashSet<Cluster>();

然后它被连接到一个对象

CompositeInfo [compositeName=BUCKET_NM-280, compositeID=280, imcID=19651, regions=null, sectors=null, clusters=[]]
Run Code Online (Sandbox Code Playgroud)

这里的集群类型相同,但在 CompositeInfo 类中

当我加入时,我将 CustomCluster 对象的集群分配给 CompositeInfo 对象

final KTable<Long, CompositeInfo> compositeInfoTable = compositeImcTable
    .leftJoin(
        compositeFundTable, 
        (CompositeImc cimc, CompositeFund cf) -> {
            CompositeInfo newCandidate = new CompositeInfo();
            if (cimc != null) {
                newCandidate.imcName = cimc.imcName;
                newCandidate.imcID = cimc.imcID;
                if (cf != null) {
                    newCandidate.investments = cf.investments;
                }
            }   
            return newCandidate;
        })
    .leftJoin(
        compositeClusterTable, 
        (CompositeInfo cinfo, CustomCluster cc) -> {
            if (cc != null && cc.clusters != null) {
                cinfo.clusters = cc.clusters;
            }
            return cinfo;
        },
        Materialized.<Long, CompositeInfo, KeyValueStore<Bytes, byte[]>>as(this.storeName)
          .withKeySerde(Serdes.Long())
          .withValueSerde(compositeInfoSerde));
Run Code Online (Sandbox Code Playgroud)

小智 3

在我自己偶然发现同一问题之后,我想提供详细的答案以及有助于说明问题的简化示例。

  @Bean
  public Function<KTable<String, String>,
    Function<KTable<String, String>, Consumer<KTable<String, String>>>> processEvents() {
    return firstnames ->
      lastnames ->
        titles -> firstnames
          .mapValues(firstname -> new Salutation().withFirstname(firstname))
          .join(lastnames, (salutation, lastname) -> salutation.withLastname(lastname))
          .leftJoin(titles, (salutation, title) -> salutation.withTitle(title))
          .toStream()
          .foreach((key, salutation) -> log.info("{}: {}", key, salutation));
  }
Run Code Online (Sandbox Code Playgroud)

该示例(使用 Spring Cloud Stream 和 Kafka Streams 绑定器)显示了一种常见模式,其中主题内容合并到累加器对象中。Salutation在我们的例子中,通过连接代表名字、姓氏和(可选)标题的主题,称呼(例如“亲爱的史密斯女士”)被累积/聚合到一个对象中。

需要注意的是,在这个例子中,实例Salutation是一个逐步构造的可变对象。当运行这样一段代码时,你会发现当改变一个人的姓氏时,合并总是“跑在后面”。这意味着,如果您发布姓氏事件,因为史密斯女士刚刚结婚,现在被称为“约翰逊”,那么 Kafka Streams 将再次发出代表Salutation“史密斯女士”的事件,尽管她更改了姓氏。只有当您在姓氏主题(例如“Miller”)上为同一个人发布另一个事件时,“亲爱的约翰逊女士”才会被记录。

出现此行为的原因可以在位于以下位置的一段代码中找到KTableKTableInnerJoin.java

if (change.newValue != null) {
    newValue = joiner.apply(change.newValue, valueRight);
}

if (sendOldValues && change.oldValue != null) {
    oldValue = joiner.apply(change.oldValue, valueRight);
}

context().forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(resultTimestamp));
Run Code Online (Sandbox Code Playgroud)

joiner是 a ValueJoiner,在我们的例子中可以(salutation, lastname) -> salutation.withLastname(lastname)如上所示。这段代码的问题是,如果您使用带有可变累加器对象(在我们的例子中是 的实例Salutation)的累加模式,该对象(根据设计)被所有连接重用,那么oldValuenewValue将是同一个对象。而且,由于oldValue是事后计算的,所以会包含旧的姓氏,这就解释了为什么Spring Kafka落后了。

因此,至关重要的是,ValueJoiner每次返回的对象都是一个新对象,不包含对其他可变对象的引用,这些对象可能被共享(因此会发生变化)。因此,最安全的方法是返回ValueJoiner一个不可变的对象。

我不会认为这是库的错误,因为它必须以某种方式比较新旧状态,并且因为获取可变对象的快照将需要深层复制。但是,在文档中提及它可能是值得的。此外,何时发出警告oldValue == newValue至少可以让人们意识到这个问题。我将检查是否可以纳入此类改进。