为什么from_json失败了"找不到:值from_json"?

Gya*_*yan 4 scala apache-spark apache-spark-sql jsonparser

我正在使用Spark 2.1.1(kafka 0.10+)阅读Kafka主题,有效负载是JSON字符串.我想用模式解析字符串并继续使用业务逻辑.

每个人似乎都建议我应该使用from_json解析JSON字符串,但是,它似乎不适合我的情况编译.错误是

not found : value from_json 
.select(from_json($"json", txnSchema) as "data")
Run Code Online (Sandbox Code Playgroud)

当我尝试将以下几行放入火花壳时,它的效果很好 -

val df = stream
  .select($"value" cast "string" as "json")
  .select(from_json($"json", txnSchema) as "data")
  .select("data.*")
Run Code Online (Sandbox Code Playgroud)

任何想法,我在代码中可能做错了,看到这个部分在shell中运行但在IDE /编译时没有?

这是代码:

import org.apache.spark.sql._

object Kafka10Cons3 extends App {
  val spark = SparkSession
    .builder
    .appName(Util.getProperty("AppName"))
    .master(Util.getProperty("spark.master"))
    .getOrCreate

  val stream = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", Util.getProperty("kafka10.broker"))
    .option("subscribe", src_topic)
    .load

  val txnSchema = Util.getTxnStructure
  val df = stream
    .select($"value" cast "string" as "json")
    .select(from_json($"json", txnSchema) as "data")
    .select("data.*")
}
Run Code Online (Sandbox Code Playgroud)

Tza*_*har 10

你可能只是错过了相关的导入 - import org.apache.spark.sql.functions._.

您已导入spark.implicits._org.apache.spark.sql._,但这些都不会导入单个函数functions.


我也是导入com.wizzardo.tools.json,看起来它也有一个from_json函数,它必须是编译器选择的那个(因为它是先导入的?),这显然与我的spark版本不兼容

确保您没有from_json从其他json库导入该函数,因为此库可能与您正在使用的spark版本不兼容.