相关疑难解决方法(0)

定义一个接受Spark DataFrame中的对象数组的UDF?

使用Spark的DataFrame时,需要使用用户定义函数(UDF)来映射列中的数据.UDF要求显式指定参数类型.在我的情况下,我需要操作由对象数组组成的列,我不知道要使用什么类型.这是一个例子:

import sqlContext.implicits._

// Start with some data. Each row (here, there's only one row) 
// is a topic and a bunch of subjects
val data = sqlContext.read.json(sc.parallelize(Seq(
  """
  |{
  |  "topic" : "pets",
  |  "subjects" : [
  |    {"type" : "cat", "score" : 10},
  |    {"type" : "dog", "score" : 1}
  |  ]
  |}
  """)))
Run Code Online (Sandbox Code Playgroud)

使用内置org.apache.spark.sql.functions函数对列中的数据执行基本操作相对简单

import org.apache.spark.sql.functions.size
data.select($"topic", size($"subjects")).show

+-----+--------------+
|topic|size(subjects)|
+-----+--------------+
| pets|             2|
+-----+--------------+
Run Code Online (Sandbox Code Playgroud)

并且通常很容易编写自定义UDF来执行任意操作

import org.apache.spark.sql.functions.udf
val enhance = udf { topic : …
Run Code Online (Sandbox Code Playgroud)

scala user-defined-functions dataframe apache-spark apache-spark-sql

26
推荐指数
1
解决办法
1万
查看次数

在SQLContext之外用Java创建SparkSQL UDF

问题

我想在Java中创建一个用户定义的函数,可以在Apache Spark运算符链中作为Java方法调用.我在查找不需要在SQL查询中存在UDF的Java示例时遇到了麻烦.

版本

  • Java 8
  • 斯卡拉2.10.6
  • Apache Spark 1.6.0预装为Hadoop 2.6.0

我尝试过的是什么

我可以用Java成功创建UDF.但是,我不能使用它,除非它在SQL查询中:

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;

sqlContext.udf().register("udfUppercase",
    (String string) -> string.toUpperCase(), DataTypes.StringType);

DataFrame oldDF = // a simple DataFrame with a "name" column
oldDF.registerTempTable("df");
DataFrame newDF = sqlContext.sql("SELECT udfUppercase(name) AS name_upper FROM df");
Run Code Online (Sandbox Code Playgroud)

我被困在哪里

我希望Java中的非SQL方法调用样式的UDF看起来像这样:

import static org.apache.spark.sql.functions.udf;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;

UserDefinedFunction udfUppercase = udf(
    (String string) -> string.toUpperCase(), DataTypes.StringType);

DataFrame oldDF = // a simple DataFrame with a "name" column
newDF = oldDF.withColumn("name_upper", …
Run Code Online (Sandbox Code Playgroud)

java user-defined-functions dataframe apache-spark apache-spark-sql

3
推荐指数
1
解决办法
7667
查看次数