我正在尝试在 Kafka 中实现事务,Processor以确保不会重复处理同一条消息。给定消息 (A),我需要创建将在事务中的另一个主题上生成的消息列表,并且我想在同一事务中提交原始消息 (A)。从文档中我发现该Producer方法sendOffsetsToTransaction似乎只有在成功时才能在事务中提交偏移量。process()这是我的方法中的代码Processor:
producer.beginTransaction()
val topicPartition = new TopicPartition(this.context().topic(), this.context().partition())
val offsetAndMetadata = new OffsetAndMetadata(this.context().offset())
val map = Map(topicPartition -> offsetAndMetadata).asJava
producer.sendOffsetsToTransaction(map, "consumer-group-id")
items.foreach(x => producer.send(new ProducerRecord("items_topic", x.key, x.value)))
producer.commitTransaction()
throw new RuntimeException("expected exception")
Run Code Online (Sandbox Code Playgroud)
不幸的是,使用这段代码(显然每次执行都会失败),每次我在异常后重新启动应用程序时,都会重新处理已处理的消息 (A)。
我设法使其工作,将 a 添加+1到返回的偏移量this.context().offset()并val offsetAndMetadata以这种方式重新定义:
val offsetAndMetadata = new OffsetAndMetadata(this.context().offset() + 1)
Run Code Online (Sandbox Code Playgroud)
这是正常行为还是我做错了什么?
谢谢 :)
我试图了解 Kafka Streams API 的架构,并在文档中遇到了这一点:
应用程序的处理器拓扑通过将其分解为多个任务来扩展
将处理器拓扑分解为任务的所有标准是什么?仅仅是流/主题中的分区数量还是更多。
然后,任务可以根据分配的分区实例化自己的处理器拓扑
有人可以用例子解释上面的意思吗?如果创建任务只是为了扩展,那么它们不应该具有相同的拓扑吗?
我们已经开始试验 Kafka,看看它是否可以用来聚合我们的应用程序数据。我认为我们的用例与 Kafka 流匹配,但我们不确定我们是否正确使用该工具。我们构建的概念验证似乎按设计工作,我不确定我们是否正确使用了 API。
我们的概念证明是使用 kafka 流来保存有关输出主题中程序的信息的运行记录,例如
{
"numberActive": 0,
"numberInactive": 0,
"lastLogin": "01-01-1970T00:00:00Z"
}
Run Code Online (Sandbox Code Playgroud)
计算计数很容易,它本质上是根据输入主题和输出字段执行比较和交换(CAS)操作。
本地状态包含给定密钥的最新程序。我们针对状态存储加入输入流,并使用 TransformSupplier 运行 CAS 操作,该操作使用 TransformSupplier 将数据显式写入状态存储
context.put(...)
context.commit();
Run Code Online (Sandbox Code Playgroud)
这是对当地国营商店的适当使用吗?是否还有另一种方法可以在主题中保持状态运行记录?
我在我的项目中使用 Kafka Streams 和 Spring Boot。在我的用例中,我通过使用 KStream API 进行序列化和消费来发送 Order 对象SpecificAvroSerializer。当我使用 KafkaProducer 发送对象时,出现以下异常
nested exception is org.apache.kafka.common.errors.SerializationException: Error serializing Avro message] with root cause. java.lang.NullPointerException: null
基于Confluence 示例开发了该项目。不知道我在哪里犯了错误。我真的很感激任何帮助。代码已上传至Github以供参考。
例外:
2018-04-17 16:19:39.170 ERROR 6161 --- [nio-8090-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.apache.kafka.common.errors.SerializationException: Error serializing Avro message] with root cause
java.lang.NullPointerException: null
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:57) ~[kafka-schema-registry-client-3.0.0.jar:na]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:89) ~[kafka-schema-registry-client-3.0.0.jar:na]
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:72) ~[kafka-avro-serializer-3.0.0.jar:na]
at …Run Code Online (Sandbox Code Playgroud) 我有一个 Kafka Streams 应用程序,它正在从商店更改日志中读取数据,偶尔会抛出此错误:
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {topic-partition=offset}
at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:928)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:485)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1185)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:84)
at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:319)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:789)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
Run Code Online (Sandbox Code Playgroud)
我认为消费者应该默认latest。即使我尝试使用 或 来配置我的流属性ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,latest我earliest仍然看到此错误。为什么?
我有两个主题:
// photos
{'id': 1, 'user_id': 1, 'url': 'url#1'},
{'id': 2, 'user_id': 2, 'url': 'url#2'},
{'id': 3, 'user_id': 2, 'url': 'url#3'}
// users
{'id': 1, 'name': 'user#1'},
{'id': 1, 'name': 'user#1'},
{'id': 1, 'name': 'user#1'}
Run Code Online (Sandbox Code Playgroud)
我按用户创建地图照片
KStream<Integer, Photo> photo_by_user = ...
photo_by_user.to("photo_by_user")
Run Code Online (Sandbox Code Playgroud)
然后,我尝试连接两个表:
KTable<Integer, User> users_table = builder.table("users");
KTable<Integer, Photo> photo_by_user_table = builder.table("photo_by_user");
KStream<Integer, Result> results = users_table.join(photo_by_user_table, (a, b) -> Result.from(a, b)).toStream();
results.to("results");
Run Code Online (Sandbox Code Playgroud)
结果就像
{'photo_id': 1, 'user': 1, 'url': 'url#1', 'name': 'user#1'}
{'photo_id': 2, 'user': 2, 'url': 'url#2', 'name': …Run Code Online (Sandbox Code Playgroud) 我需要一个流来对主题 A 中的值进行分组,将分组值发送到主题 B,然后在滚动时间窗口内将这些分组值的总和发送到主题 C。在 Kafka 中是否可以做到这一点?或者一个流只能从一个主题读取和写入吗?
我一直在准备关于国有商店的文档,但我仍然不清楚它是否符合我的目的。我想使用一些分布式图形数据库作为其他外部应用程序可以使用的状态存储。这可能吗?这涉及什么工作?任何人都可以向我指出需要扩展才能实现该功能的类/代码吗?
我是 kafka 和 kafka 流的新手。我有一个与 kafka 生产者、消费者、KStream 和 KTable 一起工作的基本 Spring 服务。现在,我想检查我的 KTable 记录,因此为了实现它,我正在尝试使用 Kafka Query API。
这可以通过以下方式实现(没有 Spring 集成):
KafkaStreams streams = new KafkaStreams(topology, config);
// Get access to the custom store
MyReadableCustomStore<String,String> store = streams.store("the-custom-store", new MyCustomStoreType<String,String>());
// Query the store
String value = store.read("key");
Run Code Online (Sandbox Code Playgroud)
现在,我尝试使用基于 Spring 的 InteractiveQueryService 来进行查询……但是我在 Spring 启动中遇到了一些依赖问题。
在 Spring 中使用 kafka 查询 API 的最佳方法是什么?
我的服务中的 Spring kafka 配置如下所示:
@Bean("streamsBuilder")
public StreamsBuilderFactoryBean recordsStreamBuilderFactoryBean() {
Map<String, Object> config = new HashMap<>();
// set some properties …Run Code Online (Sandbox Code Playgroud) 我Materialized.as()在 Kafka 流字数统计应用程序中有一个状态存储(使用)。
根据我的理解,状态存储在 Kafka 内部主题中维护。
如果可用,请与解决方案的来源一起发布。