使用小于100的行组大小在spark中创建镶木地板文件

Yev*_*vin 7 hadoop apache-spark parquet

我有一个具有少量字段的火花数据帧.一些字段是巨大的二进制blob.整行的大小约为50 MB.

我将数据框保存为镶木地板格式.我正在使用parquet.block.size参数控制行组的大小.

Spark会生成一个镶木地板文件,但是我总是会在一行中获得至少100行.这对我来说是一个问题,因为块大小可能会变成千兆字节,这对我的应用程序不起作用.

parquet.block.size 只要尺寸大到足以容纳超过100行,就可以正常工作.

我修改InternalParquetRecordWriter.javaMINIMUM_RECORD_COUNT_FOR_CHECK = 2,它解决了该问题,但没有配置值,我可以找到一个能够支持调整,该硬编码不变.

是否有不同/更好的方法来获得小于100的行组大小?

这是我的代码片段:

from pyspark import Row
from pyspark.sql import SparkSession
import numpy as np

from pyspark.sql.types import StructType, StructField, BinaryType


def fake_row(x):
    result = bytearray(np.random.randint(0, 127, (3 * 1024 * 1024 / 2), dtype=np.uint8).tobytes())
    return Row(result, result)

spark_session = SparkSession \
    .builder \
    .appName("bbox2d_dataset_extraction") \
    .config("spark.driver.memory", "12g") \
    .config("spark.executor.memory", "4g")

spark_session.master('local[5]')

spark = spark_session.getOrCreate()
sc = spark.sparkContext
sc._jsc.hadoopConfiguration().setInt("parquet.block.size", 8 * 1024 * 1024)

index = sc.parallelize(range(50), 5)
huge_rows = index.map(fake_row)
schema = StructType([StructField('f1', BinaryType(), False), StructField('f2', BinaryType(), False)])

bbox2d_dataframe = spark.createDataFrame(huge_rows, schema).coalesce(1)
bbox2d_dataframe. \
    write.option("compression", "none"). \
    mode('overwrite'). \
    parquet('/tmp/huge/')
Run Code Online (Sandbox Code Playgroud)

Pra*_*ota 4

不幸的是我还没有找到一种方法来做到这一点。我报告此问题是为了删除硬编码值并使它们可配置。如果你有兴趣的话我有一个补丁。