Multiple streams from a single master topic

bm1*_*729 11 java apache-kafka apache-kafka-streams

How can I create multiple streams from a single topic? When I do something like this:

KStreamBuilder builder = new KStreamBuilder();

builder.stream(Serdes.String(), Serdes.String(), "master")
            /* Filtering logic */
            .to(Serdes.String(), Serdes.String(), "output1");

builder.stream(Serdes.String(), Serdes.String(), "master")
            /* Filtering logic */
            .to(Serdes.String(), Serdes.String(), "output2");

KafkaStreams streams = new KafkaStreams(builder, /* config */);
streams.start();

I get the following error:

org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Topic master has already been registered by another source.
    at org.apache.kafka.streams.processor.TopologyBuilder.addSource(TopologyBuilder.java:347)
    at org.apache.kafka.streams.kstream.KStreamBuilder.stream(KStreamBuilder.java:92)

Do I need to create a separate KafkaStreams instance for each stream that reads from "master"?

Cle*_*nte 18

You can create a single KStream for the topic and reuse it:

KStream<String, String> inputStream = builder.stream(Serdes.String(), Serdes.String(), "master");

Then apply each filter to that same stream, so the topic "master" is registered as a source only once:

inputStream.filter(..logic1)
        .to(Serdes.String(), Serdes.String(), "output1");
inputStream.filter(..logic2)
        .to(Serdes.String(), Serdes.String(), "output2");

KafkaStreams streams = new KafkaStreams(builder, /* config */);
streams.start();

  • If your filters do not overlap, you can also use `inputStream.branch()`, which returns non-overlapping substreams. (5 upvotes)
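
To make the difference between the two approaches concrete, here is a minimal plain-Java sketch of the routing semantics. It does not call Kafka at all: `BranchSketch`, `filterEach`, and `branch` are hypothetical names invented for this illustration, and a `List<String>` stands in for the records of "master". The assumption it models is that several `filter()` calls on one reused stream each see every record, while `branch()` sends a record only to the first predicate that matches and drops it if none match.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for the routing behaviour; not part of the Kafka Streams API.
public class BranchSketch {

    // Models reusing one stream with several independent filters:
    // a record is copied to every output whose predicate accepts it.
    @SafeVarargs
    public static List<List<String>> filterEach(List<String> records,
                                                Predicate<String>... predicates) {
        List<List<String>> outputs = new ArrayList<>();
        for (Predicate<String> predicate : predicates) {
            List<String> out = new ArrayList<>();
            for (String record : records) {
                if (predicate.test(record)) {
                    out.add(record);
                }
            }
            outputs.add(out);
        }
        return outputs;
    }

    // Models branch(): a record goes only to the first predicate that matches,
    // and is dropped when no predicate matches.
    @SafeVarargs
    public static List<List<String>> branch(List<String> records,
                                            Predicate<String>... predicates) {
        List<List<String>> outputs = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            outputs.add(new ArrayList<>());
        }
        for (String record : records) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(record)) {
                    outputs.get(i).add(record);
                    break; // first match wins
                }
            }
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<String> master = Arrays.asList("apple", "avocado", "banana", "cherry");
        Predicate<String> startsWithA = s -> s.startsWith("a");
        Predicate<String> longerThanFive = s -> s.length() > 5;

        // "avocado" satisfies both predicates, so it appears in both outputs.
        System.out.println(filterEach(master, startsWithA, longerThanFive));
        // prints [[apple, avocado], [avocado, banana, cherry]]

        // With branch semantics "avocado" is claimed by the first predicate only.
        System.out.println(branch(master, startsWithA, longerThanFive));
        // prints [[apple, avocado], [banana, cherry]]
    }
}
```

In the 0.10.x API used in this thread, the real `branch` takes a varargs of `Predicate<K, V>` and returns an array of `KStream<K, V>`, one per predicate, each of which can then be written out with `.to(...)` as in the answer above. If the predicates can overlap and every matching output should receive the record, the reused-stream approach with separate `filter()` calls is the one to keep.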