假设我有两个PySpark DataFrames df1和df2.
df1= 'a'
1
2
5
df2= 'b'
3
6
Run Code Online (Sandbox Code Playgroud)
我想df2['b']为每个值找到最接近的值df1['a'],并将最接近的值添加为新列df1.
换句话说,每个值x的df1['a'],我想找到一个y即实现min(abx(x-y))对所有y in df2['b'](注:可以假设,仅仅是有一个y能够实现的最小距离),其结果将是
'a' 'b'
1 3
2 3
5 6
Run Code Online (Sandbox Code Playgroud)
我尝试使用以下代码首先创建距离矩阵(在找到达到最小距离的值之前):
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
def dict(x,y):
return abs(x-y)
udf_dict = udf(dict, IntegerType())
sql_sc = SQLContext(sc)
udf_dict(df1.a, df2.b)
Run Code Online (Sandbox Code Playgroud)
这使
Column<PythonUDF#dist(a,b)>
Run Code Online (Sandbox Code Playgroud)
然后我试了一下
sql_sc.CreateDataFrame(udf_dict(df1.a, df2.b))
Run Code Online (Sandbox Code Playgroud)
它会永远运行而不会产生错误/输出.
我的问题是:
a和b值创建一个距离矩阵,然后找到min …