Nik*_*ole 0 scala apache-spark apache-spark-sql scala-spark
I have a Dataset of the following case class type:
case class AddressRawData(
  addressId: String,
  customerId: String,
  address: String
)
I want to convert it to:
case class AddressData(
  addressId: String,
  customerId: String,
  address: String,
  number: Option[Int], // i.e. it is optional
  road: Option[String],
  city: Option[String],
  country: Option[String]
)
using a parser function:
def addressParser(unparsedAddress: Seq[AddressData]): Seq[AddressData] = {
  unparsedAddress.map(address => {
    val split = address.address.split(", ")
    address.copy(
      number = Some(split(0).toInt),
      road = Some(split(1)),
      city = Some(split(2)),
      country = Some(split(3))
    )
  })
}
I'm new to Scala and Spark. Could someone tell me how to do this?
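You're on the right track! There are of course several ways to do this. But since you have already started writing case classes and a parsing function, an elegant solution is to use the Dataset's map function. According to the documentation, map has the following signature: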
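def map[U](func: (T) => U)(implicit arg0: Encoder[U]): Dataset[U]
Here T is the starting type (AddressRawData in your case) and U is the type you want to end up with (AddressData in your case). So the argument to map is a function that turns an AddressRawData into an AddressData, which is very likely exactly what you have started building with addressParser! The second parameter list takes an implicit Encoder[U]; for case classes, import spark.implicits._ provides it.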
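To make the roles of T and U concrete, here is a minimal plain-Scala sketch (no Spark session needed) built on Seq.map, whose function argument has the same T => U shape as Dataset.map; the only difference is that Dataset.map also takes the implicit Encoder[U]. The helper partCount is hypothetical and only illustrates that U is whatever the function returns (here Int); an addressParser of type AddressRawData => AddressData fits the same slot with U = AddressData.

```scala
// Plain-Scala analogy: Seq.map takes a function T => U, just like Dataset.map
// (Dataset.map additionally needs an implicit Encoder[U]).
case class AddressRawData(addressId: String, customerId: String, address: String)

// A function value with T = AddressRawData and U = Int.
// partCount is a hypothetical example: it counts the ", "-separated parts.
val partCount: AddressRawData => Int = raw => raw.address.split(", ").length

val raws: Seq[AddressRawData] = Seq(
  AddressRawData("1", "1", "20, my super road, beautifulCity, someCountry"),
  AddressRawData("1", "1", "badFormat, some road, cityButNoCountry")
)

// U is taken from the function's result type, so the result is a Seq[Int].
val counts: Seq[Int] = raws.map(partCount)
```

On a Dataset the call looks the same: rawDS.map(partCount) would give a Dataset[Int], with the Encoder[Int] coming from import spark.implicits._.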
Right now, your addressParser has the following signature:
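def addressParser(unparsedAddress: Seq[AddressData]): Seq[AddressData]
To be able to pass a parser to map, it has to work on one element at a time, taking a single AddressRawData and returning a single AddressData rather than a whole Seq of AddressData. So we need the following signature: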
def newAddressParser(unparsedAddress: AddressRawData): AddressData
With all that in mind, we can move on! Here is an example:
import spark.implicits._ // needs a SparkSession named `spark` in scope (e.g. in spark-shell)
import scala.util.Try

// Your case classes
case class AddressRawData(addressId: String, customerId: String, address: String)
case class AddressData(
  addressId: String,
  customerId: String,
  address: String,
  number: Option[Int],
  road: Option[String],
  city: Option[String],
  country: Option[String]
)

// Your addressParser function, adapted so it can be passed to Dataset.map:
// it turns one AddressRawData into one AddressData
def addressParser(rawAddress: AddressRawData): AddressData = {
  val addressArray = rawAddress.address.split(", ")
  AddressData(
    rawAddress.addressId,
    rawAddress.customerId,
    rawAddress.address,
    // Try(...).toOption yields None if a part is missing or not a number
    Try(addressArray(0).toInt).toOption,
    Try(addressArray(1)).toOption,
    Try(addressArray(2)).toOption,
    Try(addressArray(3)).toOption
  )
}

// Creating a sample dataset
val rawDS = Seq(
  AddressRawData("1", "1", "20, my super road, beautifulCity, someCountry"),
  AddressRawData("1", "1", "badFormat, some road, cityButNoCountry")
).toDS()

val parsedDS = rawDS.map(addressParser)

parsedDS.show()
+---------+----------+--------------------+------+-------------+----------------+-----------+
|addressId|customerId|             address|number|         road|            city|    country|
+---------+----------+--------------------+------+-------------+----------------+-----------+
|        1|         1|20, my super road...|    20|my super road|   beautifulCity|someCountry|
|        1|         1|badFormat, some r...|  null|    some road|cityButNoCountry|       null|
+---------+----------+--------------------+------+-------------+----------------+-----------+
As you can see, since you had already anticipated that parsing could fail, scala.util.Try makes it easy to extract the individual parts of the raw address while adding some robustness: the second row contains null values where the address string could not be parsed ("badFormat" is not a number, and there is no fourth part for the country).
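If you prefer not to rely on exceptions for the missing-field case, one alternative (a sketch, not the code above) is to turn the split array into an IndexedSeq and use lift, which returns None for an out-of-range index; Try is then only needed for the toInt conversion. The name addressParserLift is hypothetical, and the case classes are repeated so the snippet compiles on its own without Spark.

```scala
import scala.util.Try

case class AddressRawData(addressId: String, customerId: String, address: String)
case class AddressData(
  addressId: String,
  customerId: String,
  address: String,
  number: Option[Int],
  road: Option[String],
  city: Option[String],
  country: Option[String]
)

// Hypothetical alternative to addressParser: safe indexing via lift
// instead of catching an out-of-bounds exception with Try.
def addressParserLift(rawAddress: AddressRawData): AddressData = {
  // An IndexedSeq is a PartialFunction[Int, String], so lift(i) returns Option[String]
  val parts: IndexedSeq[String] = rawAddress.address.split(", ").toIndexedSeq
  AddressData(
    rawAddress.addressId,
    rawAddress.customerId,
    rawAddress.address,
    // Missing part -> None; non-numeric part -> None via Try
    parts.lift(0).flatMap(s => Try(s.toInt).toOption),
    parts.lift(1),
    parts.lift(2),
    parts.lift(3)
  )
}

val wellFormed = addressParserLift(
  AddressRawData("1", "1", "20, my super road, beautifulCity, someCountry"))
val malformed = addressParserLift(
  AddressRawData("1", "1", "badFormat, some road, cityButNoCountry"))
```

It plugs into Spark the same way, as rawDS.map(addressParserLift), and gives the same values for the two sample rows. The design difference is that lift makes "this part is missing" explicit, whereas wrapping the indexing in Try would also silently swallow any other non-fatal exception.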
Hope this helps!