如何使用 SPARK 将多个 parquet 文件转换为 TFrecord 文件?

Kri*_*tof 5 apache-spark parquet apache-spark-sql pyspark tfrecord

我想根据特定条件从大型 DataFrame 生成分层 TFrecord 文件,为此我使用write.partitionBy(). 我也在 SPARK 中使用tensorflow-connector,但这显然不能与write.partitionBy()操作一起使用。因此,除了尝试分两步工作之外,我还没有找到其他方法:

  1. 根据我的条件重新分区数据帧,使用partitionBy()生成的分区并将其写入镶木地板文件。
  2. 读取这些 parquet 文件,并使用 tensorflow-connector 插件将它们转换为 TFrecord 文件。

这是我无法有效完成的第二步。我的想法是读取执行器上的各个 parquet 文件并立即将它们写入 TFrecord 文件。但这需要访问 SQLContext,这只能在驱动程序中完成(此处讨论),因此不能并行。我想做这样的事情:

# List all parquet files to be converted
import glob, os
files = glob.glob('/path/*.parquet'))

sc = SparkSession.builder.getOrCreate()
sc.parallelize(files, 2).foreach(lambda parquetFile: convert_parquet_to_tfrecord(parquetFile))
Run Code Online (Sandbox Code Playgroud)

convert_parquet_to_tfrecord我可以构造能够在执行器上执行此操作的函数吗?

我还尝试在读取所有镶木地板文件时仅使用通配符:

SQLContext(sc).read.parquet('/path/*.parquet')
Run Code Online (Sandbox Code Playgroud)

这确实读取了所有镶木地板文件,但不幸的是不是读取到单独的分区中。看来原始结构丢失了,因此如果我想要将各个镶木地板文件的确切内容转换为 TFrecord 文件,这对我没有帮助。

还有其他建议吗?

Maj*_*han 0

如果我正确理解你的问题,你想将分区本地写入工作人员磁盘上。

如果是这种情况,那么我建议查看Spark-tensorflow-connector 的说明,了解如何执行此操作。

这是您正在寻找的代码(如上面链接的文档中所述):

myDataFrame.write.format("tfrecords").option("writeLocality", "local").save("/path")  
Run Code Online (Sandbox Code Playgroud)

顺便说一句,如果您担心效率,为什么要使用 pyspark?最好用scala代替。