标签: kafka-consumer-api

如何从Kafka中的两个不同集群进行消费?

我有两个 kafka 集群 A 和 B,B 是 A 的副本。我想仅在 A 关闭时才使用集群 B 中的消息,反之亦然。然而,使用来自两个集群的消息会导致重复消息。那么有什么方法可以配置我的 kafka 消费者仅从一个集群接收消息。

谢谢 -

apache-kafka kafka-consumer-api

2
推荐指数
1
解决办法
3983
查看次数

卡夫卡组清理

我正在使用 kafka(v.0.10.1.1)和一些动态组(为每个新消费者创建新的 group.id)。我必须重新检查/过滤过去的事件流以获得某些标准,这意味着一段时间后我有很多组。是否会自动清理旧的未使用的组?

apache-kafka kafka-consumer-api

2
推荐指数
1
解决办法
3840
查看次数

Kafka 在有状态处理中验证消息

我有一个应用程序,多个用户可以发送 REST 操作来修改共享对象的状态。当一个对象被修改时,将会发生多个操作(数据库、审计、日志记录......)。

并非所有操作都有效,例如您无法在删除对象后对其进行修改。

使用 Kafka 我正在考虑以下架构:

  1. 剩余操作在 Kafka 主题中排队。
  2. 对同一对象的操作将针对同一分区。因此该对象的所有操作都将按顺序并由消费者处理
  3. 消费者正在监听分区并使用内存数据库验证操作
  4. 如果操作有效,则发送到“有效操作主题”,否则发送到“无效操作主题”
  5. 其他消费者(db、log、audit)正在监听“有效操作主题”

我不太确定第三点。我不喜欢保留所有对象的状态的想法。(我有数十亿个对象,即使一个对象的大小可以达到 10mb,我需要存储来验证其状态的也只是几 KB...

然而,这是一种常见的模式吗?否则如何验证某些操作的有效性?

另外,您将使用什么作为内存数据库?当然它必须具有高可用性、容错性并支持事务(读和写)。

streaming in-memory-database apache-kafka kafka-consumer-api apache-kafka-streams

2
推荐指数
1
解决办法
1161
查看次数

Spring + Kafka:事务处理缓慢

刚开始使用Spring Kafka(2.1.4.RELEASE)和Kafka(1.0.0)但是当我添加事务时,处理速度降低了很多。

代码:

spring.kafka.consumer.max-poll-records=10
spring.kafka.consumer.specific.avro.reader=true
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=${application.name}
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.consumer.key-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
Run Code Online (Sandbox Code Playgroud)

在 Java 中我添加了:

spring.kafka.consumer.max-poll-records=10
spring.kafka.consumer.specific.avro.reader=true
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=${application.name}
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.consumer.key-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
Run Code Online (Sandbox Code Playgroud)

当我删除该setTransactionManager(transactionManager)语句后,速度提高了很多。我做错了什么吗?

performance apache-kafka kafka-consumer-api spring-kafka

2
推荐指数
1
解决办法
8456
查看次数

当源主题分区计数更新时,如何更新内部变更日志主题分区?

我有一个应用程序,其中使用 Kstream-Kstream 连接和 Ktream-Ktable 连接。我已将输入源主题分区计数从 4 更新为 16,并且应用程序因以下错误而停止。

Could not create internal topics: Existing internal topic application-test-processor-KSTREAM-JOINTHIS-0000000009-store-changelog has invalid partitions. Expected: 16 Actual: 4. Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing. Retry #3

当源主题分区计数更新时,如何更新内部变更日志主题分区计数?

注意:我们使用的kafka版本:0.10.2.1

我从此链接查看了应用程序重置工具:https://docs.confluence.io/current/streams/developer-guide/app-reset-tool.html ,但它没有说明如何更新更改日志分区。

提前致谢。

apache-kafka kafka-consumer-api kafka-producer-api apache-kafka-streams

2
推荐指数
1
解决办法
4091
查看次数

无法在 Kafka 中使用来自远程计算机的消息

我在我的一台机器上创建了一个kafka主题,其IP为192.168.25.50,主题名称为test-poc。然后通过使用 kafka-console- Producer 我生成了如下消息

kafka-console-producer --broker-list localhost:9092 --topic test-poc

>test message1

>test message2
Run Code Online (Sandbox Code Playgroud)

之后我在另一台机器上下载了kafka并尝试使用以下命令使用

kafka-console-consumer --bootstrap-server 192.168.25.50:9092 --topic test-poc --from-beginning 
Run Code Online (Sandbox Code Playgroud)

其中 192.168.25.50 是运行 Kafka 生产者的服务器的 IP。

因此,执行上述命令后,我收到以下错误。

[2018-06-28 20:45:12,822] WARN [Consumer clientId=consumer-1, groupId=console-consumer-33012] Connection to node 2147483647 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-28 20:45:12,934] WARN [Consumer clientId=consumer-1, groupId=console-consumer-33012] Connection to node 0 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-28 20:45:13,038] WARN [Consumer clientId=consumer-1, groupId=console-consumer-33012] Connection to node 0 could not be …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-consumer-api

