标签: udf

无法使用Spark SQL中的现有Hive永久UDF

我之前已经在hive中注册了一个UDF.永久不是TEMPORARY.它直线工作.

CREATE FUNCTION normaliseURL AS 'com.example.hive.udfs.NormaliseURL' USING JAR 'hdfs://udfs/hive-udfs.jar';
Run Code Online (Sandbox Code Playgroud)

我有火花配置使用蜂巢Metastore.配置正常,因为我可以查询配置单元表.我可以看到UDF;

In [9]: spark.sql('describe function normaliseURL').show(truncate=False)
+-------------------------------------------+
|function_desc                              |
+-------------------------------------------+
|Function: default.normaliseURL             |
|Class: com.example.hive.udfs.NormaliseURL  |
|Usage: N/A.                                |
+-------------------------------------------+
Run Code Online (Sandbox Code Playgroud)

但是我不能在sql语句中使用UDF;

spark.sql('SELECT normaliseURL("value")')
AnalysisException: "Undefined function: 'default.normaliseURL'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 7"
Run Code Online (Sandbox Code Playgroud)

如果我尝试使用spark注册UDF(绕过Metastore),则无法注册它,表明它已经存在.

In [12]: spark.sql("create function normaliseURL as 'com.example.hive.udfs.NormaliseURL'")
AnalysisException: "Function 'default.normaliseURL' already exists in database 'default';"
Run Code Online (Sandbox Code Playgroud)

我正在使用Spark 2.0,hive metastore 1.1.0.UDF是scala,我的spark驱动程序代码是python.

我很难过. …

hive apache-spark apache-spark-sql udf

24
推荐指数
1
解决办法
8429
查看次数

Spark UDF与varargs

如文档中所示,它是列出最多22个所有参数的唯一选项吗?

https://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.UDFRegistration

有人想出如何做类似的事情吗?

