Pra*_*kel 5 apache-kafka apache-kafka-streams
我有一个 kafka 流——比如博客和一个 kafka 表——比如与这些博客相关的评论。来自 kafka 流的键可以映射到 Kafka 表中的多个值,即一个博客可以有多个评论。我想将这两个连接起来并创建一个带有注释 ID 数组的新对象。但是当我加入时,流只包含最后一个评论 ID。是否有任何文档或示例代码可以为我指明如何实现这一目标的正确方向?基本上,是否有任何文档详细说明如何使用 Kafka 流和 Kafka 表进行一对多关系连接?
KStream<Integer, EnrichedBlog> joinedBlogComments = blogsStream.join(commentsTbl,
(blogId, blog) -> blog.getBlogId(),
(blog, comment) -> new EnrichedBlog(blog, comment));
Run Code Online (Sandbox Code Playgroud)
所以,而不是评论 - 我需要有一组评论 ID。
我在您的代码示例中找不到签名匹配的 join 方法,但我认为这是问题所在:
KTables 被解释为一个 changlog,也就是说,每一个具有相同 key 的下一条消息都被解释为对记录的更新,而不是新的记录。这就是为什么您只能看到给定键(博客 ID)的最后一条“评论”消息,之前的值被覆盖。为了克服这个问题,您首先需要改变填充 KTable 的方式。您可以做的是将您的评论主题作为 KStream 添加到您的拓扑中,然后执行聚合,只需构建一个数组或共享相同博客 ID 的评论列表。该聚合返回一个 KTable,您可以使用它加入您的博客 KStream。
这是一个草图,您可以如何构建列表值 KTable:
builder.stream("yourCommentTopic") // where key is blog id
.groupByKey()
.aggregate(() -> new ArrayList(),
(key, value, agg) -> new KeyValue<>(key, agg.add(value)),
yourListSerde);
Run Code Online (Sandbox Code Playgroud)
列表在聚合中比数组更容易使用,因此我建议您在需要时将其转换为下游数组。您还需要为您的列表提供一个 serde 实现,在上面的示例中为“yourListSerde”。
| 归档时间: |
|
| 查看次数: |
2837 次 |
| 最近记录: |