Kri*_*tof 5 apache-spark parquet apache-spark-sql pyspark tfrecord
我想根据特定条件从大型 DataFrame 生成分层 TFrecord 文件,为此我使用write.partitionBy(). 我也在 SPARK 中使用tensorflow-connector,但这显然不能与write.partitionBy()操作一起使用。因此,除了尝试分两步工作之外,我还没有找到其他方法:
partitionBy()生成的分区并将其写入镶木地板文件。这是我无法有效完成的第二步。我的想法是读取执行器上的各个 parquet 文件并立即将它们写入 TFrecord 文件。但这需要访问 SQLContext,这只能在驱动程序中完成(此处讨论),因此不能并行。我想做这样的事情:
# List all parquet files to be converted
import glob, os
files = glob.glob('/path/*.parquet'))
sc = SparkSession.builder.getOrCreate()
sc.parallelize(files, 2).foreach(lambda parquetFile: convert_parquet_to_tfrecord(parquetFile))
Run Code Online (Sandbox Code Playgroud)
convert_parquet_to_tfrecord我可以构造能够在执行器上执行此操作的函数吗?
我还尝试在读取所有镶木地板文件时仅使用通配符:
SQLContext(sc).read.parquet('/path/*.parquet')
Run Code Online (Sandbox Code Playgroud)
这确实读取了所有镶木地板文件,但不幸的是不是读取到单独的分区中。看来原始结构丢失了,因此如果我想要将各个镶木地板文件的确切内容转换为 TFrecord 文件,这对我没有帮助。
还有其他建议吗?
如果我正确理解你的问题,你想将分区本地写入工作人员磁盘上。
如果是这种情况,那么我建议查看Spark-tensorflow-connector 的说明,了解如何执行此操作。
这是您正在寻找的代码(如上面链接的文档中所述):
myDataFrame.write.format("tfrecords").option("writeLocality", "local").save("/path")
Run Code Online (Sandbox Code Playgroud)
顺便说一句,如果您担心效率,为什么要使用 pyspark?最好用scala代替。
| 归档时间: |
|
| 查看次数: |
4618 次 |
| 最近记录: |