相关疑难解决方法(0)

如何在Spark中分区和写入DataFrame而不删除没有新数据的分区?

我试图DataFrame用Parquet格式保存到HDFS,使用DataFrameWriter三列值进行分区,如下所示:

dataFrame.write.mode(SaveMode.Overwrite).partitionBy("eventdate", "hour", "processtime").parquet(path)
Run Code Online (Sandbox Code Playgroud)

正如提到的这个问题,partitionBy将在删除分区的全部现有层次path,并在分区取而代之dataFrame.由于特定日期的新增量数据将定期出现,我想要的是仅替换层次结构中dataFrame具有数据的那些分区,而保持其他分区不变.

要做到这一点,似乎我需要使用其完整路径单独保存每个分区,如下所示:

singlePartition.write.mode(SaveMode.Overwrite).parquet(path + "/eventdate=2017-01-01/hour=0/processtime=1234567890")
Run Code Online (Sandbox Code Playgroud)

但是我无法理解将数据组织到单分区中DataFrame的最佳方法,以便我可以使用它们的完整路径将它们写出来.一个想法是这样的:

dataFrame.repartition("eventdate", "hour", "processtime").foreachPartition ...
Run Code Online (Sandbox Code Playgroud)

foreachPartition操作上Iterator[Row]是不理想的写出来镶木格式.

我还考虑使用a select...distinct eventdate, hour, processtime获取分区列表,然后按每个分区过滤原始数据帧并将结果保存到完整的分区路径.但是,每个分区的独特查询加过滤器似乎效率不高,因为它会进行大量的过滤/写入操作.

我希望有一种更简洁的方法来保留dataFrame没有数据的现有分区?

谢谢阅读.

Spark版本:2.1

partitioning apache-spark parquet spark-dataframe

25
推荐指数
3
解决办法
3万
查看次数

如何向 Spark 中的现有分区添加行?

我必须更新历史数据。我所说的更新是指向 S3 上的现有分区添加新行,有时甚至添加新列。

当前分区是按日期实现的:created_year={}/created_month={}/created_day={}。为了避免每个分区有太多对象,我执行以下操作来维护单个对象/分区:

def save_repartitioned_dataframe(bucket_name, df):
    dest_path = form_path_string(bucket_name, repartitioned_data=True)
    print('Trying to save repartitioned data at: {}'.format(dest_path))
    df.repartition(1, "created_year", "created_month", "created_day").write.partitionBy(
        "created_year", "created_month", "created_day").parquet(dest_path)
    print('Data repartitioning complete with at the following location: ')
    print(dest_path)
    _, count, distinct_count, num_partitions = read_dataframe_from_bucket(bucket_name, repartitioned_data=True)
    return count, distinct_count, num_partitions
Run Code Online (Sandbox Code Playgroud)

存在一种情况,我必须添加具有这些列值的某些行:

created_year | created_month | created_day
2019         |10             |27   
Run Code Online (Sandbox Code Playgroud)

这意味着此路径中的文件(S3 对象)created_year=2019/created_month=10/created_day=27/some_random_name.parquet将附加新行。

如果架构发生更改,则所有对象都必须实现该更改。

我尝试研究它通常是如何工作的,所以,有两种感兴趣的模式:覆盖、追加。

第一个将仅添加当前数据并删除其余数据。我不希望出现这种情况。第二个将追加,但最终可能会创建更多对象。我也不希望出现这种情况。我还了解到 Spark 中的数据帧是不可变的。

那么,如何实现在新数据到达现有分区时将其追加并每天维护一个对象呢?

amazon-s3 apache-spark pyspark

3
推荐指数
1
解决办法
1816
查看次数

如何在不覆盖的情况下将 Spark Streaming 输出写入 HDFS

经过一些处理后,我有一个 DStream[String , ArrayList[String]] ,所以当我使用 saveAsTextFile 将它写入 hdfs 并在每批后覆盖数据时,如何通过附加到以前的结果来写入新结果

output.foreachRDD(r => {
  r.saveAsTextFile(path)
})
Run Code Online (Sandbox Code Playgroud)

编辑 ::如果有人可以帮助我将输出转换为 avro 格式,然后附加到 HDFS

apache-kafka spark-streaming

2
推荐指数
1
解决办法
9124
查看次数