我有两个 kafka 集群 A 和 B,B 是 A 的副本。我想仅在 A 关闭时才使用集群 B 中的消息,反之亦然。然而,使用来自两个集群的消息会导致重复消息。那么有什么方法可以配置我的 kafka 消费者仅从一个集群接收消息。
谢谢 -
我正在使用 kafka(v.0.10.1.1)和一些动态组(为每个新消费者创建新的 group.id)。我必须重新检查/过滤过去的事件流以获得某些标准,这意味着一段时间后我有很多组。是否会自动清理旧的未使用的组?
我有一个应用程序,多个用户可以发送 REST 操作来修改共享对象的状态。当一个对象被修改时,将会发生多个操作(数据库、审计、日志记录......)。
并非所有操作都有效,例如您无法在删除对象后对其进行修改。
使用 Kafka 我正在考虑以下架构:
我不太确定第三点。我不喜欢保留所有对象的状态的想法。(我有数十亿个对象,即使一个对象的大小可以达到 10mb,我需要存储来验证其状态的也只是几 KB...)
然而,这是一种常见的模式吗?否则如何验证某些操作的有效性?
另外,您将使用什么作为内存数据库?当然它必须具有高可用性、容错性并支持事务(读和写)。
streaming in-memory-database apache-kafka kafka-consumer-api apache-kafka-streams
刚开始使用Spring Kafka(2.1.4.RELEASE)和Kafka(1.0.0)但是当我添加事务时,处理速度降低了很多。
代码:
spring.kafka.consumer.max-poll-records=10
spring.kafka.consumer.specific.avro.reader=true
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=${application.name}
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.consumer.key-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
Run Code Online (Sandbox Code Playgroud)
在 Java 中我添加了:
spring.kafka.consumer.max-poll-records=10
spring.kafka.consumer.specific.avro.reader=true
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=${application.name}
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.consumer.key-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
Run Code Online (Sandbox Code Playgroud)
当我删除该setTransactionManager(transactionManager)语句后,速度提高了很多。我做错了什么吗?
我有一个应用程序,其中使用 Kstream-Kstream 连接和 Ktream-Ktable 连接。我已将输入源主题分区计数从 4 更新为 16,并且应用程序因以下错误而停止。
Could not create internal topics: Existing internal topic application-test-processor-KSTREAM-JOINTHIS-0000000009-store-changelog has invalid partitions. Expected: 16 Actual: 4. Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing. Retry #3
当源主题分区计数更新时,如何更新内部变更日志主题分区计数?
注意:我们使用的kafka版本:0.10.2.1
我从此链接查看了应用程序重置工具:https://docs.confluence.io/current/streams/developer-guide/app-reset-tool.html ,但它没有说明如何更新更改日志分区。
提前致谢。
apache-kafka kafka-consumer-api kafka-producer-api apache-kafka-streams
我在我的一台机器上创建了一个kafka主题,其IP为192.168.25.50,主题名称为test-poc。然后通过使用 kafka-console- Producer 我生成了如下消息
kafka-console-producer --broker-list localhost:9092 --topic test-poc
>test message1
>test message2
Run Code Online (Sandbox Code Playgroud)
之后我在另一台机器上下载了kafka并尝试使用以下命令使用
kafka-console-consumer --bootstrap-server 192.168.25.50:9092 --topic test-poc --from-beginning
Run Code Online (Sandbox Code Playgroud)
其中 192.168.25.50 是运行 Kafka 生产者的服务器的 IP。
因此,执行上述命令后,我收到以下错误。
[2018-06-28 20:45:12,822] WARN [Consumer clientId=consumer-1, groupId=console-consumer-33012] Connection to node 2147483647 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-28 20:45:12,934] WARN [Consumer clientId=consumer-1, groupId=console-consumer-33012] Connection to node 0 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-28 20:45:13,038] WARN [Consumer clientId=consumer-1, groupId=console-consumer-33012] Connection to node 0 could not be …Run Code Online (Sandbox Code Playgroud) 我可以创建一个 kafka 消费者来读取 Kafka 中固定(比如 100)数量的消息吗?我的条件是无论如何不能超过100,低于100就可以了。
我正在使用segmentio/kafka-go连接到Kafka。
// to produce messages
topic := "my-topic"
partition := 0
conn, _ := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
conn.SetWriteDeadline(time.Now().Add(10*time.Second))
conn.WriteMessages(
kafka.Message{Value: []byte("one!")},
kafka.Message{Value: []byte("two!")},
kafka.Message{Value: []byte("three!")},
)
conn.Close()
Run Code Online (Sandbox Code Playgroud)
我可以使用此代码生成到我的 Kafka 服务器中。
// to consume messages
topic := "my-topic"
partition := 0
conn, _ := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
conn.SetReadDeadline(time.Now().Add(10*time.Second))
batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max
b := make([]byte, 10e3) // 10KB max per message
for {
_, err := batch.Read(b)
if err …Run Code Online (Sandbox Code Playgroud) 如果 kafkaenable.auto.commit=true
是否意味着每 5 秒它将在后台触发同步或异步操作以提交偏移量?
小间隔(2秒)会以某种方式影响延迟吗?
我在 Spring Boot 中有卡夫卡处理程序:
@KafkaListener(topics = "topic-one", groupId = "response")
public void listen(String response) {
myService.processResponse(response);
}
Run Code Online (Sandbox Code Playgroud)
例如,生产者每秒发送一条消息。但myService.processResponse工作10秒。我需要处理每条消息并myService.processResponse在新线程中开始。我可以创建我的执行者并将每个响应委托给它。但我认为 kafka 中还有其他配置可供使用。我找到了2个:
1)添加concurrency = "5"到@KafkaListener注释 - 它似乎有效。但我不确定有多正确,因为我有第二种方法:
2)我可以创建ConcurrentKafkaListenerContainerFactory并设置它ConsumerFactory并且concurrency
我不明白这些方法之间的区别?concurrency = "5"只需添加到注释就足够了@KafkaListener还是我需要创建ConcurrentKafkaListenerContainerFactory?
或者我根本不明白什么,还有其他方法吗?
java apache-kafka spring-boot kafka-consumer-api spring-kafka