我在使用 pysparkSQL 与 delta 表合并 csv 文件时遇到问题。我设法创建 upsert 函数,如果匹配则更新,如果不匹配则插入。
我想将列添加ID到最终的增量表中,并在每次插入数据时递增它。此列标识增量表中的每一行。有什么办法可以把它落实到位吗?
def Merge(dict1, dict2):
res = {**dict1, **dict2}
return res
def create_default_values_dict(correspondance_df,marketplace):
dict_output = {}
for field in get_nan_keys_values(get_mapping_dict(correspondance_df, marketplace)):
dict_output[field] = 'null'
# We want to increment the id row each time we perform an insertion (TODO TODO TODO)
# if field == 'id':
# dict_output['id'] = col('id')+1
# else:
return dict_output
def create_matched_update_dict(mapping, products_table, updates_table):
output = {}
for k,v in mapping.items():
if k == 'source_name':
output['products.source_name'] …Run Code Online (Sandbox Code Playgroud)