KafkaStreams 同一应用程序中的多个流

nbp*_*eth 0 apache-kafka apache-kafka-streams

我正在尝试根据 KafkaStreams 的惯例和合理性做出实际的设计决策。

假设我有两个不同的事件要放入KTables 中。我有一个生产者将这些消息发送给KStream正在收听该主题的生产者。

据我所知,我不能对使用 的消息使用条件转发KafkaStreams,因此如果流订阅了许多主题(例如,上述每个消息一个),我只能调用stream.to一个接收器主题 - 否则,我会做一些事情,比如foreach在流上调用并将带有 a 的消息发送KProducer到接收器主题。

以上建议使用单个流。我以为我可以在同一个应用程序中设置多个流,每个流监听一个主题,映射并转发到一个表接收器,但是每次我尝试创建 的两个实例时KafkaStreams,只有第一个初始化订阅它的主题 - 另一个得到一个来自客户端的警告,它的主题没有订阅。

我可以在同一个应用程序中设置多个流吗?如果有,有什么特殊要求吗?

    class Stream(topic: String) {
      val props: Option[Map[String, String]] = Some(TopicProps.get(topic))
      val streamsBuilder = new StreamsBuilder
      val topics = new util.ArrayList[String]
      topics.add(props.get("topic"))

      val stream: KStream[String, String] = configureStream(streamsBuilder, topics, props.get("sink"))

      def configureStream(builder: StreamsBuilder, topics: java.util.List[String], sink: String): KStream[String, String] = {
        builder.stream[String, String](
          topics,
          Consumed.`with`(String(), String())
        )
      }

      def init(): KafkaStreams = {
        val streams = new KafkaStreams(streamsBuilder.build(), KafkaConfig.streamConfig)

        streams.start()

        streams
      }
    }

    class Streams() {

      val eventStream = new Stream("first_event") //looking good!
      val eventStream2 = new Stream("second_event") // no subscribers
      //if I switch the other of these, eventStream2 is subscribed to and eventStream is dead in the water
      val streams: KafkaStreams = eventStream.init()
      val streams2: KafkaStreams = eventStream2.init()

    }
Run Code Online (Sandbox Code Playgroud)

流配置

    val streamConfig: Properties = {
        val properties = new Properties()
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-application")
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BrokerHost)
        properties
    }
Run Code Online (Sandbox Code Playgroud)

我也喜欢任何建议的替代方案

Boh*_*huk 8

当您创建 KafkaStreams 时,您需要使用不同的 application.id 传递属性,例如:

    props.put(StreamsConfig.APPLICATION_ID_CONFIG,"APP1");
    StreamsBuilder builder = new SteamsBuilder();
    KStream stream1 = builder.stream("topic1");
    KafkaStreams streams = new KafkaStreams(builder, props);
    streams.start();
Run Code Online (Sandbox Code Playgroud)

然后你应该创建另一个流:

    props.put(StreamsConfig.APPLICATION_ID_CONFIG,"APP2");
    StreamsBuilder builder = new SteamsBuilder();
    KStream stream2 = builder.stream("topic2");
    KafkaStreams streams2 = new KafkaStreams(builder, props);
    streams2.start();
Run Code Online (Sandbox Code Playgroud)