小编Дми*_*пов的帖子

Kafka 流 - 加入两个 ktables 调用连接函数两次

我正在尝试加入 2 个 KTables。

KTable<String, RecordBean> recordsTable = builder.table(Serdes.String(),
    new JsonPOJOSerde<>(RecordBean.class),
    bidTopic, RECORDS_STORE);

KTable<String, ImpressionBean> impressionsTable = builder.table(Serdes.String(),
    new JsonPOJOSerde<>(ImpressionBean.class),
    impressionTopic, IMPRESSIONS_STORE);

KTable<String, RecordBean> mergedByTxId = recordsTable
    .join(impressionsTable, merge());
Run Code Online (Sandbox Code Playgroud)

合并函数非常简单,我只是将值从一个 bean 复制到另一个 bean。

public static <K extends BidInfo, V extends BidInfo> ValueJoiner<K, V, K> merge() {
return (v1, v2) -> {
  v1.setRtbWinningBidAmount(v2.getRtbWinningBidAmount());
  return v1;
};
Run Code Online (Sandbox Code Playgroud)

但是由于某些原因,join 函数在单个生成的记录上调用了两次。请参阅下面的流媒体/制作人配置

Properties streamsConfiguration = new Properties();
streamsConfiguration
    .put(StreamsConfig.APPLICATION_ID_CONFIG, "join-impressions");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());

streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zookeeperConnect());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, folder.newFolder("kafka-streams-tmp")
    .getAbsolutePath());

return streamsConfiguration;
Run Code Online (Sandbox Code Playgroud)

生产者配置 -

Properties producerConfig …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-streams

8
推荐指数
1
解决办法
2782
查看次数

标签 统计

apache-kafka ×1

apache-kafka-streams ×1