sc.udf.register("func", (s: String*) => s......
Run Code Online (Sandbox Code Playgroud)

(编写跳过空值的自定义concat函数,当时只有2个参数)

谢谢

scala apache-spark udf

18
推荐指数
1
解决办法
1万
查看次数

StructType/Row的Spark UDF

我在spark Dataframe中有一个"StructType"列,它有一个数组和一个字符串作为子字段.我想修改数组并返回相同类型的新列.我可以用UDF处理它吗?或者有哪些替代方案?

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val sub_schema = StructType(StructField("col1",ArrayType(IntegerType,false),true) :: StructField("col2",StringType,true)::Nil)
val schema = StructType(StructField("subtable", sub_schema,true) :: Nil)
val data = Seq(Row(Row(Array(1,2),"eb")),  Row(Row(Array(3,2,1), "dsf")) )
val rd = sc.parallelize(data)
val df = spark.createDataFrame(rd, schema)
df.printSchema

root
 |-- subtable: struct (nullable = true)
 |    |-- col1: array (nullable = true)
 |    |    |-- element: integer (containsNull = false)
 |    |-- col2: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

看来我需要一个类型为Row的UDF

val u =  udf((x:Row) => x)
       >> Schema for type org.apache.spark.sql.Row is not supported …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark udf

17
推荐指数
3
解决办法
2万
查看次数

Spark SQL嵌套withColumn

我有一个DataFrame,它有多个列,其中一些是结构.像这样的东西

root
 |-- foo: struct (nullable = true)
 |    |-- bar: string (nullable = true)
 |    |-- baz: string (nullable = true)
 |-- abc: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- def: struct (nullable = true)
 |    |    |    |-- a: string (nullable = true)
 |    |    |    |-- b: integer (nullable = true)
 |    |    |    |-- c: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

我想UserDefinedFunction在列上应用一个baz替换baz功能baz,但我无法弄清楚如何做到这一点.这是一个所需输出的例子(注意baz …

scala dataframe apache-spark udf

15
推荐指数
1
解决办法
5824
查看次数

如何让sklearn K最近邻居采取自定义距离指标?

我有一个我需要使用的自定义距离指标KNN,K Nearest Neighbors.

我试着遵循这个,但我不能因为某些原因让它工作.

我假设距离度量应该采用两个相同长度的向量/数组,如下所述:

import sklearn 
from sklearn.neighbors import NearestNeighbors
import numpy as np
import pandas as pd

def d(a,b,L):
    # Inputs: a and b are rows from a data matrix   
    return a+b+2+L

knn=NearestNeighbors(n_neighbors=1,
                 algorithm='auto',
                 metric='pyfunc',
                 func=lambda a,b: d(a,b,L)
                 )


X=pd.DataFrame({'b':[0,3,2],'c':[1.0,4.3,2.2]})
knn.fit(X)
Run Code Online (Sandbox Code Playgroud)

但是,当我调用:时knn.kneighbors(),它似乎不喜欢自定义函数.这是错误堆栈的底部:

ValueError: Unknown metric pyfunc. Valid metrics are ['euclidean', 'l2', 'l1', 'manhattan', 'cityblock', 'braycurtis', 'canberra', 'chebyshev', 'correlation', 'cosine', 'dice', 'hamming', 'jaccard', 'kulsinski', 'mahalanobis', 'matching', 'minkowski', 'rogerstanimoto', 'russellrao', 'seuclidean', 'sokalmichener', …
Run Code Online (Sandbox Code Playgroud)

distance nearest-neighbor scikit-learn udf

14
推荐指数
1
解决办法
5011
查看次数

Hive UDF用于选择除某些列之外的所有列

HiveQL(以及一般的SQL)中的常见查询构建模式是选择所有列(SELECT *)或显式指定的列(SELECT A, B, C).SQL没有内置机制来选择除指定的列集之外的所有列.

本SO问题所述,有各种排除某些列的机制,但没有一种适用于HiveQL.(例如,创建一个临时表,SELECT *然后ALTER TABLE DROP使用其某些列的想法会在大数据环境中造成严重破坏.)

忽略关于选择除了一些列以外的所有列是否是一个好主意的意识形态讨论,这个问题是关于用这种能力扩展Hive的可能方法.

在Hive 0.13.0之前,SELECT可以采用基于正则表达式的列,例如,property_.*在反引号引用的字符串中.@ invoketheshell下面的答案指的是这个功能,但它是有代价的,即当启用此功能时,Hive不能接受其中包含非标准字符的列,例如,$foox/y.这就是Hive开发人员默认情况下在0.13.0中关闭此行为的原因.我正在寻找适用于任何列名称的通用解决方案.

生成通用表的UDF(UDTF)当然可以这样做,因为它可以操作模式.由于我们不打算生成新行,有没有办法使用简单的基于行的UDF解决这个问题?

这似乎是Web上许多帖子的常见问题,显示如何为各种数据库解决它,但我还没有找到Hive的解决方案.是否有代码可以执行此操作?

hive hiveql apache-spark apache-spark-sql udf

10
推荐指数
1
解决办法
4475
查看次数

使用Hive表的Hive UDF

我已经在java中开发了一个正常工作的hive udf,我的函数返回输入和hive表中的列之间的最佳匹配,所以它有这个简化的伪代码:

class myudf  extends udf{

    evaluate(Text input){

        getNewHiveConnection(); //i want to replace this by getCurrentHiveUserConnetion();
        executeHiveQuery(input);
        return something;
}
Run Code Online (Sandbox Code Playgroud)

我的问题是,如果这个函数是由Hive调用的,为什么我需要在我的代码中连接到hive?我可以使用当前连接使用我的功能的用户连接吗?

java hive user-defined-functions udf

9
推荐指数
1
解决办法
1531
查看次数

Spark SQL分组:如果你不关心你获得哪个值,可以添加到group by或wrap in first().

我在Spark SQL中有一个查询

select count(ts), truncToHour(ts)
from myTable
group by truncToHour(ts).
Run Code Online (Sandbox Code Playgroud)

哪里ts是时间戳类型,truncToHour是舍去时间戳小时UDF.此查询不起作用.如果我试试,

select count(ts), ts from myTable group by truncToHour(ts)
Run Code Online (Sandbox Code Playgroud)

我有expression 'ts' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() if you don't care which value you get.;,但first()如果我这样做,则没有定义:

select count(ts), first(ts) from myTable group by truncToHour(ts)
Run Code Online (Sandbox Code Playgroud)

无论如何在不使用子查询的情况下得到我想要的东西?另外,为什么它说"包装在first()"但是first()没有定义?

sql group-by apache-spark udf

8
推荐指数
1
解决办法
2万
查看次数

地图和udf之间的区别

当我在Spark中使用DataFrame时,我有时只需要编辑该DataFrame中特定列的值.例如.如果count我的数据框中有一个字段,如果我想添加1每个值count,那么我可以编写一个自定义withColumn的udf 来使用DataFrames 的功能完成工作,或者我可以map在DataFrame上做一个然后从结果RDD中提取另一个DataFrame.

我想知道的是udf实际上是如何工作的.在这种情况下,使用map/udf给我一个比较.性能差异是什么?

谢谢!

scala apache-spark udf

8
推荐指数
1
解决办法
2456
查看次数

BigQuery用户定义的聚合功能?

我知道我可以定义用户定义函数以执行一些自定义计算.我也知道我可以使用"开箱即用" 聚合函数在使用GROUP BY子句时将值集合减少为单个值.

是否可以定义用于GROUP BY子句的自定义用户定义的聚合函数?

aggregate-functions google-bigquery udf

8
推荐指数
1
解决办法
1163
查看次数