如果我想创建一个StructType(即a DataFrame.schema)a case class,是否有办法在不创建的情况下创建DataFrame?我可以轻松地做到:
case class TestCase(id: Long)
val schema = Seq[TestCase]().toDF.schema
Run Code Online (Sandbox Code Playgroud)
但实际创建一个DataFrame我想要的只是模式似乎有点过分.
(如果你很好奇,问题背后的原因是我正在定义一个UserDefinedAggregateFunction,并且这样做会覆盖一些返回的方法,StructTypes并使用case类.)
我试图解决向数据集添加序列号的古老问题.我正在使用DataFrames,似乎没有相应的DataFrame RDD.zipWithIndex.另一方面,以下工作或多或少按我希望的方式工作:
val origDF = sqlContext.load(...)
val seqDF= sqlContext.createDataFrame(
origDF.rdd.zipWithIndex.map(ln => Row.fromSeq(Seq(ln._2) ++ ln._1.toSeq)),
StructType(Array(StructField("seq", LongType, false)) ++ origDF.schema.fields)
)
Run Code Online (Sandbox Code Playgroud)
在我的实际应用程序中,origDF不会直接从文件中加载 - 它将通过将2-3个其他DataFrame连接在一起而创建,并将包含超过1亿行.
有一个更好的方法吗?我该怎么做才能优化它?
在标准SQL中,当您将表连接到自身时,可以为表创建别名以跟踪您引用的列:
SELECT a.column_name, b.column_name...
FROM table1 a, table1 b
WHERE a.common_field = b.common_field;
Run Code Online (Sandbox Code Playgroud)
我可以通过两种方式来使用Spark DataFrameAPI 实现相同的功能:
解决方案#1:重命名列
在回答这个问题时,有几种不同的方法可以解决这个问题.这个只是重命名具有特定后缀的所有列:
df.toDF(df.columns.map(_ + "_R"):_*)
Run Code Online (Sandbox Code Playgroud)
例如,您可以这样做:
df.join(df.toDF(df.columns.map(_ + "_R"):_*), $"common_field" === $"common_field_R")
Run Code Online (Sandbox Code Playgroud)
解决方案#2:将引用复制到 DataFrame
另一个简单的解决方案就是这样做:
val df: DataFrame = ....
val df_right = df
df.join(df_right, df("common_field") === df_right("common_field"))
Run Code Online (Sandbox Code Playgroud)
这两种解决方案都有效,我可以看到每种解决方案在某些情况下都很有用.我应该注意两者之间是否存在内部差异?
我一直在玩这个java-sizeof库(https://github.com/phatak-dev/java-sizeof)并用它来测量Apache Spark中的数据集大小.事实证明,这个Row物体非常大.就像非常大 - 为什么会这样?
采取一个相当简单的架构:
root
|-- account: string (nullable = true)
|-- date: long (nullable = true)
|-- dialed: string (nullable = true)
|-- duration: double (nullable = true)
Run Code Online (Sandbox Code Playgroud)
示例数据如下所示:
+-------+-------------+----------+--------+
|account| date| dialed|duration|
+-------+-------------+----------+--------+
| 5497|1434620384003|9075112643| 790.0|
+-------+-------------+----------+--------+
Run Code Online (Sandbox Code Playgroud)
所以现在我们做:
val row = df.take(1)(0)
// row: org.apache.spark.sql.Row = [5497,1434620384003,9075112643,790.0]
Run Code Online (Sandbox Code Playgroud)
所以现在我用 SizeEstimator
SizeEstimator.estimate(row)
// res19: Long = 85050896
Run Code Online (Sandbox Code Playgroud)
81兆字节!对于单排!认为这是某种错误,我这样做:
SizeEstimator.estimate(df.take(100))
// res20: Long = 85072696
Run Code Online (Sandbox Code Playgroud)
有趣的是,尽管拥有100倍的数据量,但它并没有大得多 - 只有大约20k.高于100,似乎是线性的.对于1,000行,它看起来像这样:
SizeEstimator.estimate(df.take(1000))
// res21: Long = 850711696 …Run Code Online (Sandbox Code Playgroud) 在我的项目中,我想实现ADD(+)函数,但我的参数可能LongType是DoubleType,IntType.我用sqlContext.udf.register("add",XXX),但我不知道怎么写XXX,这是制作泛型函数.
虽然我的第一本能是将DataFrames所有内容都使用,但这是不可能的-某些操作显然更容易和/或更好地作为RDD操作执行,更不用说某些GraphX仅适用于的API RDDs。
我似乎花了大量的时间,这些天之间来回转换DataFrames和RDDs-所以有什么性能影响?拿RDD.checkpoint-没有对DataFrame等的东西,所以当我这样做时,在幕后会发生什么:
val df = Seq((1,2),(3,4)).toDF("key","value")
val rdd = df.rdd.map(...)
val newDf = rdd.map(r => (r.getInt(0), r.getInt(1))).toDF("key","value")
Run Code Online (Sandbox Code Playgroud)
显然,这是一个很小的例子,但是很高兴知道转换中幕后发生了什么。