sup*_*kar 5 apache-spark pyspark
df1 有字段id和json;df2 有字段id和json
df1.count()=> 1200; df2.count()=> 20
df1 包含所有行。df2 有一个只有 20 行的增量更新。
我的目标是用 .df1 中的值更新 df1 df2。的所有 iddf2都在 df1 中。但是 df2 已经更新json了这些相同 ID 的值(在字段中)。
结果 df 应该具有来自 的所有值df1和来自 的更新值df2。
做这个的最好方式是什么?- 使用最少的连接和过滤器。
谢谢!
您可以使用一个左连接来实现这一点。
创建示例数据帧
使用@Shankar Koirala 在他的回答中提供的示例数据。
data1 = [
(1, "a"),
(2, "b"),
(3, "c")
]
df1 = sqlCtx.createDataFrame(data1, ["id", "value"])
data2 = [
(1, "x"),
(2, "y")
]
df2 = sqlCtx.createDataFrame(data2, ["id", "value"])
Run Code Online (Sandbox Code Playgroud)
进行左连接
使用id列上的左连接连接两个 DataFrame 。这将保留左侧 DataFrame 中的所有行。对于右侧 DataFrame 中没有匹配的行id,该值将为null。
import pyspark.sql.functions as f
df1.alias('l').join(df2.alias('r'), on='id', how='left')\
.select(
'id',
f.col('l.value').alias('left_value'),
f.col('r.value').alias('right_value')
)\
.show()
#+---+----------+-----------+
#| id|left_value|right_value|
#+---+----------+-----------+
#| 1| a| x|
#| 3| c| null|
#| 2| b| y|
#+---+----------+-----------+
Run Code Online (Sandbox Code Playgroud)
选择所需数据
我们将使用不匹配的ids 具有 anull来选择最终列的事实。使用pyspark.sql.functions.when()使用权价值,如果它不为空,否则保持左值。
df1.alias('l').join(df2.alias('r'), on='id', how='left')\
.select(
'id',
f.when(
~f.isnull(f.col('r.value')),
f.col('r.value')
).otherwise(f.col('l.value')).alias('value')
)\
.show()
#+---+-----+
#| id|value|
#+---+-----+
#| 1| x|
#| 3| c|
#| 2| y|
#+---+-----+
Run Code Online (Sandbox Code Playgroud)
如果您希望ids 按顺序排列,则可以对此输出进行排序。
使用 pyspark-sql
您可以使用 pyspark-sql 查询执行相同的操作:
df1.registerTempTable('df1')
df2.registerTempTable('df2')
query = """SELECT l.id,
CASE WHEN r.value IS NOT NULL THEN r.value ELSE l.value END AS value
FROM df1 l LEFT JOIN df2 r ON l.id = r.id"""
sqlCtx.sql(query.replace("\n", "")).show()
#+---+-----+
#| id|value|
#+---+-----+
#| 1| x|
#| 3| c|
#| 2| y|
#+---+-----+
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
5275 次 |
| 最近记录: |