小编Ash*_*her的帖子

我如何迭代代码存储库中的 json 文件并增量附加到数据集

我已经通过数据连接将一个包含 100,000 个大约 100GB 的原始 json 文件的数据集导入到代工厂中。我想使用Python Transforms raw file access转换来读取文件,将结构和结构的数组展平到数据帧中,作为对 df 的增量更新。我想使用来自 *.json 文件的文档中的以下示例中的内容,并将其转换为使用@incremental()装饰器更新的增量。

>>> import csv
>>> from pyspark.sql import Row
>>> from transforms.api import transform, Input, Output
>>>
>>> @transform(
...     processed=Output('/examples/hair_eye_color_processed'),
...     hair_eye_color=Input('/examples/students_hair_eye_color_csv'),
... )
... def example_computation(hair_eye_color, processed):
...
...    def process_file(file_status):
...        with hair_eye_color.filesystem().open(file_status.path) as f:
...            r = csv.reader(f)
...
...            # Construct a pyspark.Row from our header row
...            header = next(r)
...            MyRow = Row(*header)
...
... …
Run Code Online (Sandbox Code Playgroud)

pyspark palantir-foundry foundry-code-repositories foundry-code-workbooks

5
推荐指数
0
解决办法
285
查看次数