Gya*_*yan 4 scala apache-spark apache-spark-sql jsonparser
我正在使用Spark 2.1.1(kafka 0.10+)阅读Kafka主题,有效负载是JSON字符串.我想用模式解析字符串并继续使用业务逻辑.
每个人似乎都建议我应该使用from_json解析JSON字符串,但是,它似乎不适合我的情况编译.错误是
not found : value from_json
.select(from_json($"json", txnSchema) as "data")
Run Code Online (Sandbox Code Playgroud)
当我尝试将以下几行放入火花壳时,它的效果很好 -
val df = stream
.select($"value" cast "string" as "json")
.select(from_json($"json", txnSchema) as "data")
.select("data.*")
Run Code Online (Sandbox Code Playgroud)
任何想法,我在代码中可能做错了,看到这个部分在shell中运行但在IDE /编译时没有?
这是代码:
import org.apache.spark.sql._
object Kafka10Cons3 extends App {
val spark = SparkSession
.builder
.appName(Util.getProperty("AppName"))
.master(Util.getProperty("spark.master"))
.getOrCreate
val stream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", Util.getProperty("kafka10.broker"))
.option("subscribe", src_topic)
.load
val txnSchema = Util.getTxnStructure
val df = stream
.select($"value" cast "string" as "json")
.select(from_json($"json", txnSchema) as "data")
.select("data.*")
}
Run Code Online (Sandbox Code Playgroud)
Tza*_*har 10
你可能只是错过了相关的导入 - import org.apache.spark.sql.functions._.
您已导入spark.implicits._和org.apache.spark.sql._,但这些都不会导入单个函数functions.
我也是导入
com.wizzardo.tools.json,看起来它也有一个from_json函数,它必须是编译器选择的那个(因为它是先导入的?),这显然与我的spark版本不兼容
确保您没有from_json从其他json库导入该函数,因为此库可能与您正在使用的spark版本不兼容.
| 归档时间: |
|
| 查看次数: |
3619 次 |
| 最近记录: |