Spark增量加载会覆盖旧记录

Sam*_*han 3 etl presto apache-spark pyspark

我需要通过使用Spark(PySpark)对表进行增量加载

这是示例:

第一天

id | value
-----------
1  | abc
2  | def
Run Code Online (Sandbox Code Playgroud)

第二天

id | value
-----------
2  | cde
3  | xyz
Run Code Online (Sandbox Code Playgroud)

预期结果

id | value
-----------
1  | abc
2  | cde
3  | xyz
Run Code Online (Sandbox Code Playgroud)

这可以在关系数据库中轻松完成,
想知道是否可以在Spark或其他转换工具(例如Presto)中完成?

vik*_*ana 9

干得好!第一个数据框:

 >>> list1 = [(1, 'abc'),(2,'def')]
 >>> olddf = spark.createDataFrame(list1, ['id', 'value'])
 >>> olddf.show();
 +---+-----+
 | id|value|
 +---+-----+
 |  1|  abc|
 |  2|  def|
 +---+-----+
Run Code Online (Sandbox Code Playgroud)

第二个数据框:

>>> list2 = [(2, 'cde'),(3,'xyz')]
>>> newdf = spark.createDataFrame(list2, ['id', 'value'])
>>> newdf.show();
+---+-----+
| id|value|
+---+-----+
|  2|  cde|
|  3|  xyz|
+---+-----+
Run Code Online (Sandbox Code Playgroud)

现在使用合并功能合并和合并这两个数据名望

from pyspark.sql.functions import *

>>> df = olddf.join(newdf, olddf.id == newdf.id,'full_outer').select(coalesce(olddf.id,newdf.id).alias("id"),coalesce(newdf.value,olddf.value).alias("value"))
>>> df.show();
+---+-----+
| id|value|
+---+-----+
|  1|  abc|
|  3|  xyz|
|  2|  cde|
+---+-----+
Run Code Online (Sandbox Code Playgroud)

我希望这可以解决您的问题。:-)

  • 感谢您的逐步说明,想知道是否可以通过dropDuplicates([“ id”])轻松完成此操作? (2认同)