卡夫卡流加入

Nik*_*Nik 7 java join java-8 apache-kafka-streams

我有2个卡夫卡主题 - recommendationsclicks.第一个主题具有由唯一ID(称为recommendationsId)键控的推荐对象.每个产品都有一个用户可以单击的URL.

clicks主题通过点击推荐给用户的产品URL来生成消息.它已被设置为这些点击消息也被键入recommendationId.

注意

  1. 建议和点击之间的关系是一对多的.建议可能会导致多次点击,但点击始终与单个推荐相关联.

  2. 每个click对象都有一个相应的推荐对象.

  3. 点击对象的时间戳晚于推荐对象.

  4. 推荐和相应点击之间的差距可能是几秒到几天(比如最多7天).

我的目标是使用Kafka stream join加入这两个主题.我不清楚的是我是否应该使用KStream x KStream连接或KStream x KTable连接.

我通过表KStream x KTable连接clicks流实现了连接recommendations.但是,如果建议是加入者启动之前生成的,并且在加入者启动后点击到达,则无法看到任何加入的点击建议对.

我使用正确的加入吗?我应该使用KStream x KStream加入吗?如果是这样,为了能够在过去7天内加入带有推荐的点击,我应该将窗口大小设置为7天吗?在这种情况下,我还需要设置"保留"期吗?

我执行KStream x KTable连接的代码如下.请注意,我定义的类RecommendationsClick及其相应的SERDE.点击消息只是简单的String(url).此URL String与Recommendationsobject连接以创建发送到的Click对象jointTopic.

public static void main(String[] args){
    if(args.length!=4){
      throw new RuntimeException("Expected 3 params: bootstraplist clickTopic recsTopic jointTopic");
    }

    final String booststrapList = args[0];
    final String clicksTopic = args[1];
    final String recsTopic = args[2];
    final String jointTopic = args[3];

    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my_joiner_id");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, booststrapList);
    config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, JoinSerdes.CLICK_SERDE.getClass().getName());

    KStreamBuilder builder = new KStreamBuilder();

    // load clicks as KStream
    KStream<String, String> clicksStream = builder.stream(Serdes.String(), Serdes.String(), clicksTopic);

    // load recommendations as KTable
    KTable<String, Recommendations> recsTable = builder.table(Serdes.String(), JoinSerdes.RECS_SERDE, recsTopic);

    // join the two
    KStream<String, Click> join = clicksStream.leftJoin(recsTable, (click, recs) -> new Click(click, recs));

    // emit the join to the jointTopic
    join.to(Serdes.String(), JoinSerdes.CLICK_SERDE, jointTopic);

    // let the action begin
    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();
  }
Run Code Online (Sandbox Code Playgroud)

这只要已经产生两个建议,并点击工作正常夹板(以上程序)运行.但是,如果在网站运行之前生成推荐的点击到达,我看不到任何连接发生.我该如何解决?

如果解决方案是使用KStream x KSTreamjoin,那么请帮助我了解我应该选择哪个窗口大小以及选择的保留期限.

Mat*_*Sax 7

你的整体观察是正确的.从概念上讲,您可以通过两种方式获得正确的结果.如果使用流表连接,则有两个缺点(可能会在将来的Kafka版本中重新访问和改进)

  • 您已经提到过,如果在相应的推荐之前处理了点击,则(内部)联接将失败.但是,如您所知,将有推荐,您可以使用左连接而不是内连接,检查连接结果,如果建议是null(即,您重试,请将click事件写回输入主题)逻辑) - 当然,单个推荐的连续点击可能会出现故障,您可能需要在应用程序代码中考虑到这一点.
  • 第二个缺点KTable是,随着时间的推移它会永远增长并且无限制,因为你会为它添加越来越多的独特建议.因此,您需要通过将表单的逻辑删除记录发送<recommendationsId, null>到推荐主题来实现一些"过期逻辑",以删除您不再关心的旧建议.
  • 这种方法的优点是,与流 - 流连接相比,总共需要更少的内存/磁盘空间,因为您只需缓冲应用程序中的所有建议(但没有点击).

如果您使用流 - 流联接,并且在推荐后7天可能发生单击,则您的窗口大小必须为7天 - 否则,点击将不会与推荐一起加入.

  • 这种方法的缺点是,您将需要更多的内存/磁盘,因为您将缓冲应用程序中过去7天的所有点击和所有建议.
  • 优点是,订单或处理(即推荐与点击)不再重要(即,您不需要像上面描述的那样实施重试策略)
  • 此外,旧的建议会自动过时,因此您不需要实现特殊的"过期逻辑".

对于流 - 流加入,保留时间的答案略有不同.它必须至少7天,因为窗口大小是7天.否则,您将删除"运行窗口"的记录.您还可以将保留期设置得更长,以便能够处理"延迟数据".假设用户在窗口时间范围结束时(推荐的7天时间跨度前5分钟)点击,但点击仅在1小时后报告给您的应用程序.如果您的保留期限为7天作为您的窗口大小,则此迟到的记录将无法再处理(因为建议已被删除).如果您设置较长的保留期,例如8天,您仍然可以处理延迟记录.这取决于您的应用程序/语义需要您想要使用的保留时间.

简介:从实现的角度来看,使用流 - 流连接比使用流表连接更简单.但是,预计可以节省内存/磁盘,并且可能会很大,具体取决于您的点击流数据速率.

  • 它需要保存最近7天的所有数据,但不能保留在内存中.我们在内部使用可以溢出到磁盘的RocksDB.所以你可以拥有比主存更大的状态. - 关于扩展:您不能拥有比分区更多的实例.如果你需要更高的并行性来进行处理,你需要有更多的分区 - 一种方法是创建一个具有所需分区数的主题,并在执行之前通过调用`through()`重新分区输入数据加入.由于这个新主题仅适用于扩展,如果可以有较短的保留时间(如1小时?). (3认同)