Tags: join, dataframe, apache-spark, apache-spark-sql, pyspark
I want to perform a join between these two PySpark DataFrames:
from pyspark import SparkContext
from pyspark.sql import Row, SQLContext  # Row import was missing; SQLContext is needed for toDF()
from pyspark.sql.functions import col

sc = SparkContext()
sqlContext = SQLContext(sc)

df1 = sc.parallelize([
    ['owner1', 'obj1', 0.5],
    ['owner1', 'obj1', 0.2],
    ['owner2', 'obj2', 0.1]
]).toDF(('owner', 'object', 'score'))

df2 = sc.parallelize(
    [Row(owner=u'owner1',
         objects=[Row(name=u'obj1', value=Row(fav=True, ratio=0.3))])]).toDF()
The join should be performed on the name of the object, i.e. the field name of the nested objects in df2 and the field object in df1.
I can perform a SELECT on the nested fields, e.g.
df2.where(df2.owner == 'owner1').select(col("objects.value.ratio")).show()
but I cannot run this join:
df2.alias('u').join(df1.alias('s'), col('u.objects.name') == col('s.object'))
which returns the error
pyspark.sql.utils.AnalysisException: u"cannot resolve '(objects.name = cast(object as double))' due to data type mismatch: differing types in '(objects.name = cast(object as double))' (array and double).;"
Any ideas how to solve this?
Since you want to match and extract a specific element, the simplest approach is to explode the rows:
from pyspark.sql.functions import explode  # explode must be imported explicitly

matches = df2.withColumn("object", explode(col("objects"))).alias("u").join(
    df1.alias("s"),
    col("s.object") == col("u.object.name")
)
matches.show()
## +-------------------+------+-----------------+------+------+-----+
## |            objects| owner|           object| owner|object|score|
## +-------------------+------+-----------------+------+------+-----+
## |[[obj1,[true,0.3]]]|owner1|[obj1,[true,0.3]]|owner1|  obj1|  0.5|
## |[[obj1,[true,0.3]]]|owner1|[obj1,[true,0.3]]|owner1|  obj1|  0.2|
## +-------------------+------+-----------------+------+------+-----+
An alternative, but very inefficient, approach is to use array_contains:
from pyspark.sql.functions import expr  # expr must be imported explicitly

matches_contains = df1.alias("s").join(
    df2.alias("u"), expr("array_contains(objects.name, object)"))
It is inefficient because it expands to a Cartesian product:
matches_contains.explain()
## == Physical Plan ==
## Filter array_contains(objects#6.name,object#4)
## +- CartesianProduct
## :- Scan ExistingRDD[owner#3,object#4,score#5]
## +- Scan ExistingRDD[objects#6,owner#7]
If the size of the array is relatively small, it is possible to generate an optimized version of array_contains, as I've shown here: filter whether a column value is equal to a value from a list in Spark.