标签: ktable

Kafka Streams:无法刷新由 java.lang.ClassCastException 引起的状态存储:无法区分大小写键值

我们已经使用 Kafka 流一段时间了,但从来没有编写测试来覆盖我们的拓扑。我们决定试一试并使用流库提供的拓扑测试驱动程序。不幸的是,我们遇到了无法解决的问题。这是我们的生产代码的虚拟版本,具有相同的语义。

它加入了包含 2 类文档的 2 个主题。我们的目标是将每个人的文档汇总到一个“文件夹”中,其中使用来自不同文档的信息。在运行测试时,我们遇到了一个异常,这是由于从 PersonKey 到 DocumentA 的错误转换造成的。在下面,您可以看到测试设置、数据结构的架构和异常的堆栈跟踪。

package com.zenjob.me.indexer.application.domain;

import com.demo.DocumentA;
import com.demo.DocumentB;
import com.demo.DocumentFolder;
import com.demo.PersonKey;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import lombok.extern.log4j.Log4j2;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.junit.Assert;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

@SuppressWarnings("SimplifiableJUnitAssertion")
@Log4j2
class DemoTest {

    private SchemaRegistryClient schemaRegistryClient     = new MockSchemaRegistryClient();
    private String               documentATopicName       = "documentATopicName";
    private String …
Run Code Online (Sandbox Code Playgroud)

java avro apache-kafka apache-kafka-streams ktable

6
推荐指数
0
解决办法
756
查看次数

如何在Kafka主题中编写KTable?就像我们在 KStreams 中使用“to()”一样,如何为 KTable 做到这一点?

由于 KTable 没有“to()”方法,因此在向主题发送任何消息之前我们是否需要始终将其转换为 KStream?或者我们如何在我们的主题中存储 KTable?

apache-kafka apache-kafka-streams spring-kafka ktable

4
推荐指数
1
解决办法
2751
查看次数

Kafka重新分区(基于key的group by)

当我们基于某个键对流应用 group by 函数时,kafka 如何计算它,因为相同的键可能存在于不同的分区中?我看到 through() 函数基本上对数据进行了重新分区,但我不明白它是什么意思。它将具有相同密钥的所有消息移动到单个分区中吗?另外我们多久可以调用 through() 方法?如果有需求的话,我们可以在收到每条消息后调用它吗?请建议。谢谢

apache-kafka apache-kafka-streams ktable

2
推荐指数
1
解决办法
2670
查看次数

如何使用标点符号从状态存储中删除旧记录?(卡夫卡)

Ktable我使用 为主题创建了streamsBuilder.table("myTopic"),并将其具体化为状态存储,以便我可以使用交互式查询。

每小时,我都想从该状态存储(以及关联的变更日志主题)中删除其值在过去一小时内尚未更新的记录。

我相信使用标点符号可以实现这一点,但到目前为止我只使用过 DSL,因此不确定如何继续。如果有人能为我提供一个例子,我将非常感激。

谢谢,

杰克

java apache-kafka apache-kafka-streams ktable

2
推荐指数
1
解决办法
2236
查看次数