小编mik*_*ike的帖子

从 Kafka 的主题中删除一条消息

我是 Kafka 的新手,我有一个问题。如果我知道主题、偏移量和分区,我可以从主题中仅删除一条消息吗?如果没有,还有其他选择吗?

apache-kafka kafka-topic

19
推荐指数
1
解决办法
2万
查看次数

如何在Structured Streaming的kafka数据源中为消费者组设置group.id?

我想使用 Spark Structured Streaming 从安全的 kafka 中读取数据。这意味着我需要强制使用特定的 group.id。但是,正如文档中所述,这是不可能的。尽管如此,在 databricks 文档https://docs.azuredatabricks.net/spark/latest/structured-streaming/kafka.html#using-ssl 中,它说这是可能的。这是否仅指 azure 集群?

另外,通过查看 apache/spark repo https://github.com/apache/spark/blob/master/docs/structured-streaming-kafka-integration.md的 master 分支的文档,我们可以理解这样的功能旨在在以后的 Spark 版本中添加。你知道这样一个稳定版本的任何计划,这将允许设置消费者 group.id 吗?

如果没有,Spark 2.4.0 是否有任何解决方法可以设置特定的消费者 group.id?

apache-kafka apache-spark spark-structured-streaming spark-kafka-integration

9
推荐指数
2
解决办法
5504
查看次数

在 Kafka 生产者上启用幂等性是否会降低吞吐量

我在休息端点调用中启用了幂等性的kafka 生产者(没有启用一次语义或事务)。我启用它的原因是因为我不希望卡夫卡重试导致任何重复。我担心以下几点:

  • 幂等性会减慢我的端点速度吗?(这个端点需要非常快)
  • 我读了 kafka api 文档,启用幂等性将使重试无限(什么?)
  • 如果我不将幂等性用于事务,我真的需要幂等性吗?

apache-kafka kafka-producer-api

9
推荐指数
1
解决办法
2万
查看次数

Kafka 分区和 Kafka 副本有什么区别?

我创建了 3 个 Kafka 经纪人设置,经纪人 ID 为 20、21、22。然后我创建了这个主题:

bin/kafka-topics.sh --zookeeper localhost:2181 \
  --create --topic zeta --partitions 4 --replication-factor 3
Run Code Online (Sandbox Code Playgroud)

结果是:

在此输入图像描述

当生产者向主题 zeta 发送消息“hello world”时,Kafka 首先将消息写入哪个分区?

“hello world”消息会在所有 4 个分区中复制吗?

3 个代理中的每个代理都包含所有 4 个分区?这与上述上下文中的复制因子 3 有何关系?

如果我有 8 个在自己的进程或线程中并行运行的消费者订阅了 zeta 主题,Kafka 如何分配分区或代理来并行服务这些消费者?

apache-kafka kafka-producer-api

7
推荐指数
2
解决办法
5207
查看次数

当我们在运行时增加分区时,Kafka如何保证消息的顺序?

我是 kafka 的新手,当我阅读 Kafka 文档时,我意识到使用相同密钥提供的消息将被映射到相同的分区以保证顺序。这完全有道理。但是,我想知道如果我们在运行时增加主题分区的数量,具有相同键的新消息是否会像以前一样散列到同一分区(旧分区)?

如果是这样,如果所有消息都提供了键,那么它们都不会映射到新分区怎么办?这对我来说没有意义。

如果不是,那么Kafka如何保证具有相同key的消息的顺序呢?

apache-kafka kafka-topic kafka-partition

7
推荐指数
1
解决办法
1839
查看次数

尝试删除 Kafka 中的消费者组时出现 GroupNotEmptyException

我执行了

"kafka-consumer-groups --bootstrap-server localhost:9092 --list" 
Run Code Online (Sandbox Code Playgroud)

这会导致显示一组:console-consumer-961

然后我尝试删除该组:

kafka-consumer-groups --bootstrap-server localhost:9092 --delete --group console-consumer-961
Run Code Online (Sandbox Code Playgroud)

但这会导致异常:

Error: Deletion of some consumer groups failed:
* Group 'console-consumer-961' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.
Run Code Online (Sandbox Code Playgroud)

