我正想通过有效的斯卡拉幻灯片,并提到在幻灯片10至从来不使用val的trait抽象成员和使用def来代替.幻灯片没有详细提及为什么val在a中使用抽象trait是一种反模式.如果有人可以解释在抽象方法的特性中使用val vs def的最佳实践,我将不胜感激
给定表1,其中一列为"x",类型为String.我想创建表2,其中列为"y",它是"x"中给出的日期字符串的整数表示形式.
必不可少的是将null值保留在"y"列中.
表1(数据帧df1):
+----------+
| x|
+----------+
|2015-09-12|
|2015-09-13|
| null|
| null|
+----------+
root
|-- x: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)
表2(数据帧df2):
+----------+--------+
| x| y|
+----------+--------+
| null| null|
| null| null|
|2015-09-12|20150912|
|2015-09-13|20150913|
+----------+--------+
root
|-- x: string (nullable = true)
|-- y: integer (nullable = true)
Run Code Online (Sandbox Code Playgroud)
用于将列"x"中的值转换为列"y"的用户定义函数(udf)为:
val extractDateAsInt = udf[Int, String] (
(d:String) => d.substring(0, 10)
.filterNot( "-".toSet)
.toInt )
Run Code Online (Sandbox Code Playgroud)
并且工作,处理空值是不可能的.
尽管如此,我可以做类似的事情
val extractDateAsIntWithNull = udf[Int, String] (
(d:String) =>
if (d != …Run Code Online (Sandbox Code Playgroud) scala nullable user-defined-functions apache-spark apache-spark-sql
以下示例代码尝试将一些案例对象放入数据框中.代码包括案例对象层次结构的定义和使用此特征的案例类:
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext
sealed trait Some
case object AType extends Some
case object BType extends Some
case class Data( name : String, t: Some)
object Example {
def main(args: Array[String]) : Unit = {
val conf = new SparkConf()
.setAppName( "Example" )
.setMaster( "local[*]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val df = sc.parallelize( Seq( Data( "a", AType), Data( "b", BType) ), 4).toDF()
df.show()
}
}
Run Code Online (Sandbox Code Playgroud)
执行代码时,我遗憾地遇到以下异常:
java.lang.UnsupportedOperationException: Schema for type …Run Code Online (Sandbox Code Playgroud) 使用Spark 1.5.0并给出以下代码,我希望unionAll DataFrame基于它们的列名进行联合.在代码中,我使用一些FunSuite传递SparkContext sc:
object Entities {
case class A (a: Int, b: Int)
case class B (b: Int, a: Int)
val as = Seq(
A(1,3),
A(2,4)
)
val bs = Seq(
B(5,3),
B(6,4)
)
}
class UnsortedTestSuite extends SparkFunSuite {
configuredUnitTest("The truth test.") { sc =>
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val aDF = sc.parallelize(Entities.as, 4).toDF
val bDF = sc.parallelize(Entities.bs, 4).toDF
aDF.show()
bDF.show()
aDF.unionAll(bDF).show
}
}
Run Code Online (Sandbox Code Playgroud)
输出:
+---+---+
| a| b|
+---+---+
| 1| 3| …Run Code Online (Sandbox Code Playgroud) 以下代码来自Apache Spark的快速入门指南.有人可以解释一下"线"变量是什么以及它来自何处?
textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
Run Code Online (Sandbox Code Playgroud)
另外,如何将值传递给a,b?
是否可以向DataFrames 添加额外的元数据?
我有Spark DataFrame,我需要保留额外的信息.示例:A DataFrame,我想要"记住"Integer id列中使用率最高的索引.
我使用单独的DataFrame来存储这些信息.当然,单独保存这些信息是单调乏味且容易出错的.
有没有更好的解决方案来存储这样的额外信息DataFrame?
给定一个数据帧 df
id | date
---------------
1 | 2015-09-01
2 | 2015-09-01
1 | 2015-09-03
1 | 2015-09-04
2 | 2015-09-04
Run Code Online (Sandbox Code Playgroud)
我想创建一个运行计数器或索引,
从而
id | date | counter
--------------------------
1 | 2015-09-01 | 1
1 | 2015-09-03 | 2
1 | 2015-09-04 | 3
2 | 2015-09-01 | 1
2 | 2015-09-04 | 2
Run Code Online (Sandbox Code Playgroud)
这是我可以通过窗口功能实现的,例如
val w = Window.partitionBy("id").orderBy("date")
val resultDF = df.select( df("id"), rowNumber().over(w) )
Run Code Online (Sandbox Code Playgroud)
不幸的是,Spark 1.4.1不支持常规数据帧的窗口函数:
org.apache.spark.sql.AnalysisException: Could not resolve window function 'row_number'. Note that, …Run Code Online (Sandbox Code Playgroud) 来自spark-sql的这个例子中的列名来自case class Person.
case class Person(name: String, age: Int)
val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.
// The RDD is implicitly converted to a SchemaRDD by createSchemaRDD, allowing it to be stored using Parquet.
people.saveAsParquetFile("people.parquet")
Run Code Online (Sandbox Code Playgroud)
https://spark.apache.org/docs/1.1.0/sql-programming-guide.html
但是,在许多情况下,参数名称可能会更改.如果文件尚未更新以反映更改,则会导致找不到列.
如何指定适当的映射?
我想的是:
val schema = StructType(Seq(
StructField("name", StringType, nullable = false),
StructField("age", IntegerType, nullable = false)
))
val ps: Seq[Person] = ???
val personRDD = sc.parallelize(ps)
// Apply the schema to the …Run Code Online (Sandbox Code Playgroud) 我需要在DataFrame上定义自定义方法.有什么更好的方法呢?解决方案应该是可扩展的,因为我打算定义大量的自定义方法.
我当前的方法是MyClass使用DataFrameas参数创建一个类(比如说),在其中定义我的自定义方法(比如说customMethod)并定义一个转换DataFrame为的隐式方法MyClass.
implicit def dataFrametoMyClass(df: DataFrame): MyClass = new MyClass(df)
Run Code Online (Sandbox Code Playgroud)
因此,我可以打电话:
dataFrame.customMethod()
Run Code Online (Sandbox Code Playgroud)
这是正确的方法吗?公开征求意见.
我有一个DataFrame df有五列的Spark .我想添加另一列,其值为第一列和第二列的元组.当使用withColumn()方法时,我得到不匹配错误,因为输入不是列类型,而是(列,列).我想知道在这种情况下是否有一个解决方案旁边的行循环运行?
var dfCol=(col1:Column,col2:Column)=>(col1,col2)
val vv = df.withColumn( "NewColumn", dfCol( df(df.schema.fieldNames(1)) , df(df.schema.fieldNames(2)) ) )
Run Code Online (Sandbox Code Playgroud) scala ×10
apache-spark ×8
case-class ×1
closures ×1
dataframe ×1
inheritance ×1
nullable ×1
parquet ×1
traits ×1