小编gam*_*e25的帖子

Databricks Delta Live 表 - 应用增量表中的更改

我正在使用 Databricks Delta Live Tables,但在向上游插入某些表时遇到一些问题。我知道下面的文字很长,但我试图尽可能清楚地描述我的问题。如果某些部分不清楚,请告诉我。

我有以下表格和流程:

Landing_zone -> 这是一个添加 JSON 文件的文件夹,其中包含插入或更新的记录的数据。Raw_table -> 这是 JSON 文件中的数据,但采用表格格式。该表采用增量格式。除了将 JSON 结构转换为表格结构(我进行了爆炸,然后从 JSON 键创建列)之外,未进行任何转换。Intermediate_table -> 这是 raw_table,但有一些额外的列(取决于其他列值)。

为了从我的着陆区转到原始表,我有以下 Pyspark 代码:

cloudfile = {"cloudFiles.format":"JSON", 
                       "cloudFiles.schemaLocation": sourceschemalocation, 
                       "cloudFiles.inferColumnTypes": True}

@dlt.view('landing_view')
def inc_view():
    df = (spark
             .readStream
             .format('cloudFiles')
             .options(**cloudFilesOptions)
             .load(filpath_to_landing)
     <Some transformations to go from JSON to tabular (explode, ...)>
     return df

dlt.create_target_table('raw_table', 
                        table_properties = {'delta.enableChangeDataFeed': 'true'})
  
dlt.apply_changes(target='raw_table',
                  source='landing_view',
                  keys=['id'],
                  sequence_by='updated_at')
Run Code Online (Sandbox Code Playgroud)

这段代码按预期工作。我运行它,将一个changes.JSON文件添加到登陆区域,重新运行管道,并将更新插入正确地应用于“raw_table”

(但是,每次在 delta 文件夹中创建包含所有数据的新 parquet 文件时,我希望只添加包含插入和更新行的 parquet 文件?并且有关当前版本的一些信息保留在 delta 中日志?不确定这是否与我的问题相关。我已经将“raw_table”的 table_properties 更改为enableChangeDataFeed = true。“intermediate_table”的 readStream 有选项(readChangeFeed,“true”))。

然后我有以下代码从“raw_table”转到“intermediate_table”: …

databricks delta-lake databricks-autoloader delta-live-tables

7
推荐指数
1
解决办法
7742
查看次数

PySpark动态创建StructType

我遇到的情况是我的数据如下所示:

ID 价值观 图式
2 {'colA':3.2,'colB':'val2','colC':3.4} {'colA':'FLOAT', 'colB':'STRING', 'colC':'FLOAT'}
3 {'colC':3.2,'colX':3.9} {'colC':'FLOAT', 'colX':'FLOAT'}
4 {'colG':'val1','colH':93.2} {'colG':'STRING', 'colH':'FLOAT'}
5 {'colG':'val4', 'colA':4.2, 'colJ':93.2, 'colM':'val4'} {'colG':'STRING', 'colA':'FLOAT', 'ColJ':'FLOAT', 'ColM':'STRING'}

和列最初valuesschema存储为StringType. 我想将values列转换为StructType定义每个可能的键的列。最终架构应如下所示:

 |-- id: integer (nullable = false)
 |-- values: struct (nullable = true)
 |    |-- colA: double (nullable = true)
 |    |-- colB: string (nullable = true)
 |    |-- colC: double (nullable = true)
 |    |-- colG: string (nullable = true)
 |    |-- …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark databricks delta-lake

2
推荐指数
1
解决办法
383
查看次数