标签: apache-kafka-streams

KafkaProducer sendOffsetsToTransaction 需要 offset+1 才能成功提交当前偏移量

我正在尝试在 Kafka 中实现事务,Processor以确保不会重复处理同一条消息。给定消息 (A),我需要创建将在事务中的另一个主题上生成的消息列表,并且我想在同一事务中提交原始消息 (A)。从文档中我发现该Producer方法sendOffsetsToTransaction似乎只有在成功时才能在事务中提交偏移量。process()这是我的方法中的代码Processor

    producer.beginTransaction()
    val topicPartition    = new TopicPartition(this.context().topic(), this.context().partition())
    val offsetAndMetadata = new OffsetAndMetadata(this.context().offset())
    val map               = Map(topicPartition -> offsetAndMetadata).asJava
    producer.sendOffsetsToTransaction(map, "consumer-group-id")
    items.foreach(x => producer.send(new ProducerRecord("items_topic", x.key, x.value)))
    producer.commitTransaction()
    throw new RuntimeException("expected exception")
Run Code Online (Sandbox Code Playgroud)

不幸的是,使用这段代码(显然每次执行都会失败),每次我在异常后重新启动应用程序时,都会重新处理已处理的消息 (A)。

我设法使其工作,将 a 添加+1到返回的偏移量this.context().offset()val offsetAndMetadata以这种方式重新定义:

val offsetAndMetadata = new OffsetAndMetadata(this.context().offset() + 1)
Run Code Online (Sandbox Code Playgroud)

这是正常行为还是我做错了什么?

谢谢 :)

scala apache-kafka kafka-producer-api apache-kafka-streams

1
推荐指数
1
解决办法
2922
查看次数

Kafka Streams API 中的任务有什么用途

我试图了解 Kafka Streams API 的架构,并在文档中遇到了这一点:

应用程序的处理器拓扑通过将其分解为多个任务来扩展

将处理器拓扑分解为任务的所有标准是什么?仅仅是流/主题中的分区数量还是更多。

然后,任务可以根据分配的分区实例化自己的处理器拓扑

有人可以用例子解释上面的意思吗?如果创建任务只是为了扩展,那么它们不应该具有相同的拓扑吗?

apache-kafka apache-kafka-streams

1
推荐指数
1
解决办法
977
查看次数

直接写入 kafka 状态存储

我们已经开始试验 Kafka,看看它是否可以用来聚合我们的应用程序数据。我认为我们的用例与 Kafka 流匹配,但我们不确定我们是否正确使用该工具。我们构建的概念验证似乎按设计工作,我不确定我们是否正确使用了 API。

我们的概念证明是使用 kafka 流来保存有关输出主题中程序的信息的运行记录,例如

{ 
  "numberActive": 0, 
  "numberInactive": 0, 
  "lastLogin": "01-01-1970T00:00:00Z"  
}
Run Code Online (Sandbox Code Playgroud)

计算计数很容易,它本质上是根据输入主题和输出字段执行比较和交换(CAS)操作。

本地状态包含给定密钥的最新程序。我们针对状态存储加入输入流,并使用 TransformSupplier 运行 CAS 操作,该操作使用 TransformSupplier 将数据显式写入状态存储

context.put(...)
context.commit();
Run Code Online (Sandbox Code Playgroud)

这是对当地国营商店的适当使用吗?是否还有另一种方法可以在主题中保持状态运行记录?

apache-kafka apache-kafka-streams

1
推荐指数
1
解决办法
1433
查看次数

序列化 Avro 消息时出错

我在我的项目中使用 Kafka Streams 和 Spring Boot。在我的用例中,我通过使用 KStream API 进行序列化和消费来发送 Order 对象SpecificAvroSerializer。当我使用 KafkaProducer 发送对象时,出现以下异常

nested exception is org.apache.kafka.common.errors.SerializationException: Error serializing Avro message] with root cause. java.lang.NullPointerException: null

基于Confluence 示例开发了该项目。不知道我在哪里犯了错误。我真的很感激任何帮助。代码已上传至Github以供参考。

例外:

   2018-04-17 16:19:39.170 ERROR 6161 --- [nio-8090-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.apache.kafka.common.errors.SerializationException: Error serializing Avro message] with root cause

java.lang.NullPointerException: null
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:57) ~[kafka-schema-registry-client-3.0.0.jar:na]
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:89) ~[kafka-schema-registry-client-3.0.0.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:72) ~[kafka-avro-serializer-3.0.0.jar:na]
    at …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-streams

1
推荐指数
1
解决办法
2万
查看次数

从 Kafka 存储变更日志读取时发生 OffsetOutOfRangeException

我有一个 Kafka Streams 应用程序,它正在从商店更改日志中读取数据,偶尔会抛出此错误:

