Kafka Streams:如何写主题?

dme*_*ead 4 apache-kafka apache-kafka-streams

在Kafka Streams中,生成/编写流的规范方式是什么?在Spark中,有一个自定义接收器,它作为来自任意数据源的长时间运行的适配器.Kafka Streams中的等价物是什么?

具体来说,我不会问如何从一个主题转换到另一个主题.文档非常明确.我想了解如何编写我的工作人员,这些工作人员将在Kafka的一系列转换中首次编写.

我希望能够做到

builder1.<something>(<some intake worker like a spark reciver)
       .to(topic1)
       .start()

builder2.from(topic1)
        .transform(<some transformation function>)
        .to(topic2)
        .start()
Run Code Online (Sandbox Code Playgroud)

但是现有的文档都没有显示出来吗?我错过了什么吗?

Dmi*_*sky 8

取决于您使用的是Kafka Streams DSL还是处理器API:

  • Kafka Streams DSL 您可以使用KStream#to()实现KStream主题.这是将数据实现到主题的规范方法.或者,您可以使用KStream#through().这也将实现数据到主题,但也返回结果KStream以供进一步使用.然后,#to()和之间的唯一区别#through()是,KStreamBuilder#stream()如果您希望将生成的物化分区作为a ,它会为您节省一个KStream.

  • 处理器API通过将数据转发到接收器处理器,可以将数据实现到分区.

无论哪种方式,需要注意的一个重要事项是,在使用上述方法之一写入分区之前,数据不会实现主题.map(),filter()等不兑现的数据.数据保留在处理器任务/线程/内存中,直到通过上述方法之一实现.


要制作成Kafka Streams:

Properties producerConfig = new Properties();
producerConfig.put(BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:2181");
producerConfig.put(ACKS_CONFIG, "all");
producerConfig.put(RETRIES_CONFIG, 0);
Producer<Integer, Integer> producer = new KafkaProducer<>(producerConfig, new IntegerSerializer(), new IntegerSerializer<>());
Run Code Online (Sandbox Code Playgroud)

然后:

Arrays.asList(1, 2, 3, 4).forEach(integer -> producer.send(new ProducerRecord<>("integers", integer, integer)))
Run Code Online (Sandbox Code Playgroud)

你会需要:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${version.kafka}</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)


Mic*_*oll 5

我想了解如何编写我的工作人员,这些工作人员将进行一系列向kafka的转换中的第一次编写。

初始写入(=输入数据)不应通过Kafka Streams完成。Kafka Streams假定输入数据已经在Kafka中。

因此,您的预期工作流程不适用:

builder1.<something>(<some intake worker like a spark reciver)
   .to(topic1)
   .start()
Run Code Online (Sandbox Code Playgroud)

相反,您可以使用诸如Kafka Connect之类的数据将数据获取到Kafka中(例如,从数据库转换为Kafka主题),或使用“常规” Kafka生产者客户端(Java,C / C ++,Python等)来编写将数据输入Kafka。

Kafka Streams中还没有“挂钩”可用来引导输入数据。我们正在寻找更好的Kafka Connect和Kafka Streams集成,因此这种情况在不久的将来可能会得到改善。