Gar*_*auh 1 apache-kafka apache-kafka-streams
假设我有两种类型的日志,它们有一个公共字段'uid',如果包含uid的这两个日志的日志到达,我想输出日志,就像连接一样,Kafka可能吗?
是的,一点没错.查看Kafka Streams,特别是DSL API.它类似于:
StreamsBuilder builder = new StreamsBuilder();
KStream<byte[], Foo> fooStream = builder.stream("foo");
KStream<byte[], Bar> barStream = builder.stream("bar");
fooStream.join(barStream,
(foo, bar) -> {
foo.baz = bar.baz;
return foo;
},
JoinWindows.of(1000))
.to("buzz");
Run Code Online (Sandbox Code Playgroud)
这个简单的应用程序使用两个输入主题("foo"和"bar"),将它们连接起来并将它们写入主题"buzz".由于流是无限的,因此在连接两个流时,您需要指定一个连接窗口(上面1000毫秒),这是相应流上两条消息之间的相对时间差,以使它们有资格加入.
这是一个更完整的示例:https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/PageViewRegionLambdaExample.java
这是文档:https://docs.confluent.io/current/streams/developer-guide/dsl-api.html.您会发现可以执行许多不同类型的连接:
值得注意的是,尽管上面的示例将确定性地同步流 - 如果重置和重新处理拓扑,每次都会得到相同的结果 - 并非Kafka Streams中的所有连接操作都是确定性的.从版本1.0.0及之前开始,大约一半不是确定性的,并且可能取决于从底层主题分区消耗的数据的顺序.具体而言,内KStream
- KStream
和所有的KTable
- KTable
连接是确定性的.其他连接,如all KStream
- KTable
join和left/outer KStream
- KStream
连接是非确定性的,并且取决于消费者使用的数据顺序.如果要将拓扑设计为可重新处理,请记住这一点.如果使用这些非确定性操作,当拓扑实时运行时,事件到达时的顺序将产生一个结果,但如果要重新处理拓扑,则可能会得到另一个结果.另请注意,操作KStream#merge()
也不会产生确定性结果.有关此问题的更多信息,请参阅为什么我的Kafka Streams拓扑不能正确重放/重新处理?这个邮件列表帖子
归档时间: |
|
查看次数: |
383 次 |
最近记录: |