abd*_*zam 6 apache-spark apache-spark-sql pyspark delta-lake
我在使用 pysparkSQL 与 delta 表合并 csv 文件时遇到问题。我设法创建 upsert 函数,如果匹配则更新,如果不匹配则插入。
我想将列添加ID到最终的增量表中,并在每次插入数据时递增它。此列标识增量表中的每一行。有什么办法可以把它落实到位吗?
def Merge(dict1, dict2):
res = {**dict1, **dict2}
return res
def create_default_values_dict(correspondance_df,marketplace):
dict_output = {}
for field in get_nan_keys_values(get_mapping_dict(correspondance_df, marketplace)):
dict_output[field] = 'null'
# We want to increment the id row each time we perform an insertion (TODO TODO TODO)
# if field == 'id':
# dict_output['id'] = col('id')+1
# else:
return dict_output
def create_matched_update_dict(mapping, products_table, updates_table):
output = {}
for k,v in mapping.items():
if k == 'source_name':
output['products.source_name'] = lit(v)
else:
output[products_table + '.' + k] = F.when(col(updates_table + '.' + v).isNull(), col(products_table + '.' + k)).when(col(updates_table + '.' + v).isNotNull(), col(updates_table + '.' + v))
return output
insert_dict = create_not_matched_insert_dict(mapping, 'products', 'updates')
default_dict = create_default_values_dict(correspondance_df_products, 'Cdiscount')
insert_values = Merge(insert_dict, default_dict)
update_values = create_matched_update_dict(mapping, 'products', 'updates')
delta_table_products.alias('products').merge(
updates_df_table.limit(20).alias('updates'),
"products.barcode_ean == updates.ean") \
.whenMatchedUpdate(set = update_values) \
.whenNotMatchedInsert(values = insert_values)\
.execute()
Run Code Online (Sandbox Code Playgroud)
我尝试增加id函数中的列create_default_values_dict,但似乎效果不佳,它不会自动增加 1。还有其他方法可以解决此问题吗?提前致谢 :)
Databricks 具有用于托管 Spark 的 IDENTITY 列
GENERATED { ALWAYS | BY DEFAULT } AS IDENTITY
[ ( [ START WITH start ] [ INCREMENT BY step ] ) ]
Run Code Online (Sandbox Code Playgroud)
这适用于 Delta 表。
例子:
create table gen1 (
id long GENERATED ALWAYS AS IDENTITY
, t string
)
Run Code Online (Sandbox Code Playgroud)
需要运行时版本 10.4 或更高版本。
Delta 不支持自动增量列类型。
一般来说,Spark不使用自增ID,而是倾向于单调递增ID。看functions.monotonically_increasing_id()。
如果你想实现自动增量行为,你将不得不使用多个Delta操作,例如,查询最大值+将其添加到row_number()通过窗口函数计算的列+然后写入。这是有问题的,原因有两个:
除非引入外部锁定机制或其他方式来确保在查找最大值和写入之间不会发生对表的更新,否则最终可能会得到无效数据。
使用row_number()会将并行度降低到1,强制所有数据通过单个核心,这对于大数据来说会非常慢。
最重要的是,您确实不想在 Spark 中使用自动增量列。
希望这可以帮助。
| 归档时间: |
|
| 查看次数: |
16474 次 |
| 最近记录: |