Converting a Spark Scala Dataset from one type to another

Nik*_*ole 0 scala apache-spark apache-spark-sql scala-spark

I have a Dataset of the following case class type:

case class AddressRawData(
  addressId: String,
  customerId: String,
  address: String
)

I want to convert it to:

case class AddressData(
  addressId: String,
  customerId: String,
  address: String,
  number: Option[Int], // i.e. it is optional
  road: Option[String],
  city: Option[String],
  country: Option[String]
)

using this parser function:

  def addressParser(unparsedAddress: Seq[AddressData]): Seq[AddressData] = {
    unparsedAddress.map(address => {
      val split = address.address.split(", ")
      address.copy(
        number = Some(split(0).toInt),
        road = Some(split(1)),
        city = Some(split(2)),
        country = Some(split(3))
      )
    }
    )
  }

I'm new to Scala and Spark. Can anyone tell me how to do this?

Koe*_*dlt 5

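You're on the right track! There are of course several ways to do this. But since you have already created some case classes and started on a parsing function, an elegant solution is to use the Dataset's map function. According to the docs, the map function has this signature: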

def map[U](func: (T) ⇒ U)(implicit arg0: Encoder[U]): Dataset[U]

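Here T is the starting type (AddressRawData in your case) and U is the type you want to end up with (AddressData in your case). So the input to this map function is a function that converts an AddressRawData into an AddressData. That is very likely the addressParser you've already started writing!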


Now, your current addressParser has the following signature:

def addressParser(unparsedAddress: Seq[AddressData]): Seq[AddressData]

To be able to pass it to the map function, we need it to have this signature instead:

def newAddressParser(unparsedAddress: AddressRawData): AddressData
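As a quick sanity check before involving Spark at all, here is a minimal sketch, assuming only the two case classes from the question. parseOne and parseAll are hypothetical names chosen for illustration: parseOne has exactly the (T) ⇒ U shape that Dataset.map expects, and a batch version over Seq[AddressRawData] then becomes a one-liner on top of it. Because plain Seq.map takes the same kind of function, you can test the per-record parser in ordinary Scala.

```scala
import scala.util.Try

// Case classes from the question
case class AddressRawData(addressId: String, customerId: String, address: String)
case class AddressData(
  addressId: String,
  customerId: String,
  address: String,
  number: Option[Int],
  road: Option[String],
  city: Option[String],
  country: Option[String]
)

object AddressParsing {
  // Hypothetical per-record parser: one AddressRawData in, one AddressData out.
  // This is the (T) => U shape that Dataset.map takes.
  def parseOne(raw: AddressRawData): AddressData = {
    val parts = raw.address.split(", ")
    AddressData(
      raw.addressId,
      raw.customerId,
      raw.address,
      Try(parts(0).toInt).toOption, // None if the first part is not a number
      Try(parts(1)).toOption,       // None if the part is missing
      Try(parts(2)).toOption,
      Try(parts(3)).toOption
    )
  }

  // Hypothetical batch version: simply maps the per-record parser over a Seq.
  def parseAll(raws: Seq[AddressRawData]): Seq[AddressData] = raws.map(parseOne)
}
```

In Spark, this same per-record function is what you hand to Dataset.map, with import spark.implicits._ in scope to supply the Encoder[AddressData].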

Knowing all this, we can take it further! Here is an example:

// spark is the SparkSession (predefined in spark-shell)
import spark.implicits._
import scala.util.Try

// Your case classes
case class AddressRawData(addressId: String, customerId: String, address: String)
case class AddressData(
  addressId: String,
  customerId: String,
  address: String,
  number: Option[Int],
  road: Option[String],
  city: Option[String],
  country: Option[String]
)

// Your addressParser function, adapted to be able to feed into the Dataset.map
// function
def addressParser(rawAddress: AddressRawData): AddressData = {
  val addressArray = rawAddress.address.split(", ")
  AddressData(
    rawAddress.addressId,
    rawAddress.customerId,
    rawAddress.address,
    Try(addressArray(0).toInt).toOption,
    Try(addressArray(1)).toOption,
    Try(addressArray(2)).toOption,
    Try(addressArray(3)).toOption
  )
}

// Creating a sample dataset
val rawDS = Seq(
  AddressRawData("1", "1", "20, my super road, beautifulCity, someCountry"),
  AddressRawData("1", "1", "badFormat, some road, cityButNoCountry")
).toDS

val parsedDS = rawDS.map(addressParser)

parsedDS.show
+---------+----------+--------------------+------+-------------+----------------+-----------+
|addressId|customerId|             address|number|         road|            city|    country|
+---------+----------+--------------------+------+-------------+----------------+-----------+
|        1|         1|20, my super road...|    20|my super road|   beautifulCity|someCountry|
|        1|         1|badFormat, some r...|  null|    some road|cityButNoCountry|       null|
+---------+----------+--------------------+------+-------------+----------------+-----------+

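As you can see, since you had already anticipated that parsing might go wrong, scala.util.Try makes it easy to extract the individual parts of the raw address while adding some robustness: the second row contains null values where parts of the address string could not be parsed.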
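If you would rather not rely on catching exceptions just to detect a missing field, one alternative (not what the answer above uses) is lift, which turns an index lookup on a Seq into an Option; Try is then only needed for the Int conversion. This is a minimal sketch assuming the same case classes; parseWithLift is a hypothetical name, and splitting on a bare comma plus trimming whitespace (and treating empty parts as missing) is an extra assumption about the data.

```scala
import scala.util.Try

case class AddressRawData(addressId: String, customerId: String, address: String)
case class AddressData(
  addressId: String,
  customerId: String,
  address: String,
  number: Option[Int],
  road: Option[String],
  city: Option[String],
  country: Option[String]
)

object LiftParsing {
  // Hypothetical alternative parser: index lookups via lift instead of Try.
  def parseWithLift(raw: AddressRawData): AddressData = {
    // Assumption: commas separate the parts and surrounding whitespace is noise.
    val parts: IndexedSeq[String] = raw.address.split(",").map(_.trim).toIndexedSeq

    // lift(i) returns None for an out-of-range index instead of throwing;
    // empty parts (e.g. "a, , b") are treated as missing as well.
    def field(i: Int): Option[String] = parts.lift(i).filter(_.nonEmpty)

    AddressData(
      raw.addressId,
      raw.customerId,
      raw.address,
      // Try is still needed here, because e.g. "badFormat".toInt throws.
      field(0).flatMap(s => Try(s.toInt).toOption),
      field(1),
      field(2),
      field(3)
    )
  }
}
```

It plugs into Dataset.map exactly like addressParser does. Whether a bare comma with trimming is the right separator depends on your real data; if road names can themselves contain commas, neither version will split them correctly.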


Hope this helps!
