编写镶木地板文件时如何避免空文件?

ran*_*ddy 2 apache-spark pyspark spark-structured-streaming

我正在使用 Spark Structured Streaming 从 Kafka 队列中读取数据。从卡夫卡我看完后正在申请filterdataframe。我正在将此过滤后的数据框保存到镶木地板文件中。这会生成许多空的镶木地板文件。有什么办法可以停止写入空文件吗?

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KafkaServer) \
    .option("subscribe", KafkaTopics) \
    .load()

Transaction_DF = df.selectExpr("CAST(value AS STRING)")

decompDF = Transaction_DF.select(zip_extract("value").alias("decompress"))
filterDF = decomDF.filter(.....) 

query = filterDF .writeStream \
    .option("path", outputpath) \
    .option("checkpointLocation", RawXMLCheckpoint) \
    .start()
Run Code Online (Sandbox Code Playgroud)

Jac*_*ski 5

有什么方法可以停止写入空文件。

是的,但你宁愿这样做。

许多空 parquet 文件的原因是 Spark SQL(结构化流的底层基础设施)尝试猜测加载数据集的分区数量(每批来自 Kafka 的记录),并且执行得“很差”,即许多分区没有数据。

当您保存没有数据的分区时,您将得到一个空文件。

您可以使用repartitioncoalesce运算符来设置适当的分区数量并减少(甚至完全避免)空文件。请参阅数据集 API

你为什么这样做呢?repartition并且coalesce可能会由于在分区(以及可能是 Spark 集群中的节点)之间重新整理数据的额外步骤而导致性能下降。这可能很昂贵并且不值得这样做(因此我说你宁愿不这样做)。

然后您可能会问自己,如何知道正确的分区数量?在任何Spark 项目中,这都是一个非常好的问题。答案相当简单(如果您了解 Spark 处理什么以及如何处理,那么答案就很明显):“了解您的数据”,以便您可以计算有多少数据是完全正确的。


lha*_*amp 5

我建议repartition(partitioningColumns)在 Dataframe 上使用。数据集和之后partitionBy(partitioningColumns)writeStream操作,以避免写空文件。

原因: 如果您有大量数据,瓶颈通常是 Spark 的读取性能,如果您有很多小(甚至是空)文件且没有分区。所以你绝对应该使用文件/目录分区(这与 RDD 分区不同)。这在使用 AWS S3 时尤其成问题。在读取时间戳/天、消息类型/Kafka 主题等数据时,partitionColumns 应该适合您的常见查询。

另请参阅http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriterpartitionBy上的文档

按文件系统上的给定列对输出进行分区。如果指定,输出将在类似于 Hive 的分区方案的文件系统上进行布局。例如,当我们按年然后按月对数据集进行分区时,目录布局将如下所示:

年=2016/月=01/,年=2016/月=02/

分区是最广泛使用的优化物理数据布局的技术之一。当查询在分区列上有谓词时,它提供了一个粗粒度的索引来跳过不必要的数据读取。为了使分区正常工作,每列中不同值的数量通常应少于数万个。

这适用于所有基于文件的数据源(例如 Parquet、JSON)启动 Spark 2.1.0。