我是 Kafka 的新手,我有一个问题。如果我知道主题、偏移量和分区,我可以从主题中仅删除一条消息吗?如果没有,还有其他选择吗?
我想使用 Spark Structured Streaming 从安全的 kafka 中读取数据。这意味着我需要强制使用特定的 group.id。但是,正如文档中所述,这是不可能的。尽管如此,在 databricks 文档https://docs.azuredatabricks.net/spark/latest/structured-streaming/kafka.html#using-ssl 中,它说这是可能的。这是否仅指 azure 集群?
另外,通过查看 apache/spark repo https://github.com/apache/spark/blob/master/docs/structured-streaming-kafka-integration.md的 master 分支的文档,我们可以理解这样的功能旨在在以后的 Spark 版本中添加。你知道这样一个稳定版本的任何计划,这将允许设置消费者 group.id 吗?
如果没有,Spark 2.4.0 是否有任何解决方法可以设置特定的消费者 group.id?
apache-kafka apache-spark spark-structured-streaming spark-kafka-integration
我在休息端点调用中启用了幂等性的kafka 生产者(没有启用一次语义或事务)。我启用它的原因是因为我不希望卡夫卡重试导致任何重复。我担心以下几点:
我创建了 3 个 Kafka 经纪人设置,经纪人 ID 为 20、21、22。然后我创建了这个主题:
bin/kafka-topics.sh --zookeeper localhost:2181 \
--create --topic zeta --partitions 4 --replication-factor 3
Run Code Online (Sandbox Code Playgroud)
结果是:
当生产者向主题 zeta 发送消息“hello world”时,Kafka 首先将消息写入哪个分区?
“hello world”消息会在所有 4 个分区中复制吗?
3 个代理中的每个代理都包含所有 4 个分区?这与上述上下文中的复制因子 3 有何关系?
如果我有 8 个在自己的进程或线程中并行运行的消费者订阅了 zeta 主题,Kafka 如何分配分区或代理来并行服务这些消费者?
我是 kafka 的新手,当我阅读 Kafka 文档时,我意识到使用相同密钥提供的消息将被映射到相同的分区以保证顺序。这完全有道理。但是,我想知道如果我们在运行时增加主题分区的数量,具有相同键的新消息是否会像以前一样散列到同一分区(旧分区)?
如果是这样,如果所有消息都提供了键,那么它们都不会映射到新分区怎么办?这对我来说没有意义。
如果不是,那么Kafka如何保证具有相同key的消息的顺序呢?
我执行了
"kafka-consumer-groups --bootstrap-server localhost:9092 --list"
Run Code Online (Sandbox Code Playgroud)
这会导致显示一组:console-consumer-961
然后我尝试删除该组:
kafka-consumer-groups --bootstrap-server localhost:9092 --delete --group console-consumer-961
Run Code Online (Sandbox Code Playgroud)
但这会导致异常:
Error: Deletion of some consumer groups failed:
* Group 'console-consumer-961' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.
Run Code Online (Sandbox Code Playgroud)
我猜这个组是在我运行 kafka- 时创建的console-consumer.bat,但现在这个消费者没有运行。如何删除这个消费组?
如果我的 Kafka 主题收到类似的记录
CHANNEL | VIEWERS | .....
ABC | 100 | .....
CBS | 200 | .....
Run Code Online (Sandbox Code Playgroud)
我有 Spark 结构化流代码来读取和处理 Kafka 记录,如下所示:
val spark = SparkSession
.builder
.appName("TestPartition")
.master("local[*]")
.getOrCreate()
import spark.implicits._
val dataFrame = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers",
"1.2.3.184:9092,1.2.3.185:9092,1.2.3.186:9092")
.option("subscribe", "partition_test")
.option("failOnDataLoss", "false")
.load()
.selectExpr("CAST(value AS STRING)")
// I will use a custom UDF to transform to a specific object
Run Code Online (Sandbox Code Playgroud)
目前,我使用 foreachwriter 处理记录如下:
val writer = new ForeachWriter[testRec] {
def open(partitionId: Long, version: Long): Boolean = {
true …Run Code Online (Sandbox Code Playgroud) scala apache-kafka apache-spark spark-structured-streaming spark-kafka-integration
我正在尝试了解数据块增量并考虑使用 Kafka 进行 POC。基本上计划是使用来自 Kafka 的数据并将其插入到 databricks delta 表中。
这些是我所做的步骤:
%sql
CREATE TABLE hazriq_delta_trial2 (
value STRING
)
USING delta
LOCATION '/delta/hazriq_delta_trial2'
Run Code Online (Sandbox Code Playgroud)
%sql
CREATE TABLE hazriq_delta_trial2 (
value STRING
)
USING delta
LOCATION '/delta/hazriq_delta_trial2'
Run Code Online (Sandbox Code Playgroud)
但是,当我查询表时,它是空的。
我可以确认数据来了。当我向 Kafka 主题生成消息时,我通过查看图中的尖峰来验证它。
我错过了什么吗?
我需要关于如何将从 Kafka 获得的数据插入到表中的帮助。
scala apache-kafka apache-spark spark-structured-streaming delta-lake
我按照此处的快速入门指南的说明在本地运行 kafka ,
然后我定义了我的消费者组配置,config/consumer.properties以便我的消费者可以从定义的group.id
运行以下命令,
bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
Run Code Online (Sandbox Code Playgroud)
结果是,
test-consumer-group <-- group.id defined in conf/consumer.properties
console-consumer-67807 <-- when connecting to kafka via kafka-console-consumer.sh
Run Code Online (Sandbox Code Playgroud)
我能够通过基于 python 的消费者连接到 kafka,该消费者被配置为使用提供group.id即test-consumer-group
首先,我无法理解 kafka 如何/何时创建消费者组。似乎它conf/consumer.properties在某个时间点加载了,另外它在console-consumer-67807通过kafka-console-consumer.sh.
我怎样才能明确地创建我自己的消费者组,比如说my-created-consumer-group?
我试图找出消费者群体层面是否也存在任何抵消。消费者偏移量是在消费者组级别还是在 Kafka 中该消费者组内的单个消费者?