小编Dav*_*fin的帖子

从案例类生成Spark StructType/Schema

如果我想创建一个StructType(即a DataFrame.schema)a case class,是否有办法在不创建的情况下创建DataFrame?我可以轻松地做到:

case class TestCase(id: Long)
val schema = Seq[TestCase]().toDF.schema
Run Code Online (Sandbox Code Playgroud)

但实际创建一个DataFrame我想要的只是模式似乎有点过分.

(如果你很好奇,问题背后的原因是我正在定义一个UserDefinedAggregateFunction,并且这样做会覆盖一些返回的方法,StructTypes并使用case类.)

apache-spark apache-spark-sql

47
推荐指数
4
解决办法
2万
查看次数

DataFrame-ified zipWithIndex

我试图解决向数据集添加序列号的古老问题.我正在使用DataFrames,似乎没有相应的DataFrame RDD.zipWithIndex.另一方面,以下工作或多或少按我希望的方式工作:

val origDF = sqlContext.load(...)    

val seqDF= sqlContext.createDataFrame(
    origDF.rdd.zipWithIndex.map(ln => Row.fromSeq(Seq(ln._2) ++ ln._1.toSeq)),
    StructType(Array(StructField("seq", LongType, false)) ++ origDF.schema.fields)
)
Run Code Online (Sandbox Code Playgroud)

在我的实际应用程序中,origDF不会直接从文件中加载 - 它将通过将2-3个其他DataFrame连接在一起而创建,并将包含超过1亿行.

有一个更好的方法吗?我该怎么做才能优化它?

apache-spark apache-spark-sql

34
推荐指数
5
解决办法
2万
查看次数

在Spark中执行DataFrame自联接的最干净,最有效的语法

在标准SQL中,当您将表连接到自身时,可以为表创建别名以跟踪您引用的列:

SELECT a.column_name, b.column_name...
FROM table1 a, table1 b
WHERE a.common_field = b.common_field;
Run Code Online (Sandbox Code Playgroud)

我可以通过两种方式来使用Spark DataFrameAPI 实现相同的功能:

解决方案#1:重命名列

在回答这个问题时,有几种不同的方法可以解决这个问题.这个只是重命名具有特定后缀的所有列:

df.toDF(df.columns.map(_ + "_R"):_*)
Run Code Online (Sandbox Code Playgroud)

例如,您可以这样做:

df.join(df.toDF(df.columns.map(_ + "_R"):_*), $"common_field" === $"common_field_R")
Run Code Online (Sandbox Code Playgroud)

解决方案#2:将引用复制到 DataFrame

另一个简单的解决方案就是这样做:

val df: DataFrame = ....
val df_right = df

df.join(df_right, df("common_field") === df_right("common_field"))
Run Code Online (Sandbox Code Playgroud)

这两种解决方案都有效,我可以看到每种解决方案在某些情况下都很有用.我应该注意两者之间是否存在内部差异?

dataframe apache-spark apache-spark-sql

31
推荐指数
1
解决办法
2万
查看次数

为什么Spark Row对象与等效结构相比如此之大?

我一直在玩这个java-sizeof库(https://github.com/phatak-dev/java-sizeof)并用它来测量Apache Spark中的数据集大小.事实证明,这个Row物体非常大.就像非常大 - 为什么会这样?

采取一个相当简单的架构:

root
 |-- account: string (nullable = true)
 |-- date: long (nullable = true)
 |-- dialed: string (nullable = true)
 |-- duration: double (nullable = true)
Run Code Online (Sandbox Code Playgroud)

示例数据如下所示:

+-------+-------------+----------+--------+
|account|         date|    dialed|duration|
+-------+-------------+----------+--------+
|   5497|1434620384003|9075112643|   790.0|
+-------+-------------+----------+--------+
Run Code Online (Sandbox Code Playgroud)

所以现在我们做:

val row = df.take(1)(0)
// row: org.apache.spark.sql.Row = [5497,1434620384003,9075112643,790.0]
Run Code Online (Sandbox Code Playgroud)

所以现在我用 SizeEstimator

SizeEstimator.estimate(row)
// res19: Long = 85050896
Run Code Online (Sandbox Code Playgroud)

81兆字节!对于单排!认为这是某种错误,我这样做:

SizeEstimator.estimate(df.take(100))
// res20: Long = 85072696
Run Code Online (Sandbox Code Playgroud)

有趣的是,尽管拥有100倍的数据量,但它并没有大得多 - 只有大约20k.高于100,似乎是线性的.对于1,000行,它看起来像这样:

SizeEstimator.estimate(df.take(1000))
// res21: Long = 850711696 …
Run Code Online (Sandbox Code Playgroud)

apache-spark

9
推荐指数
1
解决办法
272
查看次数

在Spark SQL中,如何注册和使用通用UDF?

在我的项目中,我想实现ADD(+)函数,但我的参数可能LongTypeDoubleType,IntType.我用sqlContext.udf.register("add",XXX),但我不知道怎么写XXX,这是制作泛型函数.

scala apache-spark udf

4
推荐指数
2
解决办法
3362
查看次数

在DataFrame,RDD和Back之间进行转换会对性能产生什么影响?

虽然我的第一本能是将DataFrames所有内容都使用,但这是不可能的-某些操作显然更容易和/或更好地作为RDD操作执行,更不用说某些GraphX仅适用于的API RDDs

我似乎花了大量的时间,这些天之间来回转换DataFramesRDDs-所以有什么性能影响?拿RDD.checkpoint-没有对DataFrame等的东西,所以当我这样做时,在幕后会发生什么:

val df = Seq((1,2),(3,4)).toDF("key","value")
val rdd = df.rdd.map(...)
val newDf = rdd.map(r => (r.getInt(0), r.getInt(1))).toDF("key","value")
Run Code Online (Sandbox Code Playgroud)

显然,这是一个很小的例子,但是很高兴知道转换中幕后发生了什么。

scala apache-spark

4
推荐指数
1
解决办法
2324
查看次数

标签 统计

apache-spark ×6

apache-spark-sql ×3

scala ×2

dataframe ×1

udf ×1