如何处理代码存储库中的大文件?

Geo*_*per 5 palantir-foundry foundry-code-repositories

我有一个数据源,每天都会提供一个大的 .txt 文件(50-75GB)。该文件包含多个不同的模式,其中每一行对应一个模式。我想将其拆分为每个模式的分区数据集,如何有效地做到这一点?

van*_*ser 3

您需要解决的最大问题是恢复模式的迭代速度,这对于这种规模的文件来说可能是一个挑战。

这里最好的策略是获取一个示例“名义”文件,其中包含要恢复的每个模式作为其中的一行,并将其添加为存储库中的文件。当您将此文件添加到存储库(与转换逻辑一起)时,您将能够将其推送到数据帧中,就像处理数据集中的原始文件一样,以进行快速测试迭代。

首先,确保将txt文件指定为包内容的一部分,这样您的测试就会发现它们(这在 下的文档中介绍Read a file from a Python repository):

您可以将存储库中的其他文件读取到转换上下文中。这对于设置转换代码引用的参数可能很有用。

首先,在您的 python 存储库中编辑 setup.py:

setup(
   name=os.environ['PKG_NAME'],
# ...
    package_data={
        '': ['*.txt']
    }
)
Run Code Online (Sandbox Code Playgroud)

我正在使用包含以下内容的 txt 文件:

my_column, my_other_column
some_string,some_other_string
some_thing,some_other_thing,some_final_thing
Run Code Online (Sandbox Code Playgroud)

该文本文件位于我的存储库中的以下路径:transforms-python/src/myproject/datasets/raw.txt

将文本文件配置为随逻辑一起提供后,并且将文件本身包含在存储库中后,您就可以包含以下代码。这段代码有几个重要的功能:

  1. 它将原始文件解析逻辑与将文件读入 Spark DataFrame 的阶段完全分开。这样,这个 DataFrame 的构建方式就可以留给测试基础设施或运行时,具体取决于您运行的位置。
  2. 这种保持逻辑分离的方式可以让您确保您想要执行的实际逐行解析是其自己的可测试函数,而不是让它纯粹存在于您的内部my_compute_function
  3. 此代码使用 Spark 原生spark_session.read.text方法,该方法比原始 txt 文件的逐行解析快几个数量级。这将确保并行化的 DataFrame 是您在执行器(或更糟糕的是,您的驱动程序)内逐行操作的单个文件,而不是单个文件。
my_column, my_other_column
some_string,some_other_string
some_thing,some_other_thing,some_final_thing
Run Code Online (Sandbox Code Playgroud)

一旦您启动并运行此代码,您的test_my_compute_function方法将非常快速地迭代,以便您可以完善您的模式恢复逻辑。这将使您在最后构建数据集变得更加容易,但不会产生完整构建的任何开销。