org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {topic-partition=offset}
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:928)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:485)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1185)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:84)
    at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:319)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:789)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
Run Code Online (Sandbox Code Playgroud)

我认为消费者应该默认latest。即使我尝试使用 或 来配置我的流属性ConsumerConfig.AUTO_OFFSET_RESET_CONFIGlatestearliest仍然看到此错误。为什么?

apache-kafka apache-kafka-streams

1
推荐指数
1
解决办法
2992
查看次数

kafka-streams join 产生重复项

我有两个主题:

// photos
{'id': 1, 'user_id': 1, 'url': 'url#1'},
{'id': 2, 'user_id': 2, 'url': 'url#2'},
{'id': 3, 'user_id': 2, 'url': 'url#3'}

// users
{'id': 1, 'name': 'user#1'},
{'id': 1, 'name': 'user#1'},
{'id': 1, 'name': 'user#1'}
Run Code Online (Sandbox Code Playgroud)

我按用户创建地图照片

KStream<Integer, Photo> photo_by_user = ...

photo_by_user.to("photo_by_user")
Run Code Online (Sandbox Code Playgroud)

然后,我尝试连接两个表:

KTable<Integer, User> users_table = builder.table("users");
KTable<Integer, Photo> photo_by_user_table = builder.table("photo_by_user");
KStream<Integer, Result> results = users_table.join(photo_by_user_table, (a, b) -> Result.from(a, b)).toStream();

results.to("results");
Run Code Online (Sandbox Code Playgroud)

结果就像

{'photo_id': 1, 'user': 1, 'url': 'url#1', 'name': 'user#1'}
{'photo_id': 2, 'user': 2, 'url': 'url#2', 'name': …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-streams

1
推荐指数
1
解决办法
1774
查看次数

Kafka 流是否可以将输出写入两个不同的主题?

我需要一个流来对主题 A 中的值进行分组,将分组值发送到主题 B,然后在滚动时间窗口内将这些分组值的总和发送到主题 C。在 Kafka 中是否可以做到这一点?或者一个流只能从一个主题读取和写入吗?

apache-kafka apache-kafka-streams

1
推荐指数
1
解决办法
3380
查看次数

Kafka Stream 自定义状态存储

我一直在准备关于国有商店的文档,但我仍然不清楚它是否符合我的目的。我想使用一些分布式图形数据库作为其他外部应用程序可以使用的状态存储。这可能吗?这涉及什么工作?任何人都可以向我指出需要扩展才能实现该功能的类/代码吗?

apache-kafka-streams

1
推荐指数
1
解决办法
1801
查看次数

Spring 和 Kafka 流 - 如何使用查询 API

我是 kafka 和 kafka 流的新手。我有一个与 kafka 生产者、消费者、KStream 和 KTable 一起工作的基本 Spring 服务。现在,我想检查我的 KTable 记录,因此为了实现它,我正在尝试使用 Kafka Query API。

这可以通过以下方式实现(没有 Spring 集成):

KafkaStreams streams = new KafkaStreams(topology, config);
// Get access to the custom store
MyReadableCustomStore<String,String> store = streams.store("the-custom-store", new MyCustomStoreType<String,String>());
// Query the store
String value = store.read("key");
Run Code Online (Sandbox Code Playgroud)

现在,我尝试使用基于 Spring 的 InteractiveQueryService 来进行查询……但是我在 Spring 启动中遇到了一些依赖问题。

在 Spring 中使用 kafka 查询 API 的最佳方法是什么?

我的服务中的 Spring kafka 配置如下所示:

@Bean("streamsBuilder")
public StreamsBuilderFactoryBean recordsStreamBuilderFactoryBean() {
    Map<String, Object> config = new HashMap<>();
    // set some properties …
Run Code Online (Sandbox Code Playgroud)

spring apache-kafka apache-kafka-streams spring-kafka

1
推荐指数
1
解决办法
1038
查看次数

Kafka Topic Retention 和对 Kafka 流中状态存储的影响

Materialized.as()在 Kafka 流字数统计应用程序中有一个状态存储(使用)。
根据我的理解,状态存储在 Kafka 内部主题中维护。


以下问题是:

  1. 状态存储是否可以拥有无​​限的键值对,或者它们受基于 log.retention 策略或 log.segment.bytes 的 kafka 主题规则的约束?
  2. 我设置了 log.retention.ms=60000 并期望状态存储值在一分钟后重置为 0。但我发现它并没有发生,我仍然可以从状态存储中看到值。kafka 是完全清除日志还是保留 SNAPSHOT 以防日志压缩主题?
  3. “提交段”是什么意思?

如果可用,请与解决方案的来源一起发布。

apache-kafka apache-kafka-streams

1
推荐指数
1
解决办法
1104
查看次数