kar*_*rot 3 scala apache-kafka apache-spark apache-spark-sql
数据源来自Databricks Notebook演示:五个Spark SQL Helper实用程序函数,用于提取和探索复杂的数据类型!
但是,当我在自己的笔记本电脑上尝试这些代码时,总是会遇到错误。
首先,将JSON数据加载为DataFrame
res2: org.apache.spark.sql.DataFrame = [battery_level: string, c02_level: string]
scala> res2.show
Run Code Online (Sandbox Code Playgroud)
res2: org.apache.spark.sql.DataFrame = [battery_level: string, c02_level: string]
scala> res2.show
Run Code Online (Sandbox Code Playgroud)
其次,write向卡夫卡发送数据:
res2.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "test")
.save()
Run Code Online (Sandbox Code Playgroud)
所有这些都遵循上面的笔记本演示和官方步骤
但是错误显示:
scala> res2.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "iot-devices")
.save()
Run Code Online (Sandbox Code Playgroud)
+-------------+---------+
|battery_level|c02_level|
+-------------+---------+
| 7| 886|
| 5| 1378|
| 8| 917|
| 8| 1504|
| 8| 831|
| 9| 1304|
| 8| 1574|
| 9| 1208|
+-------------+---------+
Run Code Online (Sandbox Code Playgroud)
我假设这可能是Kafka问题,然后我测试了readKafka 的DataFrame 以确保连通性:
scala> val kaDF = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "iot-devices")
.load()
kaDF: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]
scala> kaDF.show
Run Code Online (Sandbox Code Playgroud)
res2.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "test")
.save()
Run Code Online (Sandbox Code Playgroud)
因此,结果表明从Kafka bootstrap.servers读取主题“ iot-devices”中的数据localhost:9092确实可行。
我在网上搜索了很多,但仍然无法解决?
任何具有Spark SQL经验的人都可以告诉我命令中的错误吗?
谢谢!
hi-*_*zir 11
该错误消息清楚地显示了问题的根源:
org.apache.spark.sql.AnalysisException:找不到所需的属性“值”;
的Dataset要被写入必须具有至少value柱(和任选的key和topic),并res2具有唯一的battery_level,c02_level。
您可以例如:
import org.apache.spark.sql.functions._
res2.select(to_json(struct($"battery_level", "c02_level")).alias("value"))
.writeStream
...
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2497 次 |
| 最近记录: |