我猜这个组是在我运行 kafka- 时创建的console-consumer.bat,但现在这个消费者没有运行。如何删除这个消费组?

apache-kafka kafka-consumer-api

6
推荐指数
1
解决办法
1万
查看次数

Spark Structured Streaming with Kafka - 如何重新分区数据并在工作节点之间分配处理

如果我的 Kafka 主题收到类似的记录

CHANNEL | VIEWERS | .....
ABC     |  100    | .....
CBS     |  200    | .....
Run Code Online (Sandbox Code Playgroud)

我有 Spark 结构化流代码来读取和处理 Kafka 记录,如下所示:

val spark = SparkSession 
      .builder 
      .appName("TestPartition") 
      .master("local[*]") 
      .getOrCreate() 

    import spark.implicits._ 

    val dataFrame = spark 
      .readStream 
      .format("kafka") 
      .option("kafka.bootstrap.servers", 
      "1.2.3.184:9092,1.2.3.185:9092,1.2.3.186:9092") 
      .option("subscribe", "partition_test") 
      .option("failOnDataLoss", "false") 
      .load() 
      .selectExpr("CAST(value AS STRING)") 
      // I will use a custom UDF to transform to a specific object
Run Code Online (Sandbox Code Playgroud)

目前,我使用 foreachwriter 处理记录如下:

val writer = new ForeachWriter[testRec] {
    def open(partitionId: Long, version: Long): Boolean = {
      true …
Run Code Online (Sandbox Code Playgroud)

scala apache-kafka apache-spark spark-structured-streaming spark-kafka-integration

5
推荐指数
1
解决办法
904
查看次数

如何使用 Spark Structured Streaming 将数据从 Kafka 主题流式传输到 Delta 表

我正在尝试了解数据块增量并考虑使用 Kafka 进行 POC。基本上计划是使用来自 Kafka 的数据并将其插入到 databricks delta 表中。

这些是我所做的步骤:

  1. 在数据块上创建增量表。
%sql
CREATE TABLE hazriq_delta_trial2 (
  value STRING
)
USING delta
LOCATION '/delta/hazriq_delta_trial2'
Run Code Online (Sandbox Code Playgroud)
  1. 消费来自 Kafka 的数据。
%sql
CREATE TABLE hazriq_delta_trial2 (
  value STRING
)
USING delta
LOCATION '/delta/hazriq_delta_trial2'
Run Code Online (Sandbox Code Playgroud)

但是,当我查询表时,它是空的。

我可以确认数据来了。当我向 Kafka 主题生成消息时,我通过查看图中的尖峰来验证它。

传入数据

我错过了什么吗?

我需要关于如何将从 Kafka 获得的数据插入到表中的帮助。

scala apache-kafka apache-spark spark-structured-streaming delta-lake

5
推荐指数
1
解决办法
1932
查看次数

如何在kafka中创建一个新的消费者组

我按照此处的快速入门指南的说明在本地运行 kafka ,

然后我定义了我的消费者组配置,config/consumer.properties以便我的消费者可以从定义的group.id

运行以下命令,

bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
Run Code Online (Sandbox Code Playgroud)

结果是,

test-consumer-group  <-- group.id defined in conf/consumer.properties
console-consumer-67807 <-- when connecting to kafka via kafka-console-consumer.sh
Run Code Online (Sandbox Code Playgroud)

我能够通过基于 python 的消费者连接到 kafka,该消费者被配置为使用提供group.idtest-consumer-group

首先,我无法理解 kafka 如何/何时创建消费者组。似乎它conf/consumer.properties在某个时间点加载了,另外它在console-consumer-67807通过kafka-console-consumer.sh.

我怎样才能明确地创建我自己的消费者组,比如说my-created-consumer-group

apache-kafka kafka-consumer-api

5
推荐指数
1
解决办法
1万
查看次数

消费者抵消是在消费者组级别还是在该消费者组内的单个消费者进行管理?

我试图找出消费者群体层面是否也存在任何抵消。消费者偏移量是在消费者组级别还是在 Kafka 中该消费者组内的单个消费者?

apache-kafka kafka-consumer-api

5
推荐指数
1
解决办法
958
查看次数