Kam*_*zny 6 r apache-spark sparkr
在我的 R 脚本中,我有一个SparkDataFrame包含四个不同月份数据的两列(时间、值)。由于我需要将我的函数分别应用到每个月,我想我会将repartition它分成四个分区,每个分区将保存一个月的数据。
我创建了一个名为 partition 的附加列,具有一个整数值 0 - 3,然后repartition通过此特定列调用该方法。
可悲的是,正如本主题中所描述的那样:
Spark SQL - df.repartition 和 DataFrameWriter partitionBy 之间的区别?,使用该repartition方法我们只确定所有具有相同键的数据最终会在同一个分区中,但是具有不同键的数据也可以最终在同一个分区中。
就我而言,执行下面可见的代码会创建 4 个分区,但只用数据填充其中的 2 个。
我想我应该使用该partitionBy方法,但是在 SparkR 的情况下,我不知道该怎么做。官方文档指出,此方法适用于称为WindowSpec而不是DataFrame.
我真的很感激这方面的一些帮助,因为我不知道如何将此方法合并到我的代码中。
sparkR.session(
master="local[*]", sparkConfig = list(spark.sql.shuffle.partitions="4"))
df <- as.DataFrame(inputDat) # this is a dataframe with added partition column
repartitionedDf <- repartition(df, col = df$partition)
schema <- structType(
structField("time", "timestamp"),
structField("value", "double"),
structField("partition", "string"))
processedDf <- dapply(repartitionedDf,
function(x) { data.frame(produceHourlyResults(x), stringsAsFactors = FALSE) },
schema)
Run Code Online (Sandbox Code Playgroud)
你使用了错误的方法。如果你
需要将我的功能分别应用于每个月
你应该用gapply那个
使用指定的列对 SparkDataFrame 进行分组,并将 R 函数应用于每个组。
df %>% group_by("month") %>% gapply(fun, schema)
Run Code Online (Sandbox Code Playgroud)
或者
df %>% gapply("month", fun, schema)
Run Code Online (Sandbox Code Playgroud)
就我而言,执行下面可见的代码会创建 4 个分区,但只用数据填充其中的 2 个。
这表明散列冲突。合理增加唯一键数量以上的分区数量应该可以解决问题:
spark.sql.shuffle.partitions 17
Run Code Online (Sandbox Code Playgroud)
我想我应该使用 partitionBy 方法,但是
编号partitionBy与窗口函数(SparkR 窗口函数)一起使用。
要解决您的评论:
我决定将 dapply 与单独的分区一起使用,以便能够轻松地将每个月保存到单独的 CSV 文件中
哈希分区器不能这样工作HashPartitioner是如何工作的?
您可以partitionBy在 writer 中尝试使用,但我不确定 SparkR 是否直接支持它。它在结构化流中受支持,对于批处理,您可能必须调用 Java 方法或使用带有 Metastore 的表:
createDataFrame(iris) %>% createOrReplaceTempView("iris_view")
sql(
"CREATE TABLE iris
USING csv PARTITIONED BY(species)
LOCATION '/tmp/iris' AS SELECT * FROM iris_view"
)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
773 次 |
| 最近记录: |