Kafka-Streams使用JSON值加入2个主题| 背压机制?

sri*_*nth 2 java json apache-kafka apache-kafka-streams

我正在学习Kafka Streams并尝试实现以下目标:

创建了2个Kafka主题(比如topic1,topic2),其中null为键,而JSONString为值。来自topic1的数据(无重复项)在topic2中具有多个匹配条目。即,topic1具有一些主流数据,当与topic2结合时可以生成新的多个数据流。

例:

topic1={"name": "abc", "age":2}, {"name": "xyz", "age":3} and so on.
topic2={"name": "abc", "address"="xxxxxx"}, {"name": "abc", "address"="yyyyyy"}, {"name": "xyz", "address"="jjjjjj"}, {"name": "xyz", "address"="xxxkkkkk"}
Run Code Online (Sandbox Code Playgroud)

预期产量: {"name": "abc", "age":2, "address"="xxxxxx"}, {"name": "abc", "age":2, "address"="yyyyyy"}, {"name": "xyz", "age":3, "address"="jjjjjj"}, {"name": "xyz", "age":3, "address"="xxxkkkkk"}

想要持久保存/保留topic1的数据流以供将来参考,而topic2的数据流仅用于实现上述用例,不需要任何持久性/保留。

我有几个问题:1)应该将topic1数据流保存/存储几天(可能吗?),以便可以合并来自topic2的传入数据流。可能吗?2)我应该使用什么实现KStream或KTable?3)这称为背压机制吗?

Kafka Stream是否支持此用例,还是我应该注意其他事项?请建议。

我用5分钟的窗口用KStream尝试了一段代码,但是看起来我无法在流中保存topic1数据。

请以正确的选择帮助我并加入。我正在使用Confluent的Kafka和Docker实例。

public void run() {
        final StreamsBuilder builder = new StreamsBuilder();
        final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(new JsonSerializer(), new JsonDeserializer());
        final Consumed<String, JsonNode> consumed = Consumed.with(Serdes.String(), jsonSerde);

        // Hold data from this topic to 30 days
        KStream<String, JsonNode> cs = builder.stream("topic1", consumed);
        cs.foreach((k,v) -> {
            System.out.println( k + " --->" + v);
        });

        // Data is involved in one time process.
        KStream<String, JsonNode> css = builder.stream("topic2", consumed);
        css.foreach((k,v) -> {
            System.out.println( k + " --->" + v);
        });

        KStream<String, JsonNode> resultStream = cs.leftJoin(css,
                valueJoiner,
                JoinWindows.of(TimeUnit.MINUTES.toMillis(5)),
                Joined.with(
                        Serdes.String(), /* key */
                        jsonSerde,       /* left value */
                        jsonSerde)       /* right value */
        );

        resultStream.foreach((k, v) -> {
            System.out.println("JOIN STREAM: KEY="+k+ ", VALUE=" + v);
        });

        KafkaStreams streams = new KafkaStreams(builder.build(), properties);
        streams.start();
    }
Run Code Online (Sandbox Code Playgroud)

Mat*_*Sax 5

Kafka中的联接始终基于密钥。因此,要使任何联接生效,您需要在进行实际联接之前将要联接的字段提取到键中(唯一的部分例外是KStream-GlobalKTable联接)。在您的代码示例中,您将不会获得任何结果,因为所有记录都具有null键,并且由于这个原因无法被连接。

对于联接本身,看来KStream-KTable联接将是您的用例的正确选择。为了使这项工作,您将需要:

  1. 正确设置连接密钥,topic1并将数据写入其他主题(我们称之为topic1Keyed
  2. topic1Keyed为表格
  3. 正确设置连接密钥 topic2
  4. 加入topic2KTable

有关连接语义的完整详细信息,请查看此博客文章:https : //www.confluent.io/blog/crossing-streams-joins-apache-kafka/

  • 为了理解这个主题,所提到的博客文章绝对是必读的。写得很漂亮,或者我应该说如图:) (2认同)