我是 Kafka 的新手,我有一个问题。如果我知道主题、偏移量和分区,我可以从主题中仅删除一条消息吗?如果没有,还有其他选择吗?
我是Kafka的新手,正在尝试在本地计算机上创建一个新主题。
我正在跟踪此链接。
这是我遵循的步骤:
启动动物园管理员
bin/zookeeper-server-start.sh config/zookeeper.properties
Run Code Online (Sandbox Code Playgroud)启动kafka服务器
bin/kafka-server-start.sh config/server.properties
Run Code Online (Sandbox Code Playgroud)创建一个话题
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
Run Code Online (Sandbox Code Playgroud)但是在创建主题时,出现以下错误:
Exception in thread "main" joptsimple.UnrecognizedOptionException: bootstrap-server is not a recognized option
at joptsimple.OptionException.unrecognizedOption(OptionException.java:108)
at joptsimple.OptionParser.handleLongOptionToken(OptionParser.java:510)
at joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:56)
at joptsimple.OptionParser.parse(OptionParser.java:396)
at kafka.admin.TopicCommand$TopicCommandOptions.<init>(TopicCommand.scala:358)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:44)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
Run Code Online (Sandbox Code Playgroud)
创建主题是否需要其他配置?我在做什么错
假设我有一个大约有 10 个分区的 kafka 主题,我知道每个消费者组应该有 10 个消费者在任何给定时间从该主题中读取数据,以实现最大并行度。
然而,我想知道对于一个主题在任何给定时间点可以处理的消费者组的数量是否也有任何直接的规则。(最近在一次采访中我被问到了这个问题)。据我所知,这取决于代理的配置,以便它在任何给定时间点可以处理多少个连接。
但是,只是想知道在给定时间点可以扩展多少个最大消费者组(每个消费者组有 10 个消费者)?
我正在阅读《Kafka:权威指南》第一版这本书来了解代理何时删除日志段。
根据我理解的文本,一个段在关闭之前不会被删除。仅当段达到 log.segment.bytes 大小时才能关闭(考虑未设置 log.segment.ms)。一旦某个段符合删除条件,log.retention.ms 策略将应用以最终决定何时删除该段。
然而,这似乎与我在生产集群(Kafka 2.5 版)中看到的行为相矛盾。
一旦满足 log.retention.ms,日志段就会被删除,即使段大小小于 log.segment.bytes。
[2020-12-24 15:51:17,808] INFO [日志分区=Topic-2,dir=/Folder/Kafka_data/kafka]由于保留时间604800000ms违规(kafka.log.日志)
[2020-12-24 15:51:17,808] INFO [日志分区=Topic-2, dir=/Folder/Kafka_data/kafka] 调度删除段 List(LogSegment(baseOffset=165828, size= 895454171 , lastModifiedTime=1608220234000,最大时间=1608220234478)) (kafka.log.Log)
大小仍然小于 1GB,但该段已被删除。
该书在新闻发布时提到 Kafka 版本是 0.9.0.1 。这个设置在 Kafka 的后续版本中也发生了变化。(我在 Kafka 文档中找不到任何具体提及此更改的信息)。以下是书中的片段。
有一个kafka主题有16个分区
使用给定的消费者组名称,我们目前正在启动单个消费者来读取该主题。
单个消费者是否partition 0(仅)阅读该主题?如果partition 0消息已空,消费者是否开始从下一个分区读取(partiton 1...等等)?
我们可以选择启动多个消费者(具有相同的消费者组名称)来读取同一主题(具有 16 个分区)。可以维护多少个消费者来并行读取多个分区?
我有一个3成员的kafka-cluster设置,__ consumer_offsets主题有50个分区.
以下是describe命令的结果:
root@kafka-cluster-0:~# kafka-topics.sh --zookeeper localhost:2181 --describe
Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:1 Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
Topic: __consumer_offsets Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 1 Leader: -1 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: __consumer_offsets Partition: 3 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 4 Leader: -1 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 5 Leader: 0 Replicas: 0 Isr: 0
...
...
Run Code Online (Sandbox Code Playgroud)
成员是节点0,1和2.
很明显,replica = …
我正在使用 AWS MSK 并且我想启用 ACL,但是当 ACL 开启时我无法创建主题。我正在使用命令行工具进行所有操作。这是我正在做的事情的总结:
所以问题是该主题是在 Zookeeper 上创建的,但代理无法访问它。大概是由于我遗漏了一些 ACL 规则。
我运行的命令的原始输出:
ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --bootstrap-server $B --command-config ~/client1.properties \
--create --topic test3 --partitions 1 --replication-factor 1
Error while executing topic command : org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
[2019-09-30 17:16:19,389] ERROR java.util.concurrent.ExecutionException: …Run Code Online (Sandbox Code Playgroud) amazon-web-services apache-kafka apache-zookeeper kafka-topic aws-msk
我是 kafka 的新手,当我阅读 Kafka 文档时,我意识到使用相同密钥提供的消息将被映射到相同的分区以保证顺序。这完全有道理。但是,我想知道如果我们在运行时增加主题分区的数量,具有相同键的新消息是否会像以前一样散列到同一分区(旧分区)?
如果是这样,如果所有消息都提供了键,那么它们都不会映射到新分区怎么办?这对我来说没有意义。
如果不是,那么Kafka如何保证具有相同key的消息的顺序呢?
我正在学习 Apache Kafka,但我不明白如何使 kafka-topics.sh 与服务器上配置的 SASL_PLAINTEXT 身份验证一起使用。
这是一个server.properties内容:
security.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
security.inter.broker.protocol=SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://10.10.10.16:9092
advertised.listeners=SASL_PLAINTEXT://10.10.10.16:9092
listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="admin" \
password="some-pass-1" \
user_admin="some-pass-1" \
user_myproducer="some-pass-2" \
user_myconsumer="some-pass-3";
Run Code Online (Sandbox Code Playgroud)
这是我在运行 kafka-topics.sh 之前向 KAFKA_OPTS 提供的 JAAS 文件内容:
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
security_protocol="SASL_PLAINTEXT"
sasl_mechanism="PLAIN"
username="admin"
password="some-pass-1";
};
Run Code Online (Sandbox Code Playgroud)
这是 kafka.log 内容和我不断收到的错误:
[2021-10-28 03:48:10,887] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Failed authentication with /10.10.10.16 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
[2021-10-28 03:48:11,100] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Failed authentication with /10.10.10.16 (Unexpected Kafka request of type …Run Code Online (Sandbox Code Playgroud) 我已经按照https://kafka.apache.org/24/documentation/streams/developer-guide/testing.html为 kafka 流应用程序编写了一个测试类 ,其代码是
import com.EventSerde;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.Properties;
public class KafkaStreamsConfigTest {
private TopologyTestDriver testDriver;
private TestInputTopic<String, Object> inputTopic;
private TestOutputTopic<String, Object> outputTopic;
private Serde<String> stringSerde = new Serdes.StringSerde();
private EventSerde eventSerde= new EventSerde();
private String key="test";
private Object value = "some value";
private Object expected_value = "real value";
String kafkaEventSourceTopic = "raw_events";
String kafkaEventSinkTopic = "processed_events";
String kafkaCacheSinkTopic = "cache_objects";
String applicationId = "my-app";
String …Run Code Online (Sandbox Code Playgroud) unit-testing apache-kafka apache-kafka-streams kafka-topic spring-kafka-test