Haz*_*hak 5 scala apache-kafka apache-spark spark-structured-streaming delta-lake
我正在尝试了解数据块增量并考虑使用 Kafka 进行 POC。基本上计划是使用来自 Kafka 的数据并将其插入到 databricks delta 表中。
这些是我所做的步骤:
%sql
CREATE TABLE hazriq_delta_trial2 (
value STRING
)
USING delta
LOCATION '/delta/hazriq_delta_trial2'
Run Code Online (Sandbox Code Playgroud)
%sql
CREATE TABLE hazriq_delta_trial2 (
value STRING
)
USING delta
LOCATION '/delta/hazriq_delta_trial2'
Run Code Online (Sandbox Code Playgroud)
但是,当我查询表时,它是空的。
我可以确认数据来了。当我向 Kafka 主题生成消息时,我通过查看图中的尖峰来验证它。
我错过了什么吗?
我需要关于如何将从 Kafka 获得的数据插入到表中的帮助。
1) 尝试验证您的 Spark 集群是否可以访问 Kafka,有时您需要允许 Kafka 中的某些 ip 访问。
2)尝试将其更改.option("startingOffsets", "earliest"为此.option("startingOffsets", "latest")
3)也尝试一下
val kafka2 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaBrokers)
.option("subscribe", kafkaTopic)
.option("startingOffsets", "earliest")
.load()
.select($"value")
.withColumn("Value", $"value".cast(StringType))
.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/delta/hazriq_delta_trial2/_checkpoints/test")
.start("hazriq_delta_trial2")
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1932 次 |
| 最近记录: |