标签: kafka-topic

从 Kafka 的主题中删除一条消息

我是 Kafka 的新手,我有一个问题。如果我知道主题、偏移量和分区,我可以从主题中仅删除一条消息吗?如果没有,还有其他选择吗?

apache-kafka kafka-topic

19
推荐指数
1
解决办法
2万
查看次数

kafka为什么不创建主题?bootstrap-server不是公认的选项

我是Kafka的新手,正在尝试在本地计算机上创建一个新主题。

我正在跟踪此链接

这是我遵循的步骤:

  1. 启动动物园管理员

    bin/zookeeper-server-start.sh config/zookeeper.properties
    
    Run Code Online (Sandbox Code Playgroud)
  2. 启动kafka服务器

    bin/kafka-server-start.sh config/server.properties
    
    Run Code Online (Sandbox Code Playgroud)
  3. 创建一个话题

    bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
    
    Run Code Online (Sandbox Code Playgroud)

但是在创建主题时,出现以下错误:

Exception in thread "main" joptsimple.UnrecognizedOptionException: bootstrap-server is not a recognized option
    at joptsimple.OptionException.unrecognizedOption(OptionException.java:108)
    at joptsimple.OptionParser.handleLongOptionToken(OptionParser.java:510)
    at joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:56)
    at joptsimple.OptionParser.parse(OptionParser.java:396)
    at kafka.admin.TopicCommand$TopicCommandOptions.<init>(TopicCommand.scala:358)
    at kafka.admin.TopicCommand$.main(TopicCommand.scala:44)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)
Run Code Online (Sandbox Code Playgroud)

创建主题是否需要其他配置?我在做什么错

apache-kafka apache-zookeeper kafka-topic

16
推荐指数
1
解决办法
5212
查看次数

一个kafka topic可以处理多少个消费者群体?

假设我有一个大约有 10 个分区的 kafka 主题,我知道每个消费者组应该有 10 个消费者在任何给定时间从该主题中读取数据,以实现最大并行度。

然而,我想知道对于一个主题在任何给定时间点可以处理的消费者组的数量是否也有任何直接的规则。(最近在一次采访中我被问到了这个问题)。据我所知,这取决于代理的配置,以便它在任何给定时间点可以处理多少个连接。

但是,只是想知道在给定时间点可以扩展多少个最大消费者组(每个消费者组有 10 个消费者)?

apache-kafka kafka-consumer-api kafka-topic

13
推荐指数
3
解决办法
1万
查看次数

Kafka log.segment.bytes 与 log.retention.hours

我正在阅读《Kafka:权威指南》第一版这本书来了解代理何时删除日志段。

根据我理解的文本,一个段在关闭之前不会被删除。仅当段达到 log.segment.bytes 大小时才能关闭(考虑未设置 log.segment.ms)。一旦某个段符合删除条件,log.retention.ms 策略将应用以最终决定何时删除该段。

然而,这似乎与我在生产集群(Kafka 2.5 版)中看到的行为相矛盾。

一旦满足 log.retention.ms,日志段就会被删除,即使段大小小于 log.segment.bytes。

[2020-12-24 15:51:17,808] INFO [日志分区=Topic-2,dir=/Folder/Kafka_data/kafka]由于保留时间604800000ms违规(kafka.log.日志)

[2020-12-24 15:51:17,808] INFO [日志分区=Topic-2, dir=/Folder/Kafka_data/kafka] 调度删除段 List(LogSegment(baseOffset=165828, size= 895454171 , lastModifiedTime=1608220234000,最大时间=1608220234478)) (kafka.log.Log)

大小仍然小于 1GB,但该段已被删除。

该书在新闻发布时提到 Kafka 版本是 0.9.0.1 。这个设置在 Kafka 的后续版本中也发生了变化。(我在 Kafka 文档中找不到任何具体提及此更改的信息)。以下是书中的片段。

在此输入图像描述

apache-kafka retention kafka-topic kafka-partition

10
推荐指数
2
解决办法
2万
查看次数

单个消费者可以从 kafka 主题的多个分区中读取数据吗?

有一个kafka主题有16个分区

使用给定的消费者组名称,我们目前正在启动单个消费者来读取该主题。


  1. 单个消费者是否partition 0(仅)阅读该主题?如果partition 0消息已空,消费者是否开始从下一个分区读取(partiton 1...等等)?

  2. 我们可以选择启动多个消费者(具有相同的消费者组名称)来读取同一主题(具有 16 个分区)。可以维护多少个消费者来并行读取多个分区?

apache-kafka kafka-topic

10
推荐指数
2
解决办法
2万
查看次数

Kafka主题具有leader = -1(Kafka Leader Election)的分区,而节点已启动并正在运行

我有一个3成员的kafka-cluster设置,__ consumer_offsets主题有50个分区.

以下是describe命令的结果:

