如何使用from_json和schema作为字符串(即JSON编码的模式)?

Man*_*esh 5 apache-spark apache-spark-sql spark-structured-streaming

我正在从Kafka读取一个流,并将Kafka(即JSON)中的值转换为Structure.

from_json有一个采用类型模式的变体String,但我找不到样本.请告知以下代码中的错误.

错误

Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException: 
extraneous input '(' expecting {'SELECT', 'FROM', 'ADD', 'AS', 'ALL', 'DISTINCT',

== SQL ==
STRUCT ( `firstName`: STRING, `lastName`: STRING, `email`: STRING, `addresses`: ARRAY ( STRUCT ( `city`: STRING, `state`: STRING, `zip`: STRING )  )  ) 
-------^^^

at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:217)
Run Code Online (Sandbox Code Playgroud)

程序

public static void main(String[] args) throws AnalysisException {
    String master = "local[*]";
    String brokers = "quickstart:9092";
    String topics = "simple_topic_6";

    SparkSession sparkSession = SparkSession
            .builder().appName(EmployeeSchemaLoader.class.getName())
            .master(master).getOrCreate();

   String employeeSchema = "STRUCT ( firstName: STRING, lastName: STRING, email: STRING, " +
            "addresses: ARRAY ( STRUCT ( city: STRING, state: STRING, zip: STRING )  )  ) ";

    SparkContext context = sparkSession.sparkContext();
    context.setLogLevel("ERROR");
    SQLContext sqlCtx = sparkSession.sqlContext();

    Dataset<Row> employeeDataset = sparkSession.readStream().
            format("kafka").
            option("kafka.bootstrap.servers", brokers)
            .option("subscribe", topics).load();
    employeeDataset.printSchema();
    employeeDataset = employeeDataset.withColumn("strValue", employeeDataset.col("value").cast("string"));
    employeeDataset = employeeDataset.withColumn("employeeRecord",
            functions.from_json(employeeDataset.col("strValue"),employeeSchema, new HashMap<>()));

    employeeDataset.printSchema();
    employeeDataset.createOrReplaceTempView("employeeView");

    sparkSession.catalog().listTables().show();

    sqlCtx.sql("select * from employeeView").show();
}
Run Code Online (Sandbox Code Playgroud)

Jac*_*ski 7

您的问题帮助我发现基于from_jsonwith String的架构的变体仅在Java中可用,并且最近在即将推出的2.3.0中已添加到Spark API for Scala.我坚持认为Spark API for Scala总是功能最丰富,我的问题帮助我学习它不应该在2.3.0(!)中更改之前就已经存在了很长时间.

回到您的问题,您可以实际以JSON或DDL格式定义基于字符串的架构.

手工编写JSON可能有点麻烦,所以我采取不同的方法(因为我是一个Scala开发人员相当容易).

我们首先使用Spark API for Scala定义模式.

import org.apache.spark.sql.types._
val addressesSchema = new StructType()
  .add($"city".string)
  .add($"state".string)
  .add($"zip".string)
val schema = new StructType()
  .add($"firstName".string)
  .add($"lastName".string)
  .add($"email".string)
  .add($"addresses".array(addressesSchema))
scala> schema.printTreeString
root
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- email: string (nullable = true)
 |-- addresses: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |    |-- zip: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

这似乎与您的架构相匹配,不是吗?

通过该方法,将模式转换为JSON编码的字符串是一件轻而易举的json方法.

val schemaAsJson = schema.json
Run Code Online (Sandbox Code Playgroud)

schemaAsJson正是你的JSON字符串,看起来很漂亮...嗯...复杂.出于显示目的,我宁愿使用prettyJson方法.

scala> println(schema.prettyJson)
{
  "type" : "struct",
  "fields" : [ {
    "name" : "firstName",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "lastName",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "email",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "addresses",
    "type" : {
      "type" : "array",
      "elementType" : {
        "type" : "struct",
        "fields" : [ {
          "name" : "city",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        }, {
          "name" : "state",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        }, {
          "name" : "zip",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        } ]
      },
      "containsNull" : true
    },
    "nullable" : true,
    "metadata" : { }
  } ]
}
Run Code Online (Sandbox Code Playgroud)

这是你在JSON中的架构.

您可以使用DataType并"验证"JSON字符串(使用Spark在封面下使用的DataType.fromJsonfrom_json).

import org.apache.spark.sql.types.DataType
val dt = DataType.fromJson(schemaAsJson)
scala> println(dt.sql)
STRUCT<`firstName`: STRING, `lastName`: STRING, `email`: STRING, `addresses`: ARRAY<STRUCT<`city`: STRING, `state`: STRING, `zip`: STRING>>>
Run Code Online (Sandbox Code Playgroud)

一切似乎都很好.介意我是否用样本数据集检查了这个?

val rawJsons = Seq("""
  {
    "firstName" : "Jacek",
    "lastName" : "Laskowski",
    "email" : "jacek@japila.pl",
    "addresses" : [
      {
        "city" : "Warsaw",
        "state" : "N/A",
        "zip" : "02-791"
      }
    ]
  }
""").toDF("rawjson")
val people = rawJsons
  .select(from_json($"rawjson", schemaAsJson, Map.empty[String, String]) as "json")
  .select("json.*") // <-- flatten the struct field
  .withColumn("address", explode($"addresses")) // <-- explode the array field
  .drop("addresses")  // <-- no longer needed
  .select("firstName", "lastName", "email", "address.*") // <-- flatten the struct field
scala> people.show
+---------+---------+---------------+------+-----+------+
|firstName| lastName|          email|  city|state|   zip|
+---------+---------+---------------+------+-----+------+
|    Jacek|Laskowski|jacek@japila.pl|Warsaw|  N/A|02-791|
+---------+---------+---------------+------+-----+------+
Run Code Online (Sandbox Code Playgroud)