我有 Kafka 集群, log.dirs=/data/kafka设置为 server.properties 中的数据目录。由于这些日志占了很大一部分,我的 DATA 分区一直被填满。(谈论主题目录中的二进制日志,如 000000000000000.log)我在文档中读到了有关此参数的信息 (log.dirs 保存日志数据的目录。如果未设置,则使用 log.dir 中的值)
我还没有完全理解其含义此外,它们可以删除吗?应该配置哪些保留?是否建议将其与数据目录分开?谢谢
我想在 Apache Flink 中实现以下场景:
给定一个具有 4 个分区的 Kafka 主题,我想根据事件的类型使用不同的逻辑在 Flink 中独立处理分区内数据。
特别是,假设输入 Kafka 主题包含前面图像中描述的事件。每个事件都有不同的结构:分区 1 有字段“ a ”作为键,分区 2 有字段“ b ”作为键,等等。在 Flink 中,我想根据事件应用不同的业务逻辑,所以我想我应该以某种方式分割流。为了实现图中所描述的效果,我想只使用一个消费者来做类似的事情(我不明白为什么我应该使用更多):
FlinkKafkaConsumer<..> consumer = ...
DataStream<..> stream = flinkEnv.addSource(consumer);
stream.keyBy("a").map(new AEventMapper()).addSink(...);
stream.keyBy("b").map(new BEventMapper()).addSink(...);
stream.keyBy("c").map(new CEventMapper()).addSink(...);
stream.keyBy("d").map(new DEventMapper()).addSink(...);
Run Code Online (Sandbox Code Playgroud)
(一)正确吗?另外,如果我想并行处理每个 Flink 分区,因为我只想按顺序处理按同一 Kafka 分区排序的事件,而不是全局考虑它们,(b) 我该怎么办?我知道该方法的存在setParallelism(),但我不知道在这种情况下将其应用到哪里。
我正在寻找有关标记(a)和(b)的问题的答案。先感谢您。
parallel-processing partitioning apache-kafka apache-flink kafka-topic
像往常一样,看到拆分方法相对于其他方法的好处有点令人困惑。
Topic1 -> P0 和 Topic 2 -> P0 Topic 1 -> P0, P1P0和P1将举行不同的事件类型或实体。你唯一的好处是我可以看到另一个消费者是否需要主题 2 数据,那么它很容易消费
谢谢
我是 Kafka 新手,正在尝试在本地计算机上创建一个新主题。\n我正在关注此https://medium.com/@maftabali2k13/setting-up-a-kafka-cluster-on-ec2-1b37144cb4e
\n\nbin/zookeeper-server-start.sh -daemon config/zookeeper.properties\nRun Code Online (Sandbox Code Playgroud)\n\n启动kafka服务器
\n\nbin/kafka-server-start.sh -daemon config/server.properties\nRun Code Online (Sandbox Code Playgroud)\n\n创建主题
\n\nbin/kafka-topics.sh --create -\xe2\x80\x93bootstrap-server localhost:9092 -\xe2\x80\x93replication-factor 1 -\xe2\x80\x93partitions 1 --topic jerry\nRun Code Online (Sandbox Code Playgroud)\n\n但是在创建主题时,出现以下错误
\n\n\nException in thread "main" joptsimple.UnrecognizedOptionException: \xe2\x80\x93 is not a recognized option\n at joptsimple.OptionException.unrecognizedOption(OptionException.java:108)\n at joptsimple.OptionParser.validateOptionCharacters(OptionParser.java:633)\n at joptsimple.OptionParser.handleShortOptionCluster(OptionParser.java:528)\n at joptsimple.OptionParser.handleShortOptionToken(OptionParser.java:523)\n at joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:59)\n at joptsimple.OptionParser.parse(OptionParser.java:396)\n at kafka.admin.TopicCommand$TopicCommandOptions.<init>(TopicCommand.scala:552)\n at kafka.admin.TopicCommand$.main(TopicCommand.scala:49)\n at kafka.admin.TopicCommand.main(TopicCommand.scala)\nRun Code Online (Sandbox Code Playgroud)\n\n我看到了以下为什么卡夫卡不创建主题?bootstrap-server 不是一个可识别的选项\n但是我在这里找不到问题的答案,因为给出的错误不同。我在这里缺少一些东西吗?
\n我在本地机器上安装了 Kafka,并启动了 zookeeper 和一个代理服务器。
现在我有一个具有以下描述的主题:
~/Documents/backups/kafka_2.12-2.2.0/data/kafka$ kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic edu-topic --describe
Topic:edu-topic PartitionCount:3 ReplicationFactor:1 Configs:
Topic: edu-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: edu-topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: edu-topic Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Run Code Online (Sandbox Code Playgroud)
我有一个生产者在消费者启动之前已经产生了一些消息,如下所示:
~/Documents/backups/kafka_2.12-2.2.0/data/kafka$ kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic edu-topic
>book
>pen
>pencil
>marker
>
Run Code Online (Sandbox Code Playgroud)
当我使用 --from-beginning 选项启动消费者时,它不会显示生产者产生的所有消息:
~/Documents/backups/kafka_2.12-2.2.0/data/kafka$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic edu-topic --group edu-service --from-beginning
Run Code Online (Sandbox Code Playgroud)
但是,它显示了新添加的消息。
我在这里做什么错了?有什么帮助吗?
apache-kafka kafka-consumer-api kafka-producer-api kafka-topic
嘿,我正在使用 Kafka Strimzi。我使用以下 yml 文件创建了我的 kafkaTopic 和 KafkaUser:
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
name: my-user
labels:
strimzi.io/cluster: my-cluster
spec:
authentication:
type: tls
authorization:
type: simple
acls:
# Example consumer Acls for topic my-topic using consumer group my-group
- resource:
type: topic
name: my-topic
patternType: literal
operation: Read
host: "*"
- resource:
type: topic
name: my-topic
patternType: literal
operation: Describe
host: "*"
- resource:
type: group
name: my-group
patternType: literal
operation: Read
host: "*"
# Example Producer Acls for topic my-topic …Run Code Online (Sandbox Code Playgroud) 我有一个主题,其中所有日志都推送到集中主题,但如果可能,我想将其中一些记录过滤到单独的主题和集群。
谢谢
我刚开始使用 Kafka 并且对 Python 相当陌生。我正在使用这个名为的库kafka-python与我的 Kafka 代理进行通信。现在我需要从我的代码动态创建一个主题,从文档中我看到的是我可以调用create_topics()方法来这样做,但是我不确定,我将如何获得此类的实例。我无法从文档中理解这一点。
有人可以帮我弄这个吗?
Apache Kafka 安装在 Mac(英特尔)上。单一本地生产者和单一本地消费者。创建了 1 个具有 3 个分区和 1 个复制因子的主题:
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic animal --partitions 3 --replication-factor 1
Run Code Online (Sandbox Code Playgroud)
生产者代码:
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic animal
Run Code Online (Sandbox Code Playgroud)
制作人留言:
>alligator
>crocodile
>tiger
Run Code Online (Sandbox Code Playgroud)
生成消息时(通过生产者控制台手动),所有消息都会进入同一个分区。它们不应该跨分区分布吗?
我尝试过 3 条记录(如上所述),但它们仅发送到 1 个分区。在 tmp/kafka-logs/topic-0/00** 00.log 中检查 topic- 中的其他日志为空。
我尝试过几十条记录,但没有成功。
我什至在“config/server.properties”中增加了默认分区配置(num.partitions=3),但没有成功。
我也尝试过不同的主题,但没有运气。
我正在学习 Kafka 并尝试为我最近的搜索应用程序创建一个主题。推送到 kafka 主题的数据被认为是一个很大的数字。
我的 kafka 集群有 3 个代理,并且已经为其他需求创建了主题。
现在我应该为最近的搜索主题选择多少个分区?如果我没有明确提供分区号怎么办?选择分区号需要考虑哪些事项?
apache-kafka kafka-consumer-api kafka-producer-api kafka-topic
除了使用 Confluence Schema Registry 之外,还有其他方法(通过内置 CLI 工具)查看主题键和值的序列化格式吗?
apache-kafka ×11
kafka-topic ×11
apache-flink ×1
kafka-python ×1
logging ×1
partition ×1
partitioning ×1
python ×1
strimzi ×1