jsontostructs to spark in spark结构化流媒体

Mar*_*iak 6 java apache-spark apache-spark-sql apache-spark-2.0 spark-structured-streaming

我正在使用Spark 2.2,我正在尝试从Kafka读取JSON消息,将它们转换为DataFrame并将它们作为Row:

spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "topic")
    .load()
    .select(col("value").cast(StringType).as("col"))
    .writeStream()
    .format("console")
    .start();
Run Code Online (Sandbox Code Playgroud)

有了这个,我可以实现:

+--------------------+
|                 col|
+--------------------+
|{"myField":"somet...|
+--------------------+
Run Code Online (Sandbox Code Playgroud)

我想要更像这样的东西:

+--------------------+
|             myField|
+--------------------+
|"something"         |
+--------------------+
Run Code Online (Sandbox Code Playgroud)

我尝试使用from_json函数struct:

DataTypes.createStructType(
    new StructField[] {
            DataTypes.createStructField("myField", DataTypes.StringType)
    }
)
Run Code Online (Sandbox Code Playgroud)

但我只得到:

+--------------------+
|  jsontostructs(col)|
+--------------------+
|[something]         |
+--------------------+
Run Code Online (Sandbox Code Playgroud)

然后我尝试使用,explode但我只有Exception说:

cannot resolve 'explode(`col`)' due to data type mismatch: 
input to function explode should be array or map type, not 
StructType(StructField(...
Run Code Online (Sandbox Code Playgroud)

知道如何使这项工作?

use*_*411 8

你几乎就在那里,只需选择正确的东西.from_json返回struct与架构匹配的列.如果schema(JSON表示)如下所示:

{"type":"struct","fields":[{"name":"myField","type":"string","nullable":false,"metadata":{}}]}
Run Code Online (Sandbox Code Playgroud)

你会得到相当于的嵌套对象:

root
 |-- jsontostructs(col): struct (nullable = true)
 |    |-- myField: string (nullable = false)
Run Code Online (Sandbox Code Playgroud)

您可以使用getField(或getItem)方法选择特定字段

df.select(from_json(col("col"), schema).getField("myField").alias("myField"));
Run Code Online (Sandbox Code Playgroud)

或者.*选择以下所有顶级字段struct:

df.select(from_json(col("col"), schema).alias("tmp")).select("tmp.*");
Run Code Online (Sandbox Code Playgroud)

虽然对于单列string,get_json_object应该绰绰有余:

df.select(get_json_object(col("col"), "$.myField"));
Run Code Online (Sandbox Code Playgroud)