rak*_*ake 119 scala partitioning dataframe apache-spark apache-spark-sql
我已经开始在Spark 1.4.0中使用Spark SQL和DataFrames.我想在Scala中定义DataFrame上的自定义分区程序,但是没有看到如何执行此操作.
我正在使用的一个数据表包含一个事务列表,按帐户,silimar到下面的示例.
Account Date Type Amount
1001 2014-04-01 Purchase 100.00
1001 2014-04-01 Purchase 50.00
1001 2014-04-05 Purchase 70.00
1001 2014-04-01 Payment -150.00
1002 2014-04-01 Purchase 80.00
1002 2014-04-02 Purchase 22.00
1002 2014-04-04 Payment -120.00
1002 2014-04-04 Purchase 60.00
1003 2014-04-02 Purchase 210.00
1003 2014-04-03 Purchase 15.00
Run Code Online (Sandbox Code Playgroud)
至少在最初,大多数计算将发生在帐户内的交易之间.所以我希望对数据进行分区,以便帐户的所有事务都在同一个Spark分区中.
但我没有看到定义这个的方法.DataFrame类有一个名为"repartition(Int)"的方法,您可以在其中指定要创建的分区数.但我没有看到任何方法可用于为DataFrame定义自定义分区程序,例如可以为RDD指定.
源数据存储在Parquet中.我确实看到在向Parquet编写DataFrame时,您可以指定要分区的列,因此我可以告诉Parquet通过"帐户"列对其数据进行分区.但是可能有数百万个帐户,如果我正确理解Parquet,它会为每个帐户创建一个独特的目录,因此这听起来不是一个合理的解决方案.
有没有办法让Spark分区这个DataFrame,以便一个帐户的所有数据都在同一个分区?
zer*_*323 170
SPARK-22614公开了范围分区.
val partitionedByRange = df.repartitionByRange(42, $"k")
partitionedByRange.explain
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k ASC NULLS FIRST], 42
// +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
//
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- Project [_1#2 AS k#5, _2#3 AS v#6]
// +- LocalRelation [_1#2, _2#3]
//
// == Optimized Logical Plan ==
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- LocalRelation [k#5, v#6]
//
// == Physical Plan ==
// Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42)
// +- LocalTableScan [k#5, v#6]
Run Code Online (Sandbox Code Playgroud)
SPARK-22389在Data Source API v2中公开了外部格式分区.
在Spark> = 1.6中,可以按列使用分区进行查询和缓存.请参阅:SPARK-11410和SPARK-4849使用repartition方法:
val df = Seq(
("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")
val partitioned = df.repartition($"k")
partitioned.explain
// scala> df.repartition($"k").explain(true)
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
// +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
//
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
// +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
//
// == Optimized Logical Plan ==
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
// +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
//
// == Physical Plan ==
// TungstenExchange hashpartitioning(k#7,200), None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
// +- Scan PhysicalRDD[_1#5,_2#6]
Run Code Online (Sandbox Code Playgroud)
与RDDsSpark Dataset(包括Dataset[Row]aka DataFrame)不同,现在不能使用自定义分区器.您通常可以通过创建人工分区列来解决这个问题,但它不会给您相同的灵活性.
您可以做的一件事是在创建之前预先分区输入数据 DataFrame
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.HashPartitioner
val schema = StructType(Seq(
StructField("x", StringType, false),
StructField("y", LongType, false),
StructField("z", DoubleType, false)
))
val rdd = sc.parallelize(Seq(
Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
))
val partitioner = new HashPartitioner(5)
val partitioned = rdd.map(r => (r.getString(0), r))
.partitionBy(partitioner)
.values
val df = sqlContext.createDataFrame(partitioned, schema)
Run Code Online (Sandbox Code Playgroud)
由于DataFrame创建RDD只需要一个简单的映射阶段,现有的分区布局应该保留*:
assert(df.rdd.partitions == partitioned.partitions)
Run Code Online (Sandbox Code Playgroud)
您可以使用相同的方式重新分配现有内容DataFrame:
sqlContext.createDataFrame(
df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values,
df.schema
)
Run Code Online (Sandbox Code Playgroud)
所以看起来并非不可能.问题仍然存在,如果它有意义的话.我认为大多数时候它不会:
重新分区是一个昂贵的过程.在典型情况下,大多数数据必须被序列化,洗牌和反序列化.另一方面,可以从预分区数据中受益的操作数量相对较小,并且如果内部API不是设计用于利用此属性,则进一步限制.
GROUP BY- 可以减少临时缓冲区**的内存占用,但总体成本要高得多.或多或少等同于groupByKey.mapValues(_.reduce)(当前行为)vs reduceByKey(预分区).不太可能在实践中有用.SqlContext.cacheTable.由于它看起来像是使用行程编码,因此应用OrderedRDDFunctions.repartitionAndSortWithinPartitions可以提高压缩率.性能高度依赖于密钥的分布.如果它是偏斜的,将导致次优的资源利用率.在最糟糕的情况下,根本不可能完成这项工作.
使用JDBC源进行分区:
JDBC数据源支持predicates参数.它可以使用如下:
sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)
Run Code Online (Sandbox Code Playgroud)
它为每个谓词创建一个JDBC分区.请记住,如果使用单个谓词创建的集合不是不相交的,您将在结果表中看到重复项.
partitionBy方法DataFrameWriter:
Spark DataFrameWriter提供了partitionBy可用于在写入时"分区"数据的方法.它使用提供的列集分隔写入数据
val df = Seq(
("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6)
).toDF("k", "v")
df.write.partitionBy("k").json("/tmp/foo.json")
Run Code Online (Sandbox Code Playgroud)
这使得谓词可以根据键调低读取查询:
val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
df1.where($"k" === "bar")
Run Code Online (Sandbox Code Playgroud)
但它并不等同于DataFrame.repartition.特别是如下聚合:
val cnts = df1.groupBy($"k").sum()
Run Code Online (Sandbox Code Playgroud)
仍然需要TungstenExchange:
cnts.explain
// == Physical Plan ==
// TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])
// +- TungstenExchange hashpartitioning(k#90,200), None
// +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])
// +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json
Run Code Online (Sandbox Code Playgroud)
bucketBy方法DataFrameWriter(Spark> = 2.0):
bucketBy有类似的应用程序,partitionBy但它只适用于表(saveAsTable).Bucketing信息可用于优化连接:
// Temporarily disable broadcast joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
df.write.bucketBy(42, "k").saveAsTable("df1")
val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2")
df2.write.bucketBy(42, "k").saveAsTable("df2")
// == Physical Plan ==
// *Project [k#41, v#42, v2#47]
// +- *SortMergeJoin [k#41], [k#46], Inner
// :- *Sort [k#41 ASC NULLS FIRST], false, 0
// : +- *Project [k#41, v#42]
// : +- *Filter isnotnull(k#41)
// : +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v:int>
// +- *Sort [k#46 ASC NULLS FIRST], false, 0
// +- *Project [k#46, v2#47]
// +- *Filter isnotnull(k#46)
// +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v2:double>
Run Code Online (Sandbox Code Playgroud)
*通过分区布局我的意思是只有数据分布.partitionedRDD不再是分区器.**假设没有早期预测.如果聚合仅涵盖列的小子集,则可能无法获得任何收益.
Nig*_*olf 11
在Spark <1.6中如果您创建了一个HiveContext而不是普通的旧版本SqlContext,则可以使用HiveQL DISTRIBUTE BY colX...(确保N个减少器中的每一个都获得x的非重叠范围)和CLUSTER BY colX...(分配依据和排序依据的快捷方式);
df.registerTempTable("partitionMe")
hiveCtx.sql("select * from partitionMe DISTRIBUTE BY accountId SORT BY accountId, date")
Run Code Online (Sandbox Code Playgroud)
不确定这如何适用于Spark DF api.普通的SqlContext不支持这些关键字(请注意,您不需要使用Hive元文件来使用HiveContext)
编辑: Spark 1.6+现在在本机DataFrame API中有这个
使用返回的DataFrame:
yourDF.orderBy(account)
Run Code Online (Sandbox Code Playgroud)
没有明确的方法在DataFrame上使用partitionBy,仅在PairRDD上,但是当您对DataFrame进行排序时,它将在其LogicalPlan中使用它,当您需要对每个帐户进行计算时,这将有所帮助.
我只是偶然发现了同样的问题,我希望按帐户划分数据帧.我假设当你说"想要对数据进行分区以便一个帐户的所有事务都在同一个Spark分区中"时,你想要它的规模和性能,但你的代码并不依赖它(比如使用mapPartitions()等),对吗?
因此,从某种答案开始:)-您不能
我不是专家,但是据我了解DataFrames,它们不等于rdd,DataFrame没有Partitioner之类的东西。
通常,DataFrame的想法是提供另一个层次的抽象本身来处理此类问题。将DataFrame上的查询转换为逻辑计划,然后将其进一步转换为对RDD的操作。您建议的分区可能会自动应用,或者至少应该应用。
如果您不相信SparkSQL会提供某种最佳工作,则可以始终按照注释中的建议将DataFrame转换为RDD [Row]。
小智 5
我能够使用RDD做到这一点.但我不知道这是否是一个可以接受的解决方案.将DF作为RDD提供后,您可以申请repartitionAndSortWithinPartitions执行自定义数据重新分区.
这是我使用的示例:
class DatePartitioner(partitions: Int) extends Partitioner {
override def getPartition(key: Any): Int = {
val start_time: Long = key.asInstanceOf[Long]
Objects.hash(Array(start_time)) % partitions
}
override def numPartitions: Int = partitions
}
myRDD
.repartitionAndSortWithinPartitions(new DatePartitioner(24))
.map { v => v._2 }
.toDF()
.write.mode(SaveMode.Overwrite)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
141998 次 |
| 最近记录: |