2
推荐指数
1
解决办法
4742
查看次数

Kafka Consumer - 读取固定数量的消息

我可以创建一个 kafka 消费者来读取 Kafka 中固定(比如 100)数量的消息吗?我的条件是无论如何不能超过100,低于100就可以了。

apache-kafka kafka-consumer-api

2
推荐指数
1
解决办法
3500
查看次数

Golang segmentio/kafka-go 消费者不工作

我正在使用segmentio/kafka-go连接到Kafka。

// to produce messages
topic := "my-topic"
partition := 0

conn, _ := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)

conn.SetWriteDeadline(time.Now().Add(10*time.Second))
conn.WriteMessages(
    kafka.Message{Value: []byte("one!")},
    kafka.Message{Value: []byte("two!")},
    kafka.Message{Value: []byte("three!")},
)

conn.Close()
Run Code Online (Sandbox Code Playgroud)

我可以使用此代码生成到我的 Kafka 服务器中。

// to consume messages
topic := "my-topic"
partition := 0

conn, _ := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)

conn.SetReadDeadline(time.Now().Add(10*time.Second))
batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max

b := make([]byte, 10e3) // 10KB max per message
for {
    _, err := batch.Read(b)
    if err …
Run Code Online (Sandbox Code Playgroud)

go apache-kafka kafka-consumer-api

2
推荐指数
1
解决办法
2187
查看次数

Kafka 自动提交在幕后是同步还是异步?

如果 kafkaenable.auto.commit=true 是否意味着每 5 秒它将在后台触发同步或异步操作以提交偏移量?

小间隔(2秒)会以某种方式影响延迟吗?

java apache-kafka kafka-consumer-api

2
推荐指数
1
解决办法
1291
查看次数

如何在不同线程中处理@KafkaListener方法?

我在 Spring Boot 中有卡夫卡处理程序:

    @KafkaListener(topics = "topic-one", groupId = "response")
    public void listen(String response) {
        myService.processResponse(response);
    }
Run Code Online (Sandbox Code Playgroud)

例如,生产者每秒发送一条消息。但myService.processResponse工作10秒。我需要处理每条消息并myService.processResponse在新线程中开始。我可以创建我的执行者并将每个响应委托给它。但我认为 kafka 中还有其他配置可供使用。我找到了2个:

1)添加concurrency = "5"@KafkaListener注释 - 它似乎有效。但我不确定有多正确,因为我有第二种方法:

2)我可以创建ConcurrentKafkaListenerContainerFactory并设置它ConsumerFactory并且concurrency

我不明白这些方法之间的区别?concurrency = "5"只需添加到注释就足够了@KafkaListener还是我需要创建ConcurrentKafkaListenerContainerFactory

或者我根本不明白什么,还有其他方法吗?

java apache-kafka spring-boot kafka-consumer-api spring-kafka

2
推荐指数
2
解决办法
1万
查看次数