小编Chi*_*ti5的帖子

Pyspark Dataframe将函数应用于两列

假设我有两个PySpark DataFrames df1df2.

df1=   'a' 
        1    
        2    
        5    

df2=   'b'
        3
        6
Run Code Online (Sandbox Code Playgroud)

我想df2['b']为每个值找到最接近的值df1['a'],并将最接近的值添加为新列df1.

换句话说,每个值xdf1['a'],我想找到一个y即实现min(abx(x-y))对所有y in df2['b'](注:可以假设,仅仅是有一个y能够实现的最小距离),其结果将是

'a'    'b'
 1      3
 2      3
 5      6
Run Code Online (Sandbox Code Playgroud)

我尝试使用以下代码首先创建距离矩阵(在找到达到最小距离的值之前):

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

def dict(x,y):
    return abs(x-y)
udf_dict = udf(dict, IntegerType())

sql_sc = SQLContext(sc)
udf_dict(df1.a, df2.b)
Run Code Online (Sandbox Code Playgroud)

这使

Column<PythonUDF#dist(a,b)>
Run Code Online (Sandbox Code Playgroud)

然后我试了一下

sql_sc.CreateDataFrame(udf_dict(df1.a, df2.b))
Run Code Online (Sandbox Code Playgroud)

它会永远运行而不会产生错误/输出.

我的问题是:

  1. 由于我是Spark的新手,我构建输出DataFrame的方法是否有效?(我的方法是首先为所有ab值创建一个距离矩阵,然后找到min …

pyspark spark-dataframe pyspark-sql

7
推荐指数
1
解决办法
2万
查看次数

标签 统计

pyspark ×1

pyspark-sql ×1

spark-dataframe ×1