Spark 2.2.0独立模式将数据帧写入本地单节点Kafka时出错

kar*_*rot 3 scala apache-kafka apache-spark apache-spark-sql

数据源来自Databricks Notebook演示:五个Spark SQL Helper实用程序函数,用于提取和探索复杂的数据类型

但是,当我在自己的笔记本电脑上尝试这些代码时,总是会遇到错误。

首先,将JSON数据加载为DataFrame

res2: org.apache.spark.sql.DataFrame = [battery_level: string, c02_level: string]

scala> res2.show
Run Code Online (Sandbox Code Playgroud)
res2: org.apache.spark.sql.DataFrame = [battery_level: string, c02_level: string]

scala> res2.show
Run Code Online (Sandbox Code Playgroud)

其次,write向卡夫卡发送数据:

res2.write 
  .format("kafka") 
  .option("kafka.bootstrap.servers", "localhost:9092") 
  .option("topic", "test") 
  .save()
Run Code Online (Sandbox Code Playgroud)

所有这些都遵循上面的笔记本演示和官方步骤

但是错误显示:

scala> res2.write 
         .format("kafka") 
         .option("kafka.bootstrap.servers", "localhost:9092") 
         .option("topic", "iot-devices") 
         .save()
Run Code Online (Sandbox Code Playgroud)
+-------------+---------+
|battery_level|c02_level|
+-------------+---------+
|            7|      886|
|            5|     1378|
|            8|      917|
|            8|     1504|
|            8|      831|
|            9|     1304|
|            8|     1574|
|            9|     1208|
+-------------+---------+
Run Code Online (Sandbox Code Playgroud)

我假设这可能是Kafka问题,然后我测试了readKafka 的DataFrame 以确保连通性:

scala> val kaDF = spark.read
         .format("kafka") 
         .option("kafka.bootstrap.servers", "localhost:9092") 
         .option("subscribe", "iot-devices") 
         .load()
kaDF: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]

scala> kaDF.show
Run Code Online (Sandbox Code Playgroud)
res2.write 
  .format("kafka") 
  .option("kafka.bootstrap.servers", "localhost:9092") 
  .option("topic", "test") 
  .save()
Run Code Online (Sandbox Code Playgroud)

因此,结果表明从Kafka bootstrap.servers读取主题“ iot-devices”中的数据localhost:9092确实可行。

我在网上搜索了很多,但仍然无法解决?

任何具有Spark SQL经验的人都可以告诉我命令中的错误吗?

谢谢!

hi-*_*zir 11

该错误消息清楚地显示了问题的根源:

org.apache.spark.sql.AnalysisException:找不到所需的属性“值”;

Dataset要被写入必须具有至少value(和任选的keytopic),并res2具有唯一的battery_levelc02_level

您可以例如:

import org.apache.spark.sql.functions._

res2.select(to_json(struct($"battery_level", "c02_level")).alias("value"))
  .writeStream
  ...
Run Code Online (Sandbox Code Playgroud)