root@kafka-cluster-0:~# kafka-topics.sh --zookeeper localhost:2181 --describe
Topic:__consumer_offsets    PartitionCount:50   ReplicationFactor:1 Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
    Topic: __consumer_offsets   Partition: 0    Leader: 1   Replicas: 1 Isr: 1
    Topic: __consumer_offsets   Partition: 1    Leader: -1  Replicas: 2 Isr: 2
    Topic: __consumer_offsets   Partition: 2    Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 3    Leader: 1   Replicas: 1 Isr: 1
    Topic: __consumer_offsets   Partition: 4    Leader: -1  Replicas: 2 Isr: 2
    Topic: __consumer_offsets   Partition: 5    Leader: 0   Replicas: 0 Isr: 0
    ...
    ...
Run Code Online (Sandbox Code Playgroud)

成员是节点0,1和2.

很明显,replica = …

apache-kafka apache-zookeeper kafka-topic

8
推荐指数
1
解决办法
1070
查看次数

AWS MSK - 在 ACL 打开的情况下创建 Kafka 主题时超时

我正在使用 AWS MSK 并且我想启用 ACL,但是当 ACL 开启时我无法创建主题。我正在使用命令行工具进行所有操作。这是我正在做的事情的总结:

  • 创建一个新集群
  • 创建一个主题 - 这很好用
  • 在 resource=CLUSTER 和 operation=ALL 上为 client1 打开 ACL
  • 使用 AdminClient 创建主题(通过提供 --bootstrap-server 选项) - 这会产生超时异常
  • 重新尝试创建相同的主题 - 这会给出一个错误,说明主题已经存在
  • 使用 AdminClient 列出主题 - 这不返回主题
  • 使用 Zookeeper 连接创建主题 - 这有效
  • 使用 Zookeeper connect 列出主题 - 这将返回我创建的所有主题(即使是那些超时的主题)

所以问题是该主题是在 Zookeeper 上创建的,但代理无法访问它。大概是由于我遗漏了一些 ACL 规则。

我运行的命令的原始输出:

ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --bootstrap-server $B --command-config ~/client1.properties \
--create --topic test3 --partitions 1 --replication-factor 1

Error while executing topic command : org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
[2019-09-30 17:16:19,389] ERROR java.util.concurrent.ExecutionException: …
Run Code Online (Sandbox Code Playgroud)

amazon-web-services apache-kafka apache-zookeeper kafka-topic aws-msk

8
推荐指数
1
解决办法
3103
查看次数

当我们在运行时增加分区时,Kafka如何保证消息的顺序?

我是 kafka 的新手,当我阅读 Kafka 文档时,我意识到使用相同密钥提供的消息将被映射到相同的分区以保证顺序。这完全有道理。但是,我想知道如果我们在运行时增加主题分区的数量,具有相同键的新消息是否会像以前一样散列到同一分区(旧分区)?

如果是这样,如果所有消息都提供了键,那么它们都不会映射到新分区怎么办?这对我来说没有意义。

如果不是,那么Kafka如何保证具有相同key的消息的顺序呢?

apache-kafka kafka-topic kafka-partition

7
推荐指数
1
解决办法
1839
查看次数

Kafka-topics.sh 身份验证

我正在学习 Apache Kafka,但我不明白如何使 kafka-topics.sh 与服务器上配置的 SASL_PLAINTEXT 身份验证一起使用。

这是一个server.properties内容:

security.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
security.inter.broker.protocol=SASL_PLAINTEXT

listeners=SASL_PLAINTEXT://10.10.10.16:9092
advertised.listeners=SASL_PLAINTEXT://10.10.10.16:9092

listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
   username="admin" \
   password="some-pass-1" \
   user_admin="some-pass-1" \
   user_myproducer="some-pass-2" \
   user_myconsumer="some-pass-3";
Run Code Online (Sandbox Code Playgroud)

这是我在运行 kafka-topics.sh 之前向 KAFKA_OPTS 提供的 JAAS 文件内容:

Client {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  security_protocol="SASL_PLAINTEXT"
  sasl_mechanism="PLAIN"
  username="admin"
  password="some-pass-1";
};
Run Code Online (Sandbox Code Playgroud)

这是 kafka.log 内容和我不断收到的错误:

[2021-10-28 03:48:10,887] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Failed authentication with /10.10.10.16 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
[2021-10-28 03:48:11,100] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Failed authentication with /10.10.10.16 (Unexpected Kafka request of type …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-topic

7
推荐指数
1
解决办法
1万
查看次数

Kafka Streams 测试:java.util.NoSuchElementException:未初始化的主题:“output_topic_name”

我已经按照https://kafka.apache.org/24/documentation/streams/developer-guide/testing.html为 kafka 流应用程序编写了一个测试类 ,其代码是

import com.EventSerde;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.Properties;

public class KafkaStreamsConfigTest {

private TopologyTestDriver testDriver;
private TestInputTopic<String, Object> inputTopic;
private TestOutputTopic<String, Object> outputTopic;

private Serde<String> stringSerde = new Serdes.StringSerde();
private EventSerde eventSerde= new EventSerde();

private String key="test";
private Object value = "some value";
private Object expected_value = "real value";

String kafkaEventSourceTopic = "raw_events";
String kafkaEventSinkTopic = "processed_events";
String kafkaCacheSinkTopic = "cache_objects";

String applicationId = "my-app";
String …
Run Code Online (Sandbox Code Playgroud)

unit-testing apache-kafka apache-kafka-streams kafka-topic spring-kafka-test

6
推荐指数
1
解决办法
1458
查看次数