ran*_*ddy 2 apache-spark pyspark spark-structured-streaming
我正在使用 Spark Structured Streaming 从 Kafka 队列中读取数据。从卡夫卡我看完后正在申请filter的dataframe。我正在将此过滤后的数据框保存到镶木地板文件中。这会生成许多空的镶木地板文件。有什么办法可以停止写入空文件吗?
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", KafkaServer) \
.option("subscribe", KafkaTopics) \
.load()
Transaction_DF = df.selectExpr("CAST(value AS STRING)")
decompDF = Transaction_DF.select(zip_extract("value").alias("decompress"))
filterDF = decomDF.filter(.....)
query = filterDF .writeStream \
.option("path", outputpath) \
.option("checkpointLocation", RawXMLCheckpoint) \
.start()
Run Code Online (Sandbox Code Playgroud)
有什么方法可以停止写入空文件。
是的,但你宁愿不这样做。
许多空 parquet 文件的原因是 Spark SQL(结构化流的底层基础设施)尝试猜测加载数据集的分区数量(每批来自 Kafka 的记录),并且执行得“很差”,即许多分区没有数据。
当您保存没有数据的分区时,您将得到一个空文件。
您可以使用repartition或coalesce运算符来设置适当的分区数量并减少(甚至完全避免)空文件。请参阅数据集 API。
你为什么不这样做呢?repartition并且coalesce可能会由于在分区(以及可能是 Spark 集群中的节点)之间重新整理数据的额外步骤而导致性能下降。这可能很昂贵并且不值得这样做(因此我说你宁愿不这样做)。
然后您可能会问自己,如何知道正确的分区数量?在任何Spark 项目中,这都是一个非常好的问题。答案相当简单(如果您了解 Spark 处理什么以及如何处理,那么答案就很明显):“了解您的数据”,以便您可以计算有多少数据是完全正确的。
我建议repartition(partitioningColumns)在 Dataframe 上使用。数据集和之后partitionBy(partitioningColumns)的writeStream操作,以避免写空文件。
原因: 如果您有大量数据,瓶颈通常是 Spark 的读取性能,如果您有很多小(甚至是空)文件且没有分区。所以你绝对应该使用文件/目录分区(这与 RDD 分区不同)。这在使用 AWS S3 时尤其成问题。在读取时间戳/天、消息类型/Kafka 主题等数据时,partitionColumns 应该适合您的常见查询。
另请参阅http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriterpartitionBy上的文档
按文件系统上的给定列对输出进行分区。如果指定,输出将在类似于 Hive 的分区方案的文件系统上进行布局。例如,当我们按年然后按月对数据集进行分区时,目录布局将如下所示:
年=2016/月=01/,年=2016/月=02/
分区是最广泛使用的优化物理数据布局的技术之一。当查询在分区列上有谓词时,它提供了一个粗粒度的索引来跳过不必要的数据读取。为了使分区正常工作,每列中不同值的数量通常应少于数万个。
这适用于所有基于文件的数据源(例如 Parquet、JSON)启动 Spark 2.1.0。
| 归档时间: |
|
| 查看次数: |
9622 次 |
| 最近记录: |