如何使用 Spark Structured Streaming 将数据从 Kafka 主题流式传输到 Delta 表

Haz*_*hak 5 scala apache-kafka apache-spark spark-structured-streaming delta-lake

我正在尝试了解数据块增量并考虑使用 Kafka 进行 POC。基本上计划是使用来自 Kafka 的数据并将其插入到 databricks delta 表中。

这些是我所做的步骤:

  1. 在数据块上创建增量表。
%sql
CREATE TABLE hazriq_delta_trial2 (
  value STRING
)
USING delta
LOCATION '/delta/hazriq_delta_trial2'
Run Code Online (Sandbox Code Playgroud)
  1. 消费来自 Kafka 的数据。
%sql
CREATE TABLE hazriq_delta_trial2 (
  value STRING
)
USING delta
LOCATION '/delta/hazriq_delta_trial2'
Run Code Online (Sandbox Code Playgroud)

但是,当我查询表时,它是空的。

我可以确认数据来了。当我向 Kafka 主题生成消息时,我通过查看图中的尖峰来验证它。

传入数据

我错过了什么吗?

我需要关于如何将从 Kafka 获得的数据插入到表中的帮助。

Eri*_*let 0

1) 尝试验证您的 Spark 集群是否可以访问 Kafka,有时您需要允许 Kafka 中的某些 ip 访问。

2)尝试将其更改.option("startingOffsets", "earliest"为此.option("startingOffsets", "latest")

3)也尝试一下

val kafka2 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokers)
  .option("subscribe", kafkaTopic)
  .option("startingOffsets", "earliest")
  .load()
  .select($"value")
  .withColumn("Value", $"value".cast(StringType))
  .writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/hazriq_delta_trial2/_checkpoints/test")
  .start("hazriq_delta_trial2")
Run Code Online (Sandbox Code Playgroud)