nes*_*est 3 scala apache-spark apache-spark-sql
我想创建一个数组数组.这是我的数据表:
// A case class for our sample table
case class Testing(name: String, age: Int, salary: Int)
// Create an RDD with some data
val x = sc.parallelize(Array(
Testing(null, 21, 905),
Testing("Noelia", 26, 1130),
Testing("Pilar", 52, 1890),
Testing("Roberto", 31, 1450)
))
// Convert RDD to a DataFrame
val df = sqlContext.createDataFrame(x)
// For SQL usage we need to register the table
df.registerTempTable("df")
Run Code Online (Sandbox Code Playgroud)
我想创建一个整数列"age"的数组.为此,我使用"collect_list":
sqlContext.sql("SELECT collect_list(age) as age from df").show
Run Code Online (Sandbox Code Playgroud)
但现在我想生成一个包含多个数组的数组,如上所示:
sqlContext.sql("SELECT collect_list(collect_list(age), collect_list(salary)) as arrayInt from df").show
Run Code Online (Sandbox Code Playgroud)
但这不起作用,或使用函数org.apache.spark.sql.functions.array.有任何想法吗?
eli*_*sah 10
好吧,事情变得更加简单.让我们考虑您正在处理的相同数据,并从那里一步一步地进行
// A case class for our sample table
case class Testing(name: String, age: Int, salary: Int)
// Create an RDD with some data
val x = sc.parallelize(Array(
Testing(null, 21, 905),
Testing("Noelia", 26, 1130),
Testing("Pilar", 52, 1890),
Testing("Roberto", 31, 1450)
))
// Convert RDD to a DataFrame
val df = sqlContext.createDataFrame(x)
// For SQL usage we need to register the table
df.registerTempTable("df")
sqlContext.sql("select collect_list(age) as age from df").show
// +----------------+
// | age|
// +----------------+
// |[21, 26, 52, 31]|
// +----------------+
sqlContext.sql("select collect_list(collect_list(age), collect_list(salary)) as arrayInt from df").show
Run Code Online (Sandbox Code Playgroud)
正如错误消息所示:
org.apache.spark.sql.AnalysisException: No handler for Hive udf class
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectList because: Exactly one argument is expected..; line 1 pos 52 [...]
Run Code Online (Sandbox Code Playgroud)
collest_list只需要一个参数.我们来这里查看文档.
它实际上需要一个参数!但是,让我们进一步了解函数对象的文档.您似乎注意到数组函数允许您从Column或重复的Column参数创建新的数组列.所以让我们用它:
sqlContext.sql("select array(collect_list(age), collect_list(salary)) as arrayInt from df").show(false)
Run Code Online (Sandbox Code Playgroud)
数组函数确实创建了一个列列表,列中列出了年龄和薪水上的collect_list:
// +-------------------------------------------------------------------+
// |arrayInt |
// +-------------------------------------------------------------------+
// |[WrappedArray(21, 26, 52, 31), WrappedArray(905, 1130, 1890, 1450)]|
// +-------------------------------------------------------------------+
Run Code Online (Sandbox Code Playgroud)
我们从哪里去?
您必须记住,DataFrame中的Row只是由Row包装的另一个集合.
我要做的第一件事是研究那个系列.那么我们如何压扁WrappedArray[WrappedArray[Int]]?
Scala是你需要使用的一种神奇的东西 .flatten
import scala.collection.mutable.WrappedArray
val firstRow: mutable.WrappedArray[mutable.WrappedArray[Int]] =
sqlContext.sql("select array(collect_list(age), collect_list(salary)) as arrayInt from df")
.first.get(0).asInstanceOf[WrappedArray[WrappedArray[Int]]]
// res26: scala.collection.mutable.WrappedArray[scala.collection.mutable.WrappedArray[Int]] =
// WrappedArray(WrappedArray(21, 26, 52, 31), WrappedArray(905, 1130, 1890, 1450))
firstRow.flatten
// res27: scala.collection.mutable.IndexedSeq[Int] = ArrayBuffer(21, 26, 52, 31, 905, 1130, 1890, 1450)
Run Code Online (Sandbox Code Playgroud)
现在让我们将它包装在UDF中,以便我们可以在DataFrame上使用它:
def flatten(array: WrappedArray[WrappedArray[Int]]) = array.flatten
sqlContext.udf.register("flatten", flatten(_: WrappedArray[WrappedArray[Int]]))
Run Code Online (Sandbox Code Playgroud)
由于我们注册了UDF,现在我们可以在sqlContext中使用它:
sqlContext.sql("select flatten(array(collect_list(age), collect_list(salary))) as arrayInt from df").show(false)
// +---------------------------------------+
// |arrayInt |
// +---------------------------------------+
// |[21, 26, 52, 31, 905, 1130, 1890, 1450]|
// +---------------------------------------+
Run Code Online (Sandbox Code Playgroud)
我希望这有帮助 !
| 归档时间: |
|
| 查看次数: |
6062 次 |
| 最